Недавно был озадачен проблемой мониторинга нескольких десятков серверов (ну наверно редко кто не сталкивался с такой задачей). Проблему можно описать несколькими правилами:
- Нужно периодически пинговать сервер
- Иногда выполнять какое-либо действие с сервером (например, исполнение команды через ssh), которое засабмитил пользователь
- Действия с серверами могут нескольких типов, у каждого действия свой приоритет
- Таски (из п.1-3) нельзя выполнять одновременно для каждого сервера
- Таски могут завершаться с неудачей, например по причине отсутствия связи с сервером, нужно ждать пока связь восстановится и пытатся выполнить запланированную задачу
Первое решение, которое приходит большинству в голову — запустить для каждого сервера свой поток и там делать свои дела. Это неплохо, но что делать если в процессе мониторинга набор серверов будет меняться? Запускать и завершать потоки в процессе мониторинга как-то неэлегантно. А что делать если серверов тысяча? Иметь тысячу потоков наверно можно, но зачем это делать когда большинство времени поток простаивает и ждет своего времени для очередного пинга?
На данную проблему можно взглянуть с другой стороны и представить ее в виде классической задачи «producer-consumer». У нас есть продюсеры, которые производят таски (пинг, команда ssh) и у нас есть консьюмеры, которые эти таски исполняют. Разумеется продюсеров и консьюмеров у нас не по одну экземпляру. Решить нашу задачу «producer-consumer» в JAVA не просто, а очень просто используя классы PriorityQueue и ExecutorService.
Начнем, как обычно, с юнит-теста:
@Test
public void testOffer() {
PollServerQueue xq = new PollServerQueue();
xq.addTask(new MyTask(1, 11));
xq.addTask(new MyTask(2, 12));
xq.addTask(new MyTask(1, 13));
MyTask t1 = (MyTask)xq.poll();
assertEquals(1, t1.getServerId());
assertEquals(11, t1.getTaskId());
MyTask t2 = (MyTask)xq.poll();
assertEquals(2, t2.getServerId());
assertEquals(12, t2.getTaskId());
MyTask t3 = (MyTask)xq.poll();
assertEquals(null, t3);
xq.FinishTask(1);
MyTask t5 = (MyTask)xq.poll();
assertEquals(1, t5.getServerId());
assertEquals(13, t5.getTaskId());
}
В этом юнит-тесте мы добавили в нашу очередь три задачи типа MyTask (первый аргумент конструктора означает serverId, второй — taskId). Метод poll извлекает задачу из очереди. Если извлечь задачу не удалось (например, задачи кончились или в очереди остались задачи для серверов, для которых уже выполняются задачи) — метод poll возвращает null. Из кода видно, что завершение задачи для serverId=1 ведет к тому, что из очереди можно извлечь следующую задачу для данного сервера.
Ура! Юнит-тест написан, можно писать код. Нам потребуется:
- Структура данных (HashMap) для хранения текущих исполняемых задач для каждого сервера (currentTasks)
- Структура данных (HashMap) для хранения задачи, стоящих в очереди на исполнения. Для каждого сервера — своя очередь (waitingTasks)
- Структура данных (PriorityQueue) для последовательного опроса серверов. Необходимо, чтобы в следующий вызов poll() к нам приходила задача для другого сервера. Короче, структура типа револьвера, только пули после каждого выстрела остаются в барабане (peekOrder)
- Cтруктура (HashSet) для хранения и быстрого поиска идентификаторов серверов в револьвере, чтобы каждый раз не просматривать револьвер с первого до последнего элемента (servers)
- Простой объект для синхронизации (syncObject)
Теперь процедура извлечения таски из очереди будет простой и короткой. И хотя код получился компактным, публиковать его здесь я не вижу смысла, а отошлю вас на https://github.com/get-a-clue/PollServerQueueExample
Disclaimer: код на github'e не является законченным, в частности, в нем отсутствует возможность установки приоритетов для задач внутри очереди для каждого сервера и механизма обработки ошибок и возврата failed таски в очередь. Ну и сам код для пингования. Как говорится, меньше кода — лучше спишь. :)
Автор: getaclue