Доброго времени суток
У меня на работе возник спор между мной и дотнетчиками насчет потоков в новой версии Node.JS и необходимости их синхронизоровать. Для начала решили выбрать задачу о параллельной записи строк в файл. Тема с worker_threads горячая, прошу под кат.
Немного о самих потоках. Они являются экспериментальной технологией в Node.JS 10.5.0, и для того, чтобы иметь доступ к модулю «worker_threads», необходимо запускать наше Node.JS приложение с флагом "--experimental-worker". Я прописал этот флаг в start скрипте в файле package.json:
{
"name": "worker-test",
"version": "1.0.0",
"description": "",
"main": "app.js",
"scripts": {
"start": "node --experimental-worker app.js "
},
"author": "",
"license": "ISC"
}
Теперь о самой логике. Главный поток порождает N рабочих потоков, все они пишут с каким-то интервалом в файл. В отличие от всех примеров, где главные и дочерние потоки стартуют с одного файла, я отделил потоки в отдельный, мне это кажется более чистым и элегантным.
Собственно, код.
Главный файл app.js — точка входа.
const { Worker } = require('worker_threads');
const path = require('path');
console.log('Hello from main!');
for (var i = 0; i < 10; i++) {
const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
}
Здесь мы просто создаем дочерние потоки используя класс Worker и указывая путь к стартовому файлу для потока './writer-worker-app/app.js'. При создании потока передаем самописный айдишник как данные workerData.
Стартовый файл для потока ./writer-worker-app/app.js:
const { workerData } = require('worker_threads');
const logger = require('./logger');
setInterval(() => {
logger.log(`Hello from worker number ${workerData.id}rn`);
}, 100);
Ну и простейший класс-логер: ./writer-worker-app/logger.js
const fs = require('fs');
function log(message) {
return fs.appendFileSync('./my-file.txt', message);
}
module.exports = {
log
};
При запуске этого приложения мы все надеялись на то, что в итоге получим кашу в файле и дотнетчики закричат, как нужны блокировки с семафорами и прочими радостями параллельного исполнения. Но нет! В файле все строки идут не прерываясь, разве что в случайном порядке:
Hello from worker number 2
Hello from worker number 1
Hello from worker number 0
Hello from worker number 4
Hello from worker number 3
Hello from worker number 2
Hello from worker number 1
Hello from worker number 9
Hello from worker number 0
Замечательный эксперимент, очередная маленькая победа Ноды :-) Моё предположение в том, что вся синхронизация происходит на уровне IO потоков Ноды, но буду рад узнать в комментариях правильный вариант. На всякий случай мы проверили работу, используя не fs.appendFileSync, а fs.createWriteStream и метода stream.write.
Результат вышел такой же.
Но мы на этом не остановились.
Коллега предложил задачу о синхронизации потоков. Для нашего конкретного примера, пусть это будет задача последовательной записи в файл в порядке возврастания айдишников. Сначала пишет первый поток, потом второй, потом третий и так далее.
Для этого я ввёл еще один поток-Менеджер. Можно было обойтись главным, но мне так приятно создавать этих изолированных рабочих и выстраивать общение посредством сообщений. Прежде чем начать писать имплементацию потока-Менеджера, необходимо создать канал связи между ним и писателями-рабочими. Для этого был использован класс MessageChannel. Инстансы этого класса имеют два поля: port1 и port2, каждый из которых умеет слушать и отправлять сообщения другому посредством методов .on('message') и .postMessage(). Этот класс и был создан в рамках модуля «worker_threads» для коммуникации между потоками, потому что обычно при передачи объекта происходит просто его клонирование, и в изолированной среде выполнения потока он будет бесполезен.
Для коммуникации между 2 потоками мы каждому должны дать по порту.
Интересный факт: на 10.5.0 невозможно передать порт через конструктор воркера, необходимо это делать только через worker.postMessage(), причем обязательно указывая порт в transferList параметре!
Сам поток-менеджер будет отсылать команды потокам-писателям в порядке возрастания их идентификаторов, причем следующую команду он отправит только после получения ответа писателя об успешной операции.
Недо-UML-диаграмма приложения:
Наш видоизмененный главный файл ./app.js:
const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');
console.log('Main app initialized and started.');
const workersMeta = [];
for (var i = 0; i < 10; i++) {
const channel = new MessageChannel();
const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
workersMeta.push({ id: i, worker, channel });
}
workersMeta.forEach(({ worker, channel }) => {
worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]);
})
const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js'));
const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 }));
orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port));
console.log('All worker threads have been initialized');
Здесь мы сначала создаем воркеров, потом каждому отправляем порт для связи с менеджером (и только так, через конструктор это сделать невозможно).
Потом создаем поток-менеджер, отправляем ему список портов для связи с потоками-писателями.
Изменим и поведение потока-писателя, чтобы он отправлял сообщение только когда ему скажут, а также возвращал результат, когда операция записи закончена:
./writer-worer-app/app.js
const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;
console.log(`Worker ${id} initializad.`);
parentPort.on('message', value => {
const orchestratorPort = value.orchestratorPort;
orchestratorPort.on('message', data => {
if (data.command == 'write') {
console.log(`Worker ${id} received write command`);
sendMessage();
sendResult(orchestratorPort);
}
});
console.log(`Worker ${id} started.`);
});
function sendMessage() {
logger.log(`Hello from worker number ${workerData.id}rn`);
}
function sendResult(port) {
port.postMessage({ id, status: 'completed' });
}
Мы правильно проинициализировались от сообщение родительского потока, начали случать канал потока-менеджера, при получении команды сначала пишем в файл, потом отправляем результат. Нужно заметить, что в файл пишется синхронно, поэтому sendResult() вызывается сразу за sendMessage().
Всё, что осталось — написать имплементацию нашего умного менеджера
./orchestrator-worker-app/app.js:
const { parentPort } = require('worker_threads');
console.log('Orchestrator initialized.')
const intervalTime = 500;
let workerPorts;
parentPort.on('message', (value) => {
workerPorts = value.workerPorts.sort((a, b) => a.id > b.id);
workerPorts.forEach(wp => wp.port.on('message', handleResponse));
console.log('Orchestrator started.');
sendCommand(workerPorts[0]);
});
function handleResponse(status) {
const responseWorkerId = status.id;
let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1);
if (!nextWorker) {
nextWorker = workerPorts[0];
}
setTimeout(() => sendCommand(nextWorker), intervalTime);
}
function sendCommand(worker) {
worker.port.postMessage({ command: 'write' });
}
Получили список портов, упорядочили, для каждого порта установили колбек на респонз, ну и отправили команду первому. В самом колбеке ищем следующего писателя и отправляем команду ему. Чтобы не сильно напрягать систему, был установлен интервал между командами.
Вот и всё, наше многопоточное приложение с управлением потоков готово. Мы научились не просто порождать воркеры-потоки в Node.JS, но и создавать эффективные способы коммуникации между ними. На мой личный взгляд, архитектура изолированных потоков в Node.JS с ожиданием и отправкой сообщений более чем удобная и перспективная. Всем спасибо за внимание.
Весь исходный код может быть найден здесь.
Автор: LoserKiss