Существует типичная проблема в большом классе задач, которая возникает при обработке потока сообщений:
— нельзя пропихнуть большого слона через маленькую трубу, или другими словами, обработка сообщений не успевает «проглотить» все сообщения.
При этом существуют некоторые ограничения на поток данных:
- поток не равномерный и состоит из событий разного типа
- количество типов событий заранее не известно, но некоторое конечное число
- каждый тип события имеет свою актуальность во времени
- все типы событий имеют равный приоритет
На диаграмме приведён пример разрешения проблемы: нагребатор(tm), работающий на нитке T1, в то время как разгребатор(tm) работает на нитке T2
- за время обработки события типа A успевают прийти новые события как типа B, так и A
- после обработки события типа B необходимо обработать наиболее актуальное событие типа A
Т.о. стоит задача о выполнении задач по ключу, так, что выполняется только самая актуальная из всех задач по данному ключу.
На суд публике представляется созданный нами ThrottlingExecutor.
Замечание терминологии: stream есть поток данных, тогда как thread есть нитка или нить выполнения. И не стоит путать потоки с нитками.
Замечание 1: проблема осложняется ещё тем, что может быть несколько нагребаторов(tm), при этом каждый нагребатор(tm) может порождать только события одного типа; с другой стороны есть потребность в нескольких (конечно же, для простоты можно выбрать N=1) разгребаторах(tm).
Замечание 2: мало того, что данный код должен работать в многопоточной (конкурентной) среде — т.е то самое множество нагребаторов(tm) — разгребаторов(tm), код должен работать с максимальной производительностью и низкими latency. Резонно к этим всем качествам добавить ещё и свойство garbage less.
И почти в каждом проекте так или иначе возникает эта задача, и каждый её решает по разному, но все они либо не эффективны, либо медленны, либо и то, и другое вместе взятое.
Небольшое лирическое отступление.
На мой взгляд задача очень интересная, вполне практическая и более того — из нашей специфики работы. И именно поэтому, мы задаём нашим кандидатам на собеседовании. Однако мы не просим буквально закодить всё от и до, а построить общий дизайн решения, по возможности освещая ключевые моменты решения кусками кода. После нескольких месяцев собеседований мы таки решили воплотить идеи в виде кода на java.
Впрочем, код, пока не может быть открыт публично, поэтому идеи общие и могут быть применены и воплощены на многих других языках программирования.
Поскольку всё уже закожено, и быть может осталось навести небольшой марафет, опишу ключевые моменты решения.
Итак, оглядевшись по сторонам мы не нашлись как сделать то, что хотим высокоуровневыми структурами данных доступных в jdk, поэтому будем конструировать из самых базовых блоков.
Картина дизайна в целом:
- т.к. нет необходимости хранить все полученные значения, а нужны только самые актуальные — то хорошо бы хранить пары ключ-значение в некотором ассоциативном массиве, перетирая старые значения новыми
- бежать по ассоциативному массиву, забирать (помечая ячейку спец. значением, например, null) самые свежие значения и отдавать в обработчик.
Ассоциативный массив
Ключевой аспект: хранение пары ключ-значение. Можно пренебречь порядком хранения, выигрывая при этом по скорости обновления — т.о. напрашивается использование hash структуры, сложность операций которой O(1).
Негативный эффект на производительность hash структуры оказывает коллизия по hash кодам на выбранном размере. Два самых распостранённых метода разрешения коллизий:
- цепочки — каждый элемент массива представляет из себя связанный список, который хранит элементы с одинаковыми (с точностью до модуля размера массива) hash кодами
- открытая адресация — при возникновении коллизии происходит поиск первой свободной ячейки за (или перед) ячейкой соответствующей hash коду. Как правило в пользу производительности ограничивают количество проб свободной ячейки. Когда превышено число проб нахождения свободной ячейки для вставки нового элемента, производят расширение массива.
Метод цепочек более стабилен, т.к. не спотыкается о проблему плохого распределения hash кодов, в то время как открытая адресация явно не переживёт, если все ключи будут иметь hash код равный, например, некоторой константе. С другой стороны открытая адресация имеет существенно меньшие накладные расходы на память.
Ещё один плюс в пользу открытой адресации — cache locality — данные в массиве лежат последовательно в памяти и так же последовательно будут загружены в cpu cache, т.о. быстрая последовательная итерация в отличии от использования цепочек, где указатели на связанные списки как-то разбросаны по памяти.
Исходя из общих принципов адекватности применения, можно смело рассчитывать, что функция hash кодов не будет вырожденной и выбор ложится на открытую адресацию.
Теперь рассмотрим элемент массива:
atomic refs
Поскольку есть требование работы в многопоточной и высоконагруженной среде, то ни к чему возится с synchronized-блоками, а работать через Compare-And-Swap, поэтому каждый элемент представляет из себя расширение AtomicReference с ключём:
static class Entry<K> extends AtomicReference {
final K key;
public Entry(final K key){
this.key = key;
}
}
atomic array ref
Другим заходом, чтобы обеспечить консистентность и атомарность памяти — использовать не массив AtomicReference-ов, а AtomicReferenceArray.
Мотивы — меньше дополнительных накладных расходов на занимаемую память, последовательность расположения в памяти. При этом существенно сложнее становится схема по расширению/сжатию массива.
Card mark
Основная идея: итерироваться не по всем элементам, а только по измённым элементам (в идеале), или сегментам (состоящим из небольшого разумного числа элементов), которые содержат изменённые элементы.
Simple card mark
Простой в реализации подход, использующий AtomicLong в качестве маски, позволяет покрывать до 64 изменённых сегментов.
Однако, при размере массива уже в 4096 элементов, сегмент состоит из 64 элементов, т.о. потенциально, можно совершить немало пустых чтений.
Binary heap card mark
Следующим шагом хотелось расширить число сегментов, сохранив при этом компактность хранения в памяти и простоту обхода. При этом выбранная структура данных должна быть удобна и с точки зрения wait strategy.
С этих точек зрения очень удобна двоичная куча — нулевой элемент указывает на то были ли изменения вообще или нет (если нет, то можно и погрызть камень / заснуть = применить стратегию ожидания ) и уже последующие элементы указывают на изменённые сегменты исходного массива.
Так, при наличии второго уровня и размере массива в 4096 элементов, сегмент содержит ровно один элемент.
Т.о. simple card mark должен быть хорош при малых размерах массива, а binary heap card mark при больших размерах.
Wait Strategy
Еще один аспект и прямая отсылка к D., которую нельзя не упомянуть в связке с card mark — это применение стратегии ожидания изменённых сегментов: не стоит сразу впадать в состояние пассивного ожидания (т.е вызывать wait на мониторе) если нет изменений, вполне возможно, что удастся получить изменение активно poll'я card mark.
Например, busy spin опрашивает в цикле корневой элемент card mark-а — если есть изменения, выходим — обрабатываем элементы. Если нет — продолжаем цикл. Ограничив цикл, например, сотней попыток, впадаем в состояние пассивного ожидания по wait, и пусть нас разбудит нагребатор(tm), увидев, что card mark-а был кристально чист и пуст.
Впрочем, стратегия может меняться в каждом конкретном случае.
уловки и ухищрения
- поддерживая размеры массива и binary heap как 2K можно избежать использования операций деления, умножения и mod и использовать более дешёвые битовые аналоги: сдвиг вправо K, сдвиг влево K и применение битовых маск (& ((1 << K) — 1)) соответственно
- основная, и самая частая операция в стабильной фазе — операция замещения/replace происходит только за счёт CAS для найденного элемента массива без захвата какого-либо монитора
- вычитывая значение из ячейки или card mark вместо безусловного get-and-set(0) иногда (при высокой разряжённости массива) разумней сделать test-n-get-n-set, дабы избежать излишнего (и более дорогого) volatile write
- расширение/сужение массива (в контексте корректности JMM) относится к начальной фазе
- volatile ссылка на массив — быстрый volatile read
- обновление volatile ссылки при помощи AtomicReferenceFieldUpdater магии
- учитывая, что количество ключей ограничено, можно масштабирование массива производить под монитором
В финале графики распределения latency:
- 1 нагребатор(tm) — 1 разгребатор(tm)
- 700 уникальных ключей, hash коллизии < 1%
- разогрев jvm: 20000 первых итераций игнорируются при измерении latency
- задача, переданная в executor: зафиксировать время (в мкс) прохождения через executor — т.е зафиксировать latency
Гистограмма распределения latency в ThrottlingExecutor,
простой card mark, размер 4096
Гистограмма распределения latency в ThrottlingExecutor,
card mark на двоичной куче, размер 4096
В сильно разреженном массиве:
Гистограмма распределения latency в ThrottlingExecutor,
простой card mark, размер 16384
Гистограмма распределения latency в ThrottlingExecutor,
card mark на двоичной куче, размер 16384
До конца найти всех причин столь разного поведения массива атомарных ссылок и атомарного массива мне не удалось (ожидали скорее увидеть, что атомарный массив с двоичной кучей будет работать куда лучше).
Возможно, и очень надеюсь, когда получится открыть код, мы найдём ошибки или получим ответы.
P.S. Стоит отметить, что испытывая в этом benchmark-е существовавшие ранее решения ThrottlingExecutor-ов, они давали почти ровное распределение до 300 мкс.
Автор: vladimir_dolzhenko