Хорошие лабораторные по многопоточности (простые, понятные, нетривиальные и полезные в народном хозяйстве) — большая редкость. Предлагаю Вам одно условие и четыре лабораторные работы по элементарной многопоточности на Java.
Условия
Это реализация однопоточного побайтового копировальщика из InputStream в OutputStream. Копирование происходит в потоке вызвавшем метод copy(...)
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class CopyUtil {
public static void copy(InputStream src, OutputStream dst)throws IOException{
try (InputStream src0 = src; OutputStream dst0 = dst) {
int b;
while ((b = src.read()) != -1) {
dst.write(b);
}
}
}
}
Это реализация однопоточного копировальщика массивами из InputStream в OutputStream. Копирование происходит в потоке вызвавшем метод copy(...)
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class CopyUtil {
public static void copy(InputStream src, OutputStream dst)throws IOException{
byte[] buff = new byte[128];
try (InputStream src0 = src; OutputStream dst0 = dst) {
int count;
while ((count = src.read(buff)) != -1) {
dst.write(buff, 0, count);
}
}
}
}
Это реализация многопоточного копировальщика массивами из InputStream в OutputStream. Мы заводим на чтение и на запись по отдельному новому потоку и соединяем их блокирующей ограниченной очередью для передачи данных от читателя к писателю
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
public class CopyUtil {
public static void copy(final InputStream src, final OutputStream dst) throws IOException {
// reader-to-writer byte[]-channel
final BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(64);
// exception-channel from reader/writer threads?
final AtomicReference<Throwable> ex = new AtomicReference<>();
final ThreadGroup group = new ThreadGroup("read-write") {
public void uncaughtException(Thread t, Throwable e) {ex.set(e);}
};
// reader from 'src'
Thread reader = new Thread(group, () -> {
try (InputStream src0 = src) { // 'src0' for auto-closing
while (true) {
byte[] data = new byte[128]; // new data buffer
int count = src.read(data, 1, 127); // read up to 127 bytes
data[0] = (byte) count; // 0-byte is length-field
buffer.put(data); // send to writer
if (count == -1) {break;} // src empty
}
} catch (Exception e) {group.interrupt();} // interrupt writer
});
reader.start();
// writer to 'dst'
Thread writer = new Thread(group, () -> {
try (OutputStream dst0 = dst) { // 'dst0' for auto-closing
while (true) {
byte[] data = buffer.take(); // get new data from reader
if (data[0] == -1) {break;} // its last data
dst.write(data, 1, data[0]); //
}
} catch (Exception e) {group.interrupt();} // interrupt writer
});
writer.start();
// wait to complete read/write operations
try {
reader.join(); // wait for reader
writer.join(); // wait for writer
} catch (InterruptedException e) {throw new IOException(e);}
if (ex.get() != null) {throw new IOException(ex.get());}
}
}
Для проверки корректности копирования можно использовать следующий тест
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
public class Test {
public static void main(String[] args) throws IOException {
Random rnd = new Random(0);
byte[] testData = new byte[64 * 1024];
rnd.nextBytes(testData);
ByteArrayOutputStream dst = new ByteArrayOutputStream();
CopyUtil.copy(new ByteArrayInputStream(testData), dst);
if (!Arrays.equals(testData, dst.toByteArray())) {
throw new AssertionError("Lab decision wrong!");
} else {
System.out.println("OK!");
}
}
}
Задание #1
В последнем двупоточном решении мы стартуем два потока — для чтения и для записи. Перепишите код, что бы чтение осуществлялось в новом потоке, а запись производилась потоком, вызвавшим copy(...). Кстати, тогда можно будет избавиться от пары join-ов, так как поток на принимающем конце буфера знает, когда закончились данные.
Задание #2
В последнем двупоточном решении читатель постоянно создает новые byte[]-буфера, передает их писателю, а тот отправляет на съедение GC. Создайте отдельную обратную очередь пустых буферов от писателя к читателю.
Задание #3
Во всех трех примерах кода мы реализовывали передачу данных от одного читателя — одному писателю. Реализуйте многопоточное решение передачи данных от одного читателя — многим писателям. Все писатели получают идентичные данные. Читатель и писатели работают каждый в своем отдельном потоке. Не создавайте отдельные копии данных для каждого писателя — пусть писатели читают из одних на всех буферов, но храните эти буфера одновременно в разных очередях (от читателя к каждому писателю тянется отдельная очередь).
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class CopyUtil {
public static void copy(InputStream src, OutputStream ... dst) throws IOException {
// some code
}
}
Задание #4
Сделайте предыдущее задание #3 но образуйте не топологию 'звезда', где в центре читатель и от него исходят лучи к писателям, а топологию 'кольцо'. В которой читатель и писатели выстраиваются в круг и передают буфер по кругу. Читатель — первому писателю, первый писатель — второму,… последний писатель — читателю. И после чего читатель может использовать буфер повторно.
Контакты
Я занимаюсь разработкой курса программирования по Java Core (online-курс).
email: GolovachCourses@gmail.com
skype: GolovachCourses
Автор: IvanGolovach