На работе в рамках проектирования новой системы интеграции устройств для мониторинга аудио/видео потоков возникла задача отслеживания, накопления и последующего анализа изменений их состояния. Состояние выдаётся через зоопарк динамических XML-документов, используемых, в основном, для наполнения legacy web-UI.
Для упрощения интеграции мною была предложена идея создания обобщённой библиотеки для сохранения структурированных diff-ов для (почти) произвольного XML. Поскольку эти diff-ы будут сохраняться с учётом структуры документа, это дало бы возможность очень экономно аккумулировать изменения состояния устройств, а также в будущем генерировать отчёты с аналитикой, диаграммами, и т.п. После недели запойного программирования я набросал работающий proof-of-concept, которым и хочу поделиться в данной статье.
Создание схемы документа
Библиотека использует XSD в качестве источника информации о структуре документа. Получить XSD очень просто: есть много online-сервисов, позволяющих по XML сгенерировать некоторый валидирующий его XSD. Для большинства случаев этого будет достаточно.
Далее требуется слегка модифицировать полученную XSD-схему. Для каждого элемента исходного XML-документа, предполагающего множественные вхождения, требуется добавить атрибут `monId` в соответствующий XSD `element`. Его значением будет имя атрибута, однозначно идентифицирующего повторяющийся элемент. Например, мы собираемся мониторить документы следующего вида:
<element1>
<element2 attr1="value1">
<element3>
<element4 attr2="value2">value3</element4>
<element4 attr2="value4">value5</element4>
<element4 attr2="value6">value7</element4>
</element3>
</element2>
<element2 attr1="value8">
<element3>
<element4 attr2="value9">value10</element4>
<element4 attr2="value11">value12</element4>
</element3>
</element2>
</element1>
По структуре документа понятно, что как минимум следующие элементы имеют множественное вхождение:
- /element1/element2
- /element1/element2/element3/element4
Поэтому в соответствующие XSD `elements` должны быть добавлены `monId` с именами идентифицирующих атрибутов:
…
<xs:element name=«element2» maxOccurs=«unbounded» minOccurs=«0» monId=«attr1»>
…
<xs:element name=«element4» maxOccurs=«unbounded» minOccurs=«0» monId=«attr2»>
…
Как это работает
Итак, библиотека парсит XSD (на самом деле, пока поддерживается только его ограниченное подмножество, достаточное для переваривания большинства автоматически сгенерированных схем), и на его основе создаёт таблицы, соотвествующие элементам исходного документа.
После создания внутреннего представления схемы документа каждому его элементу будет соответствовать таблица в базе данных. Любое изменение элемента приведёт к добавлению новой записи в такой таблице. Т.е. каждая запись означает некоторое событие (добавление, изменение, удаление, snapshot). Другими словами, для извлечения версии документа, соответствующей заданной временной метке, библиотека сканирует все события, соответствующие данному элементу, и реконструирует его состояние.
Поскольку событий может быть множество, такая реконструкция будет требовать всё больше и больше времени. Вот почему для каждого документа периодически требуется сохранять снимок его текущего состояния (snapshot). Таким образом, реконструкция элементов будет производиться не с начала существования документа, а с ближайшего snapshot-а для указанной временной метки.
Использование
Библиотека написана на golang и хранит документы в PostgreSQL. В качестве драйвера базы данных используется libpq. В текущем состоянии библиотека умеет только сохранять и реконструировать XML-документы (для произвольной временной метки).
package main
import (
"btc/data"
"btc/mon"
"btc/xmls"
"database/sql"
"log"
"os"
"time"
)
func install(db *sql.DB) {
var err error
if err = mon.Install(db); err != nil {
log.Fatalf("failed to install data monitor: %s", err)
}
var root *xmls.Element
root, err = xmls.FromFile("tmp/etr.xsd")
if err != nil {
log.Fatalf("failed to create xml schema: %s", err)
}
schema := mon.NewSchema("etr", "probe ETR-290 checks")
if err = mon.AddSchema(db, schema, root); err != nil {
log.Fatalf("failed to install schema: %s", err)
}
doc := mon.NewDoc("hw4_172_etr", "etr",
"http://10.0.30.172/probe/etrdata?inputId=0&tuningSetupId=1",
60, 86400)
if err = mon.AddDoc(db, doc); err != nil {
log.Fatalf("failed to add document: %s", err)
}
}
func commit(db *sql.DB) {
file, err := os.Open("tmp/etr.xml")
if err != nil {
log.Fatalf("failed to open xml doc: %s", err)
}
defer file.Close()
if err = mon.CommitDoc(db, "hw4_172_etr", file, false); err != nil {
log.Fatalf("failed to commit doc: %s", err)
}
}
func checkout(db *sql.DB) {
timestamp, err := time.Parse(
time.RFC3339, "2015-12-25T18:26:58+01:00")
if err != nil {
log.Fatalf("failed to parse timestamp: %s", err)
}
if err := mon.CheckoutDoc(
db, "hw4_172_etr", timestamp,
os.Stdout, " ", " "); err != nil {
log.Fatalf("failed to checkout doc: %s", err)
}
}
func main() {
config, err := NewConfig("config.json")
if err != nil {
log.Fatalf("failed to load config: %s", err)
}
var db *sql.DB
db, err = data.Open(config.DbConnStr)
if err != nil {
log.Fatalf("failed to establish db connection: %s", err)
}
defer db.Close()
//install(db)
//commit(db)
checkout(db)
}
Автор: ababo