В данной статье вы найдете ответ на поставленный вопрос, а также в ней будут объяснены базовые понятия по технологии распределенных вычислений в Oracle Coherence. Это вводная статья, главной задачей которой является объяснение “словаря” терминов, которым пользуются разработчики Coherence. Я приведу термины в том числе и на английском языке, для облегчения поиска информации для тех, кто захочет узнать больше в этом направлении.
Для тех, кому эта тема интересна, прошу под кат
Итак, предположим, что у нас есть задача посчитать быстро какую нибудь задачу по большому объему данных. Что вообще значит “большой объем” данных? Это такой объем, загружать который на клиента не имеет смысла ввиду того, что клиент не сможет уместить на своей стороне все необходимые данные. Дилемма в том, как получить результат, без загрузки всего объема данных на клиента. Возможным путем решения будет сделать подмножества большого множества данных и собирать на клиенте промежуточные результаты в цикле. Такое решение хорошо всем, кроме того, что последовательное выполнение будет гораздо дольше, чем выполнение по всему множеству за один раз (время будет тратиться на запрос/ответ, подготовку подмножества данных и пересылку подмножеств данных на клиента для подсчета). Также, за время выполнения этого последовательного действия данные могут устареть. То есть интуитивно мы понимаем, что данные должны обрабатываться там, где лежат (без пересылки по сети), и притом одновременно по всему множеству.
Вот тут на подмогу приходят такие решения, как Oracle Coherence, Hadoop, Gemfire и т.д.
Давайте пройдёмся по азам Oracle Coherence.
Читаем документацию и видим следующее: “Oracle Coherence is an in memory data grid solution that enables …”.
“in memory” — это значит, что данные держатся в памяти компьютера (можно и на диске, но об этом потом).
“data grid solution” — это значит, что данные распределены по кластеру и не сконцентрированы в одном месте.
Но обо всем по порядку. Давайте сначала поймем, какие “строительные блоки” имеются для реализации поставленных задач.
Нод
Нод это просто java процесс (запустивший класс com.tangosol.net.DefaultCacheServer) с coherence.jar в classpath и конфигурационными файлами. Можно запустить несколько нодов на одной/разных машинах, под одним или разными пользователями без ограничений. То есть важно понимать, что это просто java процесс и его можно/нужно дебажить так же, как и любое серверное приложение, которое вы пишете.
Кластер
Кластер это набор нескольких нодов. Ноды в конфигурации по умолчанию будут находить друг друга автоматически по multicast. При необходимости можно сконфигурировать WKA (well known addresses), если системные администраторы будут недовольны, что вы “забили всю сеть своим мультикастом”. В кластере всегда есть мастер нод (senior member), который смотрит за тем, что происходит с кластером (сколько нодов есть, какие из них хранят данные, куда копировать данные, если один из нодов “упал” и т.д.). Мастер нод — это первый нод, который стартовал. Если мастер нод “упал” по какой-то причине, то следующий мастер нод назначается автоматически. Следует заметить, что во время обработки данных мастер нод не используется. Вычисления выполняются на нодах, где лежат требуемые данные. Как правило, ноды разделяют по назначению: прокси, вычислительные и ноды для хранения данных. Если вообще все ноды “упали”, то данных у вас нет. То есть надо заранее продумать, как данные/изменения будут сохраняться и как загружаться после загрузки системы.
В процессе разработки рекомендуется конфигурировать среду разработки, похожую на продакшн. Это позволит находить многие ошибки сериализации и коммуникации между нодами до того, как вы выпустили версию в продакшн.
Конфигурация нодов
По умолчанию, конфигурационные файлы не нужны, в этом случае будут использоваться файлы из coherence.jar. Конфигурационные файлы, предоставляемые по умолчанию, не подходят для продакшн системы, их нужно менять под конкретную задачу. Некоторые даже рекомендуют удалять эти файлы из coherence.jar файла.
Существует 3 основных конфигурационных файла, с которыми вам придётся работать:
tangosol-coherence.xml — этот файл отвечает за конфигурацию кластера в целом. Например, имя кластера конфигурируется в этом файле.
coherence-cache-config.xml — этот файл отвечает за конфигурацию различных кэшей, которые кластер будет обслуживать.
coherence-pof-config.xml — этот файл предназначен для того, чтобы определить, какие данные будут обрабатываться кластером. Также, в этом файле определяется, каким образом данные будут сериализироваться для передачи и хранения в кластере.
Существуют так называемые overrirde файлы (например, tangosol-coherence-override.xml). Установки в этом файле переписывают установки базовых файлов. То есть, если у вас есть tangosol-coherence.xml и tangosol-coherence-override.xml в classpath, то все установки загрузятся из первого файла и перепишутся поверх установками из второго.
Вы можете иметь несколько одинаковых файлов в classpath, но только первый найденный файл будет использоваться. Также можно установить необходимые конфигурационные файлы с помощью системных (-D) установок.
Когда кластер стартует, он пишет, какие файлы использовались для конфигурации системы. В логах появится нечто похожее на следующее:
Loaded operational configuration from resource…
Loaded operational overrides from resource…
Loaded operational overrides from resource…
Прокси (extend) ноды
Прокси (extend) ноды это ноды, которые используются для того, чтобы обеспечить доступ к кластеру со стороны клиентов. Конфигурация должна быть сделана как на стороне сервера, так и на стороне клиента. То есть, если у вас есть приложение написанное на .NET платформе, то вам придётся поставить .NET клиент (coherence-net-<версия>.zip) и предоставить coherence-cache-config.xml, в котором будут описаны детали кластера, к которому нужно подсоединяться. На стороне сервера необходимо будет предоставить coherence-cache-config.xml файл, в котором будет описана <proxy-scheme>, где будет указан адрес и порт, на которых необходимо слушать входящие вызовы. И на клиенте, и на сервере необходимо предоставить coherence-pof-config.xml, в котором описаны форматы данных для сообщения между клиентом и сервером.
Прокси ноды не должны использоваться для вычислений. Если в процессе отладки приложения ваш отладчик останавливается на прокси ноде, это значит что конфигурация кластера, как правило, выполнена неправильно.
Ноды для хранения данных (storage nodes)
Это ноды, у которых выставлена переменная окружения -Dtangosol.coherence.distributed.localstorage=true. По-умолчанию нод хранит данные в java heap, но можно так же «скидывать» их на диск и подгружать по надобности. На этих нодах можно производить вычисления, но надо понимать, что нужно производить как можно меньше мусора в процессе вычислений для того, чтобы нод не «упал» из-за недостатка памяти (OutOfMemory). В случае, если нод «упадёт» по какой либо причине, данные с него будут скопированы на другие ноды, таким образом общая ёмкость кластера уменьшится. Это может породить эффект каскада, если свободного места в кластере окажется не достаточно, и тогда может «упасть» весь кластер, нод за нодом. Как правило, важные данные имеют вторую копию (что прописывается в настройках конфигурации), таким образом потеря отдельного нода не критична.
Данные, которые является промежуточным результатом и легко вычисляемые по основным данным, не нуждаются во второй копии. Хранение данных может быть сконфигурировано таким образом, чтобы иметь копии на другом ноде, на другой физической машине или вообще на другой стойке в другом городе. Это всё конфигурационные параметры и программировать здесь ничего не надо. Параметры хранения данных достаточно гибкие и позволяют сконфигурировать систему под конкретную задачу.
Вычислительные ноды (application tier/storage disabled nodes)
Это ноды, у которых выставлена переменная окружения -Dtangosol.coherence.distributed.localstorage=false. Данные ноды используется для того, чтобы равномерно распределить вычисления на кластер. На этих нодах можно также кэшировать промежуточные вычисления. Вся бизнес логика, которую вы хотите реализовать для данного приложения, должна выполняться в кластере на этих нодах.
Давайте рассмотрим, каким образом реализуется процесс пробрасывание вызова через иерархию нодов. У вас есть ноды для хранения данных, вычислительные ноды и прокси ноды. На прокси нодах данные не хранятся и кэши не конфигурируются. На вычислительных нодах вы конфигурируете кэши, но без возможности сохранения данных в кэшах. На нодах для хранения данных у вас находятся данные. Со стороны клиента вы не должны использовать кэши, на которых хранятся данные. То есть вы не выполняете вычисления на самих данных напрямую с клиента, а всегда используете вычислительные ноды для выполнения операций над данными. Таким образом, вы изолируете данные от клиентских приложений, что дает вам возможность в будущем менять архитектуру хранения данных без изменения клиента. Все ноды в кластере «знают», где и какой кэш находится. Получается, что если вы посылаете задачу на выполнение на кэш, сконфигурированный для вычислений, он будет выполнен в группе вычислительных нодов, используя данные с нодов, на которых хранятся данные. Возможно, это звучит не совсем понятно, но это отдельная тема для статьи.
Локализация данных (data affinity)
В некоторых случаях полезно иметь данные, сгруппированные вместе по какому-либо принципу. Например, вы можете сгруппировать данные таким образом, что ноды, находящиеся на одной физической машине, будут иметь зависимые данные. В этом случае у вас не будет задержки сети и вычисления будут происходить быстрее.
Механизмы отправки задачи на выполнение следующие: EntryAggregator, EntryProcessor, InvocationService, MapListener, EventInterceptor
Агрегатор(EntryAggregator) — это задача, которая будет выполнена над копиями данных. То есть поменять данные в кэше из агрегатора у вас не получится. Работа происходит с read-only данными. Типичными задачами является сумма, минимум, максимум.
Процессор(EntryProcessor) — эта задача, которая предполагает изменения данных внутри кэша. То есть, если вы хотите поменять данные внутри кэша, там, где физически находятся данные, вам нужно использовать для этого процессор. Приятной особенностью процессора является блокировка на данные в процессе обработки. То есть, если у вас есть несколько операций, которые должны быть вызваны последовательно, то для этого нужно использовать процессор, так как только один процессор будет работать над этим фрагментом данных в конкретный момент времени.
InvocationService — это задача уровня нода. В данном случае, вы работаете грубо говоря с Java процессами, а не с данными. Задачи данного типа должны реализовывать Invocable, что в свою очередь является Runnable.
MapListener — эта задача будет выполнена асинхронно, как реакция на события на уровне кэша.
EventInterceptor — эта задача похожа на предыдущую в том смысле, что она будет выполнена как реакция на событие, но разница состоит в том, что listener будет выполнен на всех нодах, на которых сконфигурирован кэш, а interceptor — только на нодах, которые имеют данные, для которых выполнено событие. У interceptor'а также есть возможность быть вызванным до или после события.
Детальное объяснение, как работают различные типы задач, выходит за рамки этой статьи.
POF (portable object format) сериализация
Все данные в кластере хранятся в байтовом массиве. Поля сериализованного объекта хранятся последовательно (у каждого поля свой индекс) и именно так, как вы напишите в методах readExternal/writeExternal класса, который реализует интерфейс PortableObject или serialize/deserialize класса, реализующего интерфейс PofSerializer. Внутри байтового массива поля сохраняются последовательно. Сканирование массива также происходит последовательно. Из этого следует не очевидный вывод: наиболее используемые поля должны находиться ближе к началу байтового массива. Данные объекта, записываемые в массив, могут быть вложенные и иметь свою сериализацию. То есть, при реализации интерфейсов PortableObject и PofSerializer, вы переводите иерархическую структуру java объекта в плоскую структуру байтового массива.
Coherence предоставляет сериализацию для стандартных объектов из jdk (java.lang). Все объекты, которые должны быть сохранены в кластере, должны быть описаны в файле coherence-pof-config.xml. Каждый тип данных имеет свой номер. Номера ваших типов данных должны начинаться c 1000. Таким образом, у вас получается структура, хорошо переносимая с одной платформы на другую, и с одного языка программирования на другой. Каждый класс, который будет сериализован в кластере, должен иметь корректно реализованые hashCode и equals методы.
Извлечение данных из кластера (ValueExtractor)
Из предыдущего пункта мы знаем, что данные хранятся в байтовом массиве. Для того, чтобы извлечь данные, нужно написать класс, который реализует интерфейс ValueExtractor. Coherence будет использовать этот класс для того, чтобы достать необходимую часть сериализованного объекта и представить его виде класса, с которым вы можете работать. То есть у вас есть возможность «вытащить» из данных не весь объект целиком, а только то, что вам нужно в данный момент для вычислений. Таким образом, у вас уменьшается количество данных, пересылаемых по сети.
Партишн (partition)
Coherence предоставляет возможность хранить данные в виде «ключ-значение», но ключ и значение — это логические понятия системы. На физическом уровне данные группируются в партишн. То есть, несколько ключей и значений могут принадлежать одной партишн. Партишн является единицей хранения данных. Когда ноды падают и данные перегруппируются между нодами, партишн копируется целиком. Класс, который назначает партишн для конкретного объекта, реализует интерфейс KeyPartitioningStrategy. По умолчанию, партишн назначается согласно хэш-кода Binary объекта ключа (com.tangosol.util.Binary объект «оборачивает» байт массив). Вы сами можете повлиять на то, как назначается партишн, предоставив свою реализацию интерфейса KeyPartitioningStrategy.
Индекс
Как и в базе данных, индекс в Coherence используется для ускорения поиска данных. Для того, чтобы создать индекс, используется метод QueryMap.addIndex(ValueExtractor extractor, boolean ordered, java.util.Comparator comparator).
ValueExtractor используется для того, чтобы выбрать из массива байт необходимые данные для индекса. Когда вы вызываете addIndex метод, это совсем не означает, что кластер начнёт индексировать данные прямо сейчас. Этот вызов является рекомендацией по созданию индекса, когда ресурсы будут позволять это сделать. После его создания, изменения данных будут отображаться корректно в индексе. Данный метод можно вызывать несколько раз, и если индекс уже существует, то он не будет заново создан. Индекс — это структура уровня нода. То есть, когда данные копируются с одного нода на другой, индекс не будет скопирован, вместо этого он будет изменён в соответствии с данными, которые находятся на этом ноде. Данные в индексе хранятся в десериализованном виде, поэтому если у вас есть необходимость достать данные быстро и без десериализации, создайте индекс. Естественно, за удобство и скорость надо «платить», и вы платите свободным местом в кластере и вычислительными ресурсами. Внутри индекс состоит из двух под-индексов (прямой и обратный). Прямой индекс хранит данные В виде ключ->значение (которое предоставил экстрактор), обратный индекс хранит данные в виде значение-> множество ключей.
Кэши: replicated, distributed, near
Replicated — это кэш, в котором все данные хранятся в десериализованном виде на каждом из нодов. Данный тип кэша, который предоставляет самые быстрые операции чтения, но медленные операции записи. Дело в том, что в случае записи, данные должны быть скопированы на все ноды, где сконфигурирован данный кэш. Данный тип кэша, как правило, применяется для редко изменяемых данных малого объема.
Distributed — это основной тип кэша, который используется для хранения данных. Он позволяет преодолеть ограничения по размеру оперативной памяти, выделенной на отдельный нод, как бы «размазывая» данные по всему кластеру. Этот тип кэша также предоставляет горизонтальную масштабируемость за счёт включения новых нодов в кластер, а также отказоустойчивость за счёт хранения копии данных на других нодах.
Near — это гибридный тип кэша, который конфигурируется на вызывающей стороне (вызывающая сторона может быть как клиент так и другой нод внутри кэша). Как правило, этот кэш «стоит» перед distributed кэшем, и хранит наиболее часто используемые данные. Данные хранятся в десериализованном виде. В случае с near кэшем, существует вероятность того, что данные устареют, поэтому вам нужно сконфигурировать, каким образом данные будут обновляться.
Сервис
Это группа java потоков которые отвечают за коммуникацию с другими нодами кластера, выполнение задач, присланных на выполнение для хранимых данных, копирование данных в случае отказа других нодов, а также другие задачи обслуживания данных. Звучит достаточно абстрактно, но это именно то, что позволяет вам с легкостью работать с данными. Сервис может обслуживать несколько кэшей. Единственное, что важно знать и помнить в процессе разработки, это то, что сервис не поддерживает reentry вызовов. Например, вы послали EntryProcessor на выполнение и из него делаете вызов на кэш, обслуживаемый данным сервисом. Вы сразу же получите InterruptedException.
Теперь, когда у нас есть базовые кирпичики понятий, можно ответить на вопрос, зачем же вообще нужен Coherence.
Ответ прост: у вас есть устойчивое к сбоям, горизонтально масштабируемое хранилище данных, которое предоставляет быстрый доступ к данным для параллельных вычислений. За счёт наличия нескольких нодов, у вас нет ограничения по размеру данных, которые вы можете хранить в кластере (конечно, вы ограничены размером доступной памяти на физических машинах, выделенных для данной задачи). У вас нет ограничения на размер отдельной пары ключ-значение. Вы также можете извлекать из хранимых данных только то, что вам нужно для вычислений, таким образом по сети будет копироваться минимум информации. Вообще, вся идеология Coherence построена на том, чтобы пересылать по сети только то, что необходимо. Также, вы можете настраивать сервисы и уровни вычислений достаточно гибко для вашей задачи. В результате, сложные задачи будут решаться быстро.
С точки зрения менеджмента, вы покупаете решение, которое позволит удовлетворить множеству требований. Однажды загрузив данные в систему, вы сможете извлекать их различным образом и использовать в других системах, которые используют Coherence как хранилище данных. Таким образом, поставив в основание Coherence, вы можете построить экосистему по извлечению и обработке данных.
Если у вас возник интерес по данной теме, я могу продолжить серию статей по Coherence. Пишите, что именно вам интересно, и я постараюсь ответить.
Пока что в плане:
- конфигурирование базовой структуры кластера
- работа с EntryAggregator
- работа с EntryProcessor
- асинхронные вызовы в Coherence
- смотрим внутрь Binary объекта
- работа с индексами
Вообще, следует учитывать, что с Coherence очень просто начинать, но очень тяжело сделать хорошо и быстро, поэтому цель цикла статей заполнить пробел между начальным уровнем знакомства с системой и продвинутым уровнем разработчика.
Недавно мой коллега по работе написал книгу для продвинутых разработчиков, которую рекомендую к прочтению. В этой книге вы не найдёте базовых знаний, но найдёте примеры решения (с объяснениями) достаточно сложных задач.
Author: David Whitmarsh
Автор: sergmesh