Играем с потоками в Node.JS 10.5.0

в 7:43, , рубрики: javascript, multithreading, node.js, worker_threads

Доброго времени суток

Играем с потоками в Node.JS 10.5.0 - 1
У меня на работе возник спор между мной и дотнетчиками насчет потоков в новой версии Node.JS и необходимости их синхронизоровать. Для начала решили выбрать задачу о параллельной записи строк в файл. Тема с worker_threads горячая, прошу под кат.

Немного о самих потоках. Они являются экспериментальной технологией в Node.JS 10.5.0, и для того, чтобы иметь доступ к модулю «worker_threads», необходимо запускать наше Node.JS приложение с флагом "--experimental-worker". Я прописал этот флаг в start скрипте в файле package.json:

{
  "name": "worker-test",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "start": "node --experimental-worker app.js "
  },
  "author": "",
  "license": "ISC"
}

Теперь о самой логике. Главный поток порождает N рабочих потоков, все они пишут с каким-то интервалом в файл. В отличие от всех примеров, где главные и дочерние потоки стартуют с одного файла, я отделил потоки в отдельный, мне это кажется более чистым и элегантным.

Собственно, код.

Главный файл app.js — точка входа.

const { Worker } = require('worker_threads');
const path = require('path');

console.log('Hello from main!');

for (var i = 0; i < 10; i++) {
  const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
}

Здесь мы просто создаем дочерние потоки используя класс Worker и указывая путь к стартовому файлу для потока './writer-worker-app/app.js'. При создании потока передаем самописный айдишник как данные workerData.

Стартовый файл для потока ./writer-worker-app/app.js:

const { workerData } = require('worker_threads');
const logger = require('./logger');

setInterval(() => {
  logger.log(`Hello from worker number ${workerData.id}rn`);
}, 100);

Ну и простейший класс-логер: ./writer-worker-app/logger.js

const fs = require('fs');

function log(message) {
  return fs.appendFileSync('./my-file.txt', message);
}

module.exports = {
  log
};

При запуске этого приложения мы все надеялись на то, что в итоге получим кашу в файле и дотнетчики закричат, как нужны блокировки с семафорами и прочими радостями параллельного исполнения. Но нет! В файле все строки идут не прерываясь, разве что в случайном порядке:

Hello from worker number 2
Hello from worker number 1
Hello from worker number 0
Hello from worker number 4
Hello from worker number 3
Hello from worker number 2
Hello from worker number 1
Hello from worker number 9
Hello from worker number 0

Замечательный эксперимент, очередная маленькая победа Ноды :-) Моё предположение в том, что вся синхронизация происходит на уровне IO потоков Ноды, но буду рад узнать в комментариях правильный вариант. На всякий случай мы проверили работу, используя не fs.appendFileSync, а fs.createWriteStream и метода stream.write.

Результат вышел такой же.

Но мы на этом не остановились.

Коллега предложил задачу о синхронизации потоков. Для нашего конкретного примера, пусть это будет задача последовательной записи в файл в порядке возврастания айдишников. Сначала пишет первый поток, потом второй, потом третий и так далее.

Для этого я ввёл еще один поток-Менеджер. Можно было обойтись главным, но мне так приятно создавать этих изолированных рабочих и выстраивать общение посредством сообщений. Прежде чем начать писать имплементацию потока-Менеджера, необходимо создать канал связи между ним и писателями-рабочими. Для этого был использован класс MessageChannel. Инстансы этого класса имеют два поля: port1 и port2, каждый из которых умеет слушать и отправлять сообщения другому посредством методов .on('message') и .postMessage(). Этот класс и был создан в рамках модуля «worker_threads» для коммуникации между потоками, потому что обычно при передачи объекта происходит просто его клонирование, и в изолированной среде выполнения потока он будет бесполезен.

Для коммуникации между 2 потоками мы каждому должны дать по порту.

Интересный факт: на 10.5.0 невозможно передать порт через конструктор воркера, необходимо это делать только через worker.postMessage(), причем обязательно указывая порт в transferList параметре!

Сам поток-менеджер будет отсылать команды потокам-писателям в порядке возрастания их идентификаторов, причем следующую команду он отправит только после получения ответа писателя об успешной операции.

Недо-UML-диаграмма приложения:
Играем с потоками в Node.JS 10.5.0 - 2

Наш видоизмененный главный файл ./app.js:

const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');

console.log('Main app initialized and started.');

const workersMeta = [];

for (var i = 0; i < 10; i++) {
  const channel = new MessageChannel();
  const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
  workersMeta.push({ id: i, worker, channel });
}

workersMeta.forEach(({ worker, channel }) => {
  worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]);
})

const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js'));
const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 }));
orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port));

console.log('All worker threads have been initialized');

Здесь мы сначала создаем воркеров, потом каждому отправляем порт для связи с менеджером (и только так, через конструктор это сделать невозможно).

Потом создаем поток-менеджер, отправляем ему список портов для связи с потоками-писателями.

Изменим и поведение потока-писателя, чтобы он отправлял сообщение только когда ему скажут, а также возвращал результат, когда операция записи закончена:
./writer-worer-app/app.js

const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');

const id = workerData.id;

console.log(`Worker ${id} initializad.`);

parentPort.on('message', value => {
  const orchestratorPort = value.orchestratorPort;
  orchestratorPort.on('message', data => {
    if (data.command == 'write') {
      console.log(`Worker ${id} received write command`);
      sendMessage();
      sendResult(orchestratorPort);
    }
  });
  console.log(`Worker ${id} started.`);
});


function sendMessage() {
  logger.log(`Hello from worker number ${workerData.id}rn`);
}

function sendResult(port) {
  port.postMessage({ id, status: 'completed' });
}

Мы правильно проинициализировались от сообщение родительского потока, начали случать канал потока-менеджера, при получении команды сначала пишем в файл, потом отправляем результат. Нужно заметить, что в файл пишется синхронно, поэтому sendResult() вызывается сразу за sendMessage().

Всё, что осталось — написать имплементацию нашего умного менеджера
./orchestrator-worker-app/app.js:

const { parentPort } = require('worker_threads');

console.log('Orchestrator initialized.')

const intervalTime = 500;

let workerPorts;

parentPort.on('message', (value) => {
  workerPorts = value.workerPorts.sort((a, b) => a.id > b.id);
  workerPorts.forEach(wp => wp.port.on('message', handleResponse));
  console.log('Orchestrator started.');
  sendCommand(workerPorts[0]);
});

function handleResponse(status) {
  const responseWorkerId = status.id;
  let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1);
  if (!nextWorker) {
    nextWorker = workerPorts[0];
  }
  setTimeout(() => sendCommand(nextWorker), intervalTime);
}

function sendCommand(worker) {
  worker.port.postMessage({ command: 'write' });
}

Получили список портов, упорядочили, для каждого порта установили колбек на респонз, ну и отправили команду первому. В самом колбеке ищем следующего писателя и отправляем команду ему. Чтобы не сильно напрягать систему, был установлен интервал между командами.

Вот и всё, наше многопоточное приложение с управлением потоков готово. Мы научились не просто порождать воркеры-потоки в Node.JS, но и создавать эффективные способы коммуникации между ними. На мой личный взгляд, архитектура изолированных потоков в Node.JS с ожиданием и отправкой сообщений более чем удобная и перспективная. Всем спасибо за внимание.

Весь исходный код может быть найден здесь.

Автор: LoserKiss

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js