В этой статье я опишу две абстракции-классы, написанные средствами nodejs, которые предоставляет функционал распределения запросов по открытым каналам (tcp-socket). При этом учитывается общая загруженность системы и, если каналов не хватает, открываются новые, по мере уменьшения общего количества запросов — «лишние» каналы закрываются.
Этот балансировщик можно использовать для распределения запросов по каналам, которые представляют собой по сути net.Socket. Для этого нужно внести изменения в метод по открытию и закрытию канала, добавлению запроса в канал.
В примере, который я опишу, используется библиотека pg, предоставляющая функционал по открытию сокетов к серверу с базой данных. При этом дефолтовое управление пулом коннектов, предоставляемое библиотекой, никак не используется.
Для начала рассмотрим класс Connect, с помощью которого будет осуществляться управление одной сущностью — коннектом:
/* Конструктор класса коннект, в качестве аргумента строка формата "pg://USER:PASSWORD@HOST:PORT/DATABASE" */
function Connect(connString) {
// сохраняем параметры в свойстве объекта
this._connString = connString;
// свойство отвечающее, за запуск обработки запросов
this._isRun = false;
// максимальное количество запросов помещенных в сокет, после которого будет вызвано событие "maxCount"
this._maxQueryCount = 100;
// служебное свойство, используемое в методе _nextTick
this._worked = false;
// количество запросов, висящих на коннекте
this._queryCount = 0;
// "движок" класса
this._emitter = new (require('events').EventEmitter);
// делаем "селфи"
var self = this;
// на открытие коннекта создаем обработчик "open", в котором регистрируем массив коннектов
this._emitter.on('open', function() {
self._arrayQuery = [];
});
// на событие ошибки будет сгенерирована ошибка, которая если не навесить обработчик, повалит выполнение скрипта
this._emitter.on('error', function(err) {
throw err;
});
// на событие достижения лимита этого коннекта, пометим его флагом
this._emitter.on('maxCount', function(message) {
self._setMax = true;
});
// при создании экземпляра класса открываем коннект до базы, здесь может быть открытие любого коннекта,
// который представляет собой по сути net.Socket
pg.connect(this._connString, function(err, client, done) {
if (err) {
return self._emitter.emit('error', err);
}
// запишем в "внутреннее" свойство ссылку на клиент, который общается с базой
self._client = client;
// "мягкое закрытие" клиента
self._done = done;
// вызываем событие готовности (передаем событие далее по цепочке)
self._emitter.emit('open');
});
}
/* метод, который предоставляет функционал по "навешиванию" обработчиков на события */
Connect.prototype.on = function(typeEvent, func) {
if(typeEvent == 'error') {
// если это обработчик на ошибки подменяем стандартный обработчик пользовательским
this._emitter.removeAllListeners('error');
}
this._emitter.addListener(typeEvent, func);
};
/* метод, которые запускает работу по обработке запросов */
Connect.prototype.start = function() {
this._isRun = true;
this._nextTick();
};
/* метод, которые останавливает работу по обработке запросов */
Connect.prototype.stop = function() {
this._isRun = false;
};
/* метод, возвращающий состоянии коннекта (заполнен оли он) */
Connect.prototype.isFull = function() {
return this._setMax;
};
/*
метод, закрывающий мягко коннект
(т.е. если на коннекте висят запросы, программа дождется их выполнения и закроет коннект)
*/
Connect.prototype.close = function () {
if(this._done) {
this._emitter.emit('close');
this._done();
} else {
this._emitter.emit('error', new Error('connect is not active'));
}
};
/* метод, возвращающий массив обрабатываемых запросов */
Connect.prototype.queryQueue = function () {
return this._arrayQuery;
};
/*
главный рабочий метод класса - добавление запроса.
В качестве аргументов сам запрос в виде строки, параметры запроса, коллбэк на завершении запроса
*/
Connect.prototype.addQuery = function (query, params, cb) {
if(!(typeof query == 'string')) {
return this._emitter.emit('error', new Error('not valid query'));
}
if( !(typeof params == "object") || !(params instanceof Array) ) {
return this._emitter.emit('error', new Error('not valid argument'));
}
this._queryCount++;
this._arrayQuery.push({ query: query, params: params, callback: cb });
if(this._queryCount > this._maxQueryCount) {
this._emitter.emit('maxCount', 'in queue added too many requests, the waiting time increases');
}
this._nextTick();
};
/* метод по манипулированию максимальным количеством запросов в коннекте */
Connect.prototype.maxQueryCount = function (count) {
if(count) {
this._maxQueryCount = count;
} else {
return this._maxQueryCount;
}
};
/* возвращает количество обрабатываемых запросов */
Connect.prototype.queryCount = function () {
return this._queryCount;
};
/*
внутренний метод класса, ответственный за выполнение запросов,
в данном случае реализован вариант, когда все запросы сразу отправляются
к базе, возможна реализация в случае с последовательным выполнением
запросы хранятся во внутреннем буффере (массиве _arrayQuery)
и отправляются к базе по мере выполнения предыдущего
*/
Connect.prototype._nextTick = function() {
var self = this;
if(this._worked) {
return;
}
while(this._isRun && this._arrayQuery.length>0) {
this._worked = true;
var el = this._arrayQuery.shift();
// здесь используется синтаксис библиотеки pg, к которой мы привязаны
this._client.query(el.query, el.params, function(err, result) {
self._queryCount--;
if(err) {
return el.callback(err);
}
el.callback(null, result);
if(self._queryCount==0) {
self._emitter.emit('drain');
self._setMax = false;
}
})
}
this._worked = false;
};
Теперь непосредственно класс Balanser, который будет управлять нашими коннектами: открывать новые, закрывать лишние, распределять между ними запросы, предоставлять единый вход для сервиса
/* конструктор класса балансировщика, который будет распределять запросы */
function Balanser(minCountConnect, maxCountConnect) {
// записываем в свойство максимальный предел открытых коннектов до базы
this._maxCountConnect = maxCountConnect;
// записываем в свойство минимальный предел открытых коннектов до базы
this._minCountConnect = minCountConnect;
// массив коннектов
this._connectArray = [];
// закрываемые коннекты
this._closedConnect = [];
// массив задач
this._taskArray = [];
// служебный флаг
this._run = false;
// движок класса
this._emitter = new (require('events').EventEmitter);
// запускаем инициализацию
this._init();
}
/* метод инициализации класса, открывающий коннекты последовательно, один за другим */
Balanser.prototype._init = function() {
this._cursor = 0;
this.activQuery = 0;
var self = this;
var i=0;
// рекурсивный вызов функции, добавляющей новый коннект
var cycle = function() {
i++;
if(i<self._minCountConnect) {
self._addNewConnect(cycle);
} else {
self._emitter.emit('ready');
}
};
this._addNewConnect(cycle);
};
/* собственно метод, открывающий соединение, используем класс коннекта */
Balanser.prototype._addNewConnect = function(cb) {
var self = this;
var connect = new GPSconnect(connString);
connect.on('open', function() {
self._connectArray.push(connect);
cb();
});
};
/* метод, по проверке "загруженности" коннекта и возвращающий индекс коннекта */
Balanser.prototype._cycle = function(pos) {
for (var i=pos;i<this._connectArray.length;i++) {
if( !(this._connectArray[i].isFull()) )
break;
}
return i;
};
/* метод, заполняющий коннект запросами */
Balanser.prototype._next = function(connect, el) {
connect.addQuery(el.query, el.params, el.cb);
connect.start();
this._distribution();
};
/*
Главный метод класса - распределяет запросы между коннектами.
Распределение проходит по принципу "Round-robin" с проверкой на загруженность коннекта.
Это нужно в случае, если какой то запрос оказался "тяжелым",
чтобы снять нагрузку с этого коннекта и перераспределить запросы на другие коннекты
код оформлен конечно криво, надеюсь в скором времени поправить
*/
Balanser.prototype._distribution = function() {
if(this._taskArray.length>0) {
var el = this._taskArray.shift();
this._cursor = this._cycle(this._cursor);
var self = this;
if(this._cursor<this._connectArray.length) {
var connect = this._connectArray[this._cursor];
this._next(connect, el);
this._cursor++;
} else {
this._cursor = this._cycle(0);
if(this._cursor<this._connectArray.length) {
var connect = this._connectArray[this._cursor];
this._next(connect, el);
this._cursor++;
} else if( this._connectArray.length<this._maxCountConnect) {
self._addNewConnect(function() {
self._cursor = self._connectArray.length-1;
var connect = self._connectArray[self._cursor];
self._next(connect, el);
});
} else {
for (var i=0;i<this._connectArray.length;i++) {
if( this.activQuery/this._connectArray.length > this._connectArray[i].queryCount() ) {
break;
}
}
if(i==this._connectArray.length) {
i = 0;
}
this._cursor = i;
var connect = this._connectArray[this._cursor];
this._next(connect, el);
}
}
} else {
this._run = false;
}
};
/* метод, который предоставляет функционал по "навешиванию" обработчиков на события */
Balanser.prototype.on = function(typeEvent, func) {
this._emitter.addListener(typeEvent, func);
};
/*
метод, вызываемый для проверки количества открытых коннектов, и если необходимости в таком количестве нет
"лишние" коннекты исключается из системы распределения
*/
Balanser.prototype._removeLoad = function() {
var self = this;
var temp = this._connectArray[0].maxQueryCount().toFixed();
var currentCount = (this.activQuery/temp < this._minCountConnect) ? this._minCountConnect : temp;
if(currentCount< this._connectArray.length ) {
while( this._connectArray.length != currentCount ) {
var poppedConnect = this._connectArray.pop();
if(poppedConnect.queryCount()==0) {
poppedConnect.close();
} else {
poppedConnect.index = self._closedConnect.length;
poppedConnect.on('drain', function() {
poppedConnect.close();
self._closedConnect.slice(poppedConnect.index, 1);
});
self._closedConnect.push(poppedConnect);
}
}
}
};
/*
Cобственно метод, который предоставляет вход-трубу, через который добавляются все запросы.
Параметр tube, возможно использовать для дифференсации запросов между собой,
пока он никак не используется.
*/
Balanser.prototype.addQuery = function(tube, query, params, cb) {
this.activQuery++;
var self = this;
this._removeLoad();
var wrappCb = function() {
self.activQuery--;
cb.apply(this, arguments);
};
this._taskArray.push({ query: query, params: params, cb: wrappCb });
if(!this._run) {
this._run = true;
this._distribution();
}
};
Как все это проверить? Для тестирования я использую запрос «select pg_sleep(1)», который выполняется 1 секунду и имитирует запрос к базе.
10 000 таких запросов обрабатывались балансировщиком ~101590 ms, при максимальном количестве запросов на коннект равным 100 и границах общего количества каналов=сокетов от 10 до 100.
Использованный скрипт:
var balancer = new Balanser(10,100);
balancer.on('ready', function() {
var y=0;
var time = +new Date();
for(var i=0;i<10000; i++) {
balancer.addQuery('gps', 'select pg_sleep(1)', [], function(err, result) {
if(err) console.log(err);
y++;
if(y==10000) {
console.log(balancer._connectArray.length);
console.log(+new Date()-time);
}
});
}
});
Все исходники доступны на гитхабе.
Балансировщик еще, конечно, сырой, многое нужно допилить/переписать, так что прошу сильно не ругать. Если нужно, могу заняться им плотнее.
Автор: stalehard