Привет!
В этой статье мы попробуем взглянуть на архитектуру учетных систем (ERP, CRM, WMS, MES, B2B, ...) с позиций функционального программирования. Существующие системы сложны. Они базируются на реляционной схеме данных, и имеют огромный мутабельный стейт в виде сотен связаных таблиц. При этом единственным «источником правды» в таких системах является хронологически-упорядоченный журнал первичных документов (отпечатков событий реального мира), которые, очевидно, должны быть иммутабельными (и это правило соблюдается в аудируемых системах, где корректировки «задним числом» запрещены). Журнал документов составляет от силы 20% объема БД, а все остальное — промежуточные абстракции и агрегаты, с которыми удобно работать на языке SQL, но которые требуют постоянной синхронизации с документами, и между собой.
Если вернуться к истокам (устранить избыточность данных и отказаться от хранения агрегатов), а все бизнес-алгоритмы реализовать в виде функций, применяемых непосредственно к потоку первичных документов — мы получим функциональную СУБД, и построенную на ней функциональную ERP. Проблема производительности решается благодаря мемоизации, а объем функционального кода будет вполне соизмерим с объемом декларативного SQL, и не сложнее для понимания. В данной статье мы продемонстрируем подход, разработав простейшую файловую СУБД на языке TypeScript и рантайме Deno (аналог Node.js), а также протестируем производительность сверток на примере типичных бизнес-задач.
Почему это актуально
1) Мутабельный стейт + избыточность данных — это плохо, особенно когда необходимо обеспечивать его постоянную синхронизацию с потоком документов. Это источник потенциальных расхождений учетных данных (баланс не сходится) и трудно обнаруживаемых побочных эффектов.
2) Жесткая реляционная схема хранения исходных и промежуточных данных дорого обходится в Big Data, гетерогенных системах, и в условиях быстрых изменений — то есть по сути везде. Мы предлагаем хранить документы в исходном виде, упорядочив по времени, разрешив связи «от новых к старым» и никогда наоборот. Это позволит рассчитывать большинство агрегатов однопроходными алгоритмами прямо из документов, а все остальные таблицы — не нужны.
3) SQL устарел, так как предполагает доступность любых данных в любой момент, а в распределенных системах это очевидно не так — при разработке алгоритмов Big Data нужно быть готовым к тому, что часть необходимых данных появится позже, а часть уже появлялась раньше. Это требует небольшого переосмысления языка запросов, и сознательной заботы о кэшировании.
4) Современные ЯП позволяют создать отзывчивую систему, оперирующую миллионами записей локально на ноутбуке, где РСУБД просто не установится. А если говорить о серверах — предлагаемая схема имеет больше возможностей для параллелизма, в том числе на кластерах типа SPARK.
Предыстория вопроса
Проработав достаточно долго с различным бизнес-ПО (системы учета, планирования, WMS), практически везде сталкивался с двумя проблемами — сложность внесения изменений в схему данных, и частое падение производительности, когда эти изменения таки вносились. Вообще, эти системы имеют сложную структуру, поскольку к ним предъявляются противоречивые требования:
1) Аудируемость. Нужно хранить все первичные документы в неизменном виде. Разделение на справочники и операции весьма условно, во взрослых системах справочники огранизованы с версионированием, где каждое изменение оформляется специальным документом. Таким образом, исходные документы — это иммутабельная часть системы, и она является единственным «источником правды», а все остальные данные могут быть восстановлены из нее.
2) Производительность запросов. Например, при создании строки заказа на продажу система должна рассчитать цену товара с учетом скидок, для чего необходимо извлечь статус клиента, его текущий баланс, историю покупок, текущие акции в регионе, и т.д. Естественно, вся необходимая информация не может быть вычислена «на лету», а должна быть доступной в полу-готовом виде. Поэтому существующие системы хранят удобные абстракции над строками документов (проводки), а также заранее рассчитанные агрегаты (регистры накопления, временные срезы, текущие остатки, сводные проводки, и т.д.). Их объем составляет до 80% размера БД, структура таблиц жестко фиксирована, при любых изменениях в алгоритмах — программист должен позаботиться о правильном обновлении агрегатов. По сути агрегаты это и есть мутабельное состояние системы.
3) Транзакционная производительность. При проведении любого документа нужно пересчитать все агретаты, а это обычно блокирующая операция. Поэтому алгоритмы обновления агрегатов — самая болезненная точка системы, и при внесении большого количества изменений имеется существенный риск что-то сломать, и тогда данные «разъедутся», то есть агрегаты перестанут соответствовать документам. Эта ситуация — бич всех проектов внедрения и последующей поддержки.
Устанавливаем основы новой архитектуры
1) Хранение. Основа БД — хронологически упорядоченный журнал документов, отражающих свершившиеся факты реального мира. Справочники это тоже документы, просто длительного действия. И документ, и каждая версия записи справочника — иммутабельны. Никаких других данных в виде проводок / регистров / остатков в системе не хранится (сильное провокационное утверждение, в жизни бывает по разному, но нужно стремиться к совершенству). Документ имеет ряд системных атрибутов:
{
"sys": {
"code": "partner.1", // человеко-читаемый код, идентификатор группы
"ts": 1578263624612, // временная метка записи
"id": "partner.1^1578263624612", // составной уникальный глобальный ID
"cache": 1 // признак необходимости принудительного кэширования
},
...
}
Документы с одинаковым code и разными ts образуют историческую группу, где актуальной считается последняя запись, остальные — историческими. Если установлен атрибут cache, последняя запись из группы попадают в так называемый топ-кэш, и одновременно все записи попадают в фулл-кэш, таким образом мы можем быстро извлечь запись справочника как по id, так и по code.
Документы могут дописываться в конец журнала, и никогда в середину. Изменение или удаление (отмена) старого документа — это всегда новый документ, записываемый в журнал с текущим ts. Таким образом, причинно-следственная связь определяется положением документа в журнале, скачки в прошлое или будущее запрещены (при этом даты принятия к учету, даты исполнения планов могут быть любыми, но с точки зрения ядра системы это просто атрибуты).
2) Связи. Документы могут ссылаться друг на друга по id. В отличие от «sql foreign key» — указывать тип сущности, на которую ссылаемся, нет необходимости, так как сущности лежат вперемешку, а id уникален. Связи от ранних документов к более поздним запрещены. Это означает, что в любом пользовательском алгоритме при обработке текущего документа могут быть востребованы связанные документы, уже встречавшиеся в выборке ранее (и по идее они должны быть кэшированы — либо ядром, либо пользовательским алгоритмом).
3) Горизонт иммутабельности. Часть документов, с которыми ведется активная работа (т.н. открытые документы) не может быть зафиксирована, поэтому вводится понятие горизонта иммутабельности, а база данных разделяется на 2 физических хранилища — иммутабельное хранилище и текущее хранилище. Все документы в первом хранилище имеют временную метку меньше горизонта, они неизменны, а результаты всех сверток кэшируются и переиспользуются. Все что позднее — называется текущим периодом, и при каждом запросе второе хранилище сканируется заново. Такая схема дает линейное время. Горизонт иммутабельности — термин, хорошо знакомый коллегам из 1С, и бухгалтерам. Производительность системы зависит исключительно от размера бардака текущего периода, и в этом вопросе мировая бизнес-практика беспощадна — чем он меньше, тем лучше.
4) Алгоритмы. Журнал документов может храниться в любом виде — последовательный файл, документная БД, таблица РСУБД, поступать из внешнего стрима — главное чтобы они были извлекаемы в прямом либо обратном хронологическом порядке. Любой бизнес-алгоритм — это композиция функций filter(), reduce(), get(), gettop(), примененная к потоку документов. Ввиду отсутствия семантики JOIN, у пользователя остается возможность использовать вложенные подзапросы к БД, либо пытаться ограничиться одним проходом, помещая в пользовательский кэш все, что может потребоваться в будущем. Естественно, помогает системный кэш, хранящий как отдельные документы, к которым уже были запросы по id / code, так и результаты расчетов, выполненных ранее (при полном совпадении входных параметров этих расчетов).
5) Мемоизация, или кэширование. Результаты запросов и расчетов попадают в кэш в случаях:
— документ имеет атрибут cache, при первом reduce() он записывается в фулл-кэш, и обновляет запись в топ-кэше;
— документ извлечен запросом по id / code, и он находится в иммутабельном хранилище;
— reduce() завершил расчет по иммутабельному хранилищу, промежуточный результат клонируется и записывается в кэш, а расчет продолжается по текущему хранилищу.
Мы видим, что в отличие от жестко-структурированного «кэша» в системах, основанных на РСУБД, мы имеем адаптивный кэш, наполняемый по мере работы системы. Необходимость экономить память заставляет ограничивать объем кэшируемой информации, поэтому, например, результат функции filter() не кэшируется, а результат reduce() — обязательно. Пользователю даются ограниченные инструменты управления кэшем.
6) Поиск бывает 3-х видов. Первый — когда при обработке текущего документа нам нужно найти несколько связанных документов по неточным критериям. В этом случае либо подзапрос, либо в своем reduce() заранее сохраням все что может потенциально потребоваться, и когда потребовалось — ищем в этой выборке. Второй вид поиска — когда нам нужно получить актуальные элементы справочника, без исторических данных (т.н. текущий срез). В этом случае используется топ-кэш, который как раз хранит такие элементы. В третьем случае это fullscan по базе в обратной хронологической последовательности. Насколько целесообразно кэшировать результаты пользовательских поисков — вопрос дискуссионный, в какой-то мере очевидно необходимо, например с ограничением объема выборки.
7) Добавление документов и перемещение горизонта. При добавлении нового документа актуализируется только топ-кэш. При перемещении документа из текущего хранилища в иммутабельное — по идее нужно до-обновить все кэшированные агрегаты, эта операция тяжелая (зависит от количества переносимых документов и количества результатов в кэше), и должна выполняться по аналогии с закрытием учетного периода — в часы наименьшей нагрузки.
Простая функциональная СУБД
Итак, попробуем что-нибудь написать. Язык TypeScript выбран за идеальное сочетание скриптового динамизма и типизации, рантайм Deno — за удобную поддержку TypeScript и WASM, а также наличия Rust API, что теоретически дает нам шанс ускорить некоторые алгоритмы (хотя это неточно).
Документы в нашей СУБД будут храниться в виде 2-х последовательных файлов, содержащих объекты JSON, разделенные символом "x01", так как это позволяет написать быстрый потоковый парсер. API чтения состоит пока всего из 3-х функций:
type Document = any
type Result = any
public async get(id: string): Promise<Document | undefined>
public async gettop(code: string): Promise<Document | undefined>
public async reduce(
filter: (result: Result, doc: Document) => Promise<boolean>,
reducer: (result: Result, doc: Document) => Promise<void>,
result: Result
): Promise<Result>
Первая функция возвращает документ по id, вторая возвращает последний документ с заданным code, третья осуществляет фильтрацию и свертку, принимая на вход функцию фильтрации, функцию свертки и начальное значение аккумулятора. Мы сознательно не использовуем цепочку filter().reduce(), так как хотим кэшировать итоговый результат, а в случае цепочки — кэшировать отдельно результат фильтрации расточительно, а кэшировать результат свертки без знания условий фильтрации — бессмысленно. Поэтому reduce() получает на вход сразу все необходимое для расчета, и использует составной хэш от трех параметров в качестве ключа мемоизации.
Собственно, весь пользовательский алгортм представляет собой реализацию колбэков filter и reducer, а аккумулятор-результат может быть любым сериализуемым объектом. Обратите внимание, что оба колбэка возвращают промис, то есть внутри них разрешены вложенные запросы get() и reduce(). Благодаря промисам вложенный цикл (например по строкам текущего документ) можно параллелить (см. второй тест).
Исходные данные
Рассмотрим простейшую систему учета покупок и продаж. Нам нужны справочники контрагентов и номенклатур, а также документы покупки и продажи. Если мы хотим считать себестоимость расходов и маржу, нужен еще один документ — сопоставление приходов с расходами, но это уже тема отдельной статьи.
Партнеры и номенклатуры
{
"sys": {
"code": "partner.1",
"ts": 1578263624612,
"id": "partner.1^1578263624612",
"cache": 1
},
"type": "partner.retail",
"name": "Рога и копыта ООО"
}
{
"sys": {
"code": "invent.1",
"ts": 1578263624612,
"id": "invent.1^1578263624612",
"cache": 1
},
"type": "invent.material",
"name": "Гвоздь строительный 20мм"
}
Атрибут type — пользовательский, его иерархия никак не используется ядром, а лишь в пользовательских алгоритмах. Также не имеет значение семантика атрибута code — для ядра это просто строка.
Покупки и продажи
{
"sys": {
"code": "purch.1",
"ts": 1578263624613,
"id": "purch.1^1578263624613"
},
"type": "purch",
"date": "2020-01-07",
"partner": "partner.3^1578263624612",
"lines": [
{
"invent": "invent.0^1578263624612",
"qty": 2,
"price": 232.62838134273366
},
{
"invent": "invent.1^1578263624917",
"qty": 24,
"price": 174.0459600393788
}
]
}
Документы отличаются только типом (purch | sale), cтроки хранятся прямо в документе (в реляционной схеме они лежали бы в отдельной таблице).
Реализация алгоритмов
Анализ продаж
Считаем общую сумму продаж, средний чек, и среднее количество строк на документ.
import { FuncDB } from "./FuncDB.ts"
const db = FuncDB.open('./sample_database/')
let res = await db.reduce(
async (_, doc) => doc.type == 'sale', // фильтруем только продажи
async (result, doc) => {
result.doccou++
doc.lines.forEach(line => { // цикл по строкам документа
result.linecou++
result.amount += line.price * line.qty
})
},
{amount: 0, doccou: 0, linecou: 0} // инициализируем аккумулятор
)
console.log(`
amount total = ${res.amount}
amount per document = ${res.amount / res.doccou}
lines per document = ${res.linecou / res.doccou}`
)
Обороты в разрезе номенклатур и партнеров
По сути это сводная таблица, поэтому в качестве аккумулятора используем Map.
class ResultRow { // строка результирующей таблицы
invent_name = ''
partner_name = ''
debit_qty = 0
debit_amount = 0
credit_qty = 0
credit_amount = 0
}
let res = await db.reduce(
async (_, doc) => doc.type == 'purch' || doc.type == 'sale',
async (result, doc) => {
// поскольку внутри цикла у нас await - параллелим обработку строк
const promises = doc.lines.map(async (line) => {
const key = line.invent + doc.partner
let row = result.get(key)
if (row === undefined) {
row = new ResultRow()
// наименования получаем подзапросами к базе (они кэшируются)
const invent = await db.get(line.invent)
const partner = await db.get(doc.partner)
row.invent_name = invent ? invent.name : line.invent + ' not found'
row.partner_name = partner ? partner.name : doc.partner + ' not found'
result.set(key, row)
}
if (doc.type == 'purch') {
row.debit_qty += line.qty
row.debit_amount += line.qty * line.price
} else if (doc.type == 'sale') {
row.credit_qty += line.qty
row.credit_amount += line.qty * line.price
}
})
await Promise.all(promises)
},
new Map<string, ResultRow>() // результирующая таблица (аккумулятор)
)
Мы видим, что половину кода составляет извлечение наименований подзапросами. Это легко исправить, написав сервисную функцию, но для общего понимания оставлю так. Обратите внимание, что мы параллелим обработку строк — в случае если номенклатуры нет в кэше — запускается fullscan, результата которого в нашем случае ждать необязательно.
Бенчмаркинг
Тестируем на сгенерированных данных:
иммутабельное хранилище: 100 номенклатур + 100 контрагентов + 100 тыс. документов
текущее хранилище: 10 номенклатур + 10 контрагентов + 10 тыс. документов
Использую доисторический ноутбук с процессором Intel Celeron CPU N2830 @ 2.16 GHz
В первом тесте демонстрируем кэширование — сначала запускаем анализ продаж, затем добавляем новый документ продажи, и снова запускаем расчет. Видно что второй раз иммутабельное хранилище не обрабатывается, и расчет происходит в 10 раз быстрее.
100200 docs parsed (0 errors)
50018 docs processed (0 errors)
11.098s elapsed
file: database_current.json:
10020 docs parsed (0 errors)
4987 docs processed (0 errors)
1.036s elapsed
amount total = 623422871.2641689
amount per document = 11389.839613851627
lines per document = 3.6682561432355896
file: database_current.json:
10021 docs parsed (0 errors)
4988 docs processed (0 errors)
1.034s elapsed
amount total = 623433860.2641689
amount per document = 11389.832290707558
lines per document = 3.6682073954983925
Если честно, я рассчитывал минимум на миллион документов в секунду. Разберемся, где у нас основная задержка на примере обработки первого файла:
8.8s — чтение файла и извлечение строковых JSON, разделенных символом "x01"
1.9s — парсинг JSON в объекты
0.4s — кэширование + пользовательский алгоритм
Заглянув в исходники Deno, я понял, основная задержка возникает при декодировании unicode, ведь V8 в качестве байто-дробилки подходит плохо. Это значит, что переписать критические куски на WASM/Rust будет очень просто, а если в качестве хранилища использовать нормальную объектную БД, то и парсинга JSON можно избежать, и тогда достичь миллиона записей в секунду — более чем реально. И это я не говорю про нормальное железо.
Второй тест мы запускаем сначала на свеже-сгенерированной базе, затем после прогона первого теста. Второй раз производительность упала в 3 раза, потому что в первом тесте мы добавили документ продажи, ссылающийся на несуществующего партнера и номенклатуру, и, не найдя их в кэше, система вынуждена дважды запускать fullscan. Но эта ситуация по сути аварийная, а нормальная выглядит так:
100200 docs parsed (0 errors)
100000 docs processed (0 errors)
13.307s elapsed
file: database_current.json:
10020 docs parsed (0 errors)
10000 docs processed (0 errors)
1.296s elapsed
Наконец-то пошла рабочая нагрузка, мы делаем 2 вложенных асинхронных запроса, и получаем затраты на пользовательский алгоритм — 2.6s. Допускаю, что оборачивание любой функции в промис накладно, а в нашем случае запрос к кэшу выполняется синхронно, и возможно это место стоит оптимизировать.
Резюме
В целом я доволен результатом, схема получилась вполне рабочая, проект можно развивать, если у кого есть мысли — пишите. Буду благодарен за ссылку на публичные обфусцированные данные, приближенные к реальности (счета-фактуры, EDI, или что-то подобное), необходимые для полноценного тестирования.
Спасибо за внимание.
Автор: Евгений Баладжа