Добрый день, Хабрахабр.
Предисловие
Была довольно простая задача: получить набор документов из базы, каждый документ преобразовать и отправить пользователю все преобразованные документы, порядок их менять нельзя, для обработки документа используется асинхронная функция. Если на каком-то документе вылезла ошибка — документы мы не отправляем, только ошибку и заканчиваем обработку документов.
Для решения задачи была выбрана библиотека Q, так как сам поход Promise мне симпатичен. Но возникла одна загвоздка, вроде бы элементарная задача, а выполняется больше секунды, а точнее 1300 мс, вместо ожидаемых 50-80 мс. Дабы разобраться, как все устроено и проникнуться асинхронностью было решено написать специализированный «велосипед» под данную задачу.
Как было устроена работа с Q
В первую очередь, хотелось бы рассказать, как это было реализовано изначально.
1. Процедура, которая проходила последовательно по массиву и возвращала нам promise.
function forEachSerial(array, func) {
var tickFunction = function (el, index, callback) {
if (func.length == 2)
func(el, callback);
else if (func.length == 3)
func(el, index, callback);
}
var functions = [];
array.forEach(function (el, index) {
functions.push(Q.nfcall(tickFunction, el, index));
});
return Q.all(functions);
}
2. Процедура, которая обрабатывала документ и возвращала нам promise.
function documentToJSON(el, options) {
var obj = el.toObject(options);
var deferred = Q.defer();
var columns = ['...','...','...'];
forEachSerial(columns,function (el, next) {
el.somyAsyncFunction(options, function (err, result) {
if (err)
next(err);
else result.somyAsyncFunction2(options, function (err, result) {
if (!err)
obj[el] = result;
next(err);
});
});
}).then(function () {
deferred.resolve(obj);
}, function (err) {
deferred.reject(err);
});
return deferred.promise;
}
3. И головная процедура, отправляющая результат пользователю
exports.get = function (req, res, next) {
dbModel.find(search, {}, { skip: from, limit: limit }).sort({column1: -1}).exec(function (err, result) {
if (err)
next(new errorsHandlers.internalError());
else {
var result = [];
forEachSerial(result,function (el, index, next) {
documentToJSON(el,options).then(function (obj) {
result[index] = obj;
next()
}, function (err) {
next(new errorsHandlers.internalError())
});
}).then(function () {
res.send({responce: result});
}, function (err) {
next(new errorsHandlers.internalError())
});
}
});
};
Кто-то может заметить, что всевозможных «обещаний» очень много даже там, где они не сильно нужны, и что такое большое зацикливание привело к такой скорости, но процедура отдает всего 20 простых документов, преобразования примитивны и выполнять их такое количество времени уже точно никуда не годиться.
Пишем свою библиотеку promise
Как вообще «это» работает
В сети полно описаний что и как. Опишу вкратце. Promise — это своего рода обещание. Какая-то функция нам обещает результат, получить его мы можем с помощью then(success, error), в свою очередь, при успешной обработке мы может назначить новый promise и так же обработать его. В частном случае это выглядит так:
Promise.then(step1).then(step2).then(step3).then(function () {
//All OK
}, function (err) {
//Error in any step
});
Результат каждого этапа передается как параметр в следующий и так последовательно. В итоге мы обрабатываем все ошибки в одном блоке и избавляемся от «лапши».
Внутри выглядит примерно так: создаются события, которые вызываются при успешном завершении или при ошибке:
var promise = fs.stat("foo");
promise.addListener("success", function (value) {
// ok
})
promise.addListener("error", function (error) {
// error
});
Все это можно почитать здесь.
Теория закончилась, приступим к практике.
Начнем с простого — Deferred
Задачей этого объекта будет создать нужные нам события и выдать Promise
function deferred() {
this.events = new EventEmitter(); //Объект события
this.promise = new promise(this); // Возвращаемый нам Promise
this.thenDeferred = []; //Последующие обработчики, нужны для того что бы передать ошибку дальше по цепочке
var self = this;
//Вызывается в успешном случае
this.resolve = function () {
self.events.emit('completed', arguments);
}
//Вызывается в случае ошибки
this.reject = function (error) {
self.events.emit('error', error);
//Передаем ошибку дальше по цепочке
self.thenDeferred.forEach(function (el) {
el.reject(error);
});
}
}
Объект — Promise
Задача его будет отслеживать события "completed" и "error" вызывать нужные функции, которые назначены через "then" и отслеживать что там вернула эта функция: если возвратила нам еще один promise то подключаться к нему для того что бы срабатывали следующие then, если просто данные, то выполнять последующие then, таким образом мы сможем строить цепочки из then.
function promise(def) {
this.def = def;
this.completed = false;
this.events = def.events;
var self = this;
var thenDeferred;
self._successListener = null;
self._errorListener = null;
//Результатом выполнения then - будет возвращаться новый promise
this.then = function (success, error) {
if (success)
self._successListener = success;
if (error)
self._errorListener = error;
thenDeferred = new deferred();
self.def.thenDeferred.push(thenDeferred);
return thenDeferred.promise;
}
//Обрабатываем успешное выполнение задачи
this.events.on('completed', function (result) {
// объекты, аргументы, массивы приводим к виду массива для передачи их в дальнейшем как атрибуты в функцию
var args = inputOfFunctionToArray(result);
//Если вдруг задача была уже выполнена, то дальше не проходим
if (self.completed) return;
self.completed = true;
if (self._successListener) {
var result;
try {
result = self._successListener.apply(self, args);
} catch (e) {
self.def.reject(e);
result;
}
//Если результатом функции Promise и есть последующие then, подключаемся к нему
var promise;
if (isPromise(result))
promise = result;
else if (result instanceof deferred)
promise = result.promise;
if (promise && thenDeferred) {
promise.then(function () {
var args = arguments;
process.nextTick(function () {
thenDeferred.resolve.apply(self, args);
});
}, function (error) {
process.nextTick(function () {
thenDeferred.reject(error);
});
});
} else if (thenDeferred)
process.nextTick(function () {
//Для скалярных параметров просто запускаем следующие then
thenDeferred.resolve.apply(self, [result]);
});
} else if (thenDeferred)
process.nextTick(function () {
thenDeferred.resolve.apply(self, []);
});
});
//Обрабатываем ошибки
this.events.on('error', function (error) {
if (self.completed) return;
self.completed = true;
if (self._errorListener)
process.nextTick(function () {
self._errorListener.apply(self, [error]);
});
});
}
Итак, базовая модель готова. Осталось сделать обвязку для функций с callback
PromiseFn
Его задача — сделать обертку для функции с callback с возможностью указания this и аргументов запуска
var promisefn = function (bind, fn) {
var def = new deferred();
//bind является не обязательным параметром
if (typeof bind === 'function' && !fn) {
fn = bind;
bind = def;
}
//Назначаем наш callback для данной функции
var callback = function (err) {
if (err)
def.reject(err);
else {
var args = [];
for (var key in arguments)
args.push(arguments[key]);
args.splice(0, 1);
def.resolve.apply(bind, args);
}
};
var result = function () {
var args = [];
for (var key in arguments)
args.push(arguments[key]);
args.push(callback);
process.nextTick(function () {
fn.apply(bind, args);
});
return def.promise;
}
return result;
}
И напоследок ALL — последовательное выполнение функций с callback
Тут все просто: нам передают массив функций, мы их обвязываем через promisefn и, когда они все выполнятся — вызываем resolve
var all = function (functions) {
var def = new deferred();
process.nextTick(function () {
var index = -1;
var result = [];
var next = function (err, arguments) {
if (err) {
def.reject(err);
return;
}
if (arguments) result.push(inputOfFunctionToArray(arguments));
index++;
if (index >= functions.length) {
def.resolve(result);
} else process.nextTick(function () {
promisefn(functions[index])().then(function () {
var args = arguments;
process.nextTick(function () {
next(err, args);
});
}, function (err) {
process.nextTick(function () {
next(err);
});
});
});
}
process.nextTick(next);
});
return def.promise;
}
В заключение
После тестирования старый подход (через библиотеку Q) был переписан заменено пару объявлений и запущен в тех же условиях. Результат положительный — 50-100 мс (вместо прежних 1300 мс).
Все исходники доступны на Github, там же можно найти и примеры. Изобретение «велосипедов» полезно хотя бы тем, что это улучшает понимание.
Спасибо за внимание!
Автор: a696385