В своей предыдущей статье я показал, что при использовании асинхронных запросов, скорость опроса устройств по протоколу SNMP может достигать 9000 запросов в секунду (при условии, что у нас есть достаточное количество устройств для формирования такого потока ответов). Вопрос о том, что делать с этим потоком данных остался открытым.
Обычной практикой является обработка данных мониторинга посредством RDBMS (таких как Oracle Database). Но способны ли традиционные реляционные базы данных справиться с такой нагрузкой? Попробуем в этом разобраться.
День жестянщика
Начнем с разбора ошибок, обычно допускаемых новичками, впервые сталкивающимися с Oracle. Обработка данных мониторинга может предусматривать достаточно сложную логику, но об этом мы будем говорить позже. Пока просто попробуем сохранить полученные данные в следующую таблицу:
create sequence test_data_seq;
create table test_data (
id number not null,
device_id number not null,
parameter_id number not null,
value varchar2(100),
event_date date default sysdate not null
);
create unique index test_data_pk on test_data(id);
alter table test_data add
constraint pk_test_data primary key(id);
Автоматизируем генерацию ID триггером, как это советуют многие online-источники:
create or replace trigger test_data_bri
before insert
on test_data
for each row
begin
select test_data_seq.nextval into :new.id from dual;
end;
/
И начнем кодить:
package com.acme.ae.tests.jdbc;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class Test {
private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
private final static String USER_CONN = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
private final static String USER_NAME = "ais";
private final static String USER_PASS = "ais";
private Connection c = null;
private void test() throws SQLException {
StringBuffer sb = new StringBuffer();
Long timestamp = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
sb.setLength(0);
sb.append("insert into test_data(device_id, parameter_id, value) values (");
sb.append(i);
sb.append(",1,'0')");
CallableStatement st = c.prepareCall(sb.toString());
try {
st.execute();
} finally {
st.close();
}
}
System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
}
private void start() throws ClassNotFoundException, SQLException {
Class.forName(CLASS_NAME);
c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
}
private void stop() throws SQLException {
if (c != null) {
c.close();
}
}
public static void main(String[] args) {
Test t = new Test();
try {
try {
t.start();
t.test();
} finally {
t.stop();
}
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
Запустив на выполнение этот код, получаем количество вставок, выполняемых в секунду:
614
Вроде бы неплохо. Но явно меньше 9000. Что мы делаем не так?
Довольно многое (пункты даны в порядке убывания влияния на производительность):
- Мы провоцируем hard parse, передавая каждый раз новый вариант запроса (у такого подхода есть и другие недостатки, такие как возможность получения sql injection или заполнение библиотечного кэша, но нам хватит и первой причины, чтобы больше никогда так не делать)
- Создавая сессию, JDBC по умолчанию устанавливает включенной настройку auto commit (ниже мы посмотрим, к чему это приводит с точки зрения производительности)
- Вызов триггера приводит к переключению контекстов SQL и PL/SQL (это не очень существенно при вставке одиночных записей, но весьма заметно при выполнении массовых вставок)
- При массовой вставке, sequence имеет смысл кэшировать (но это не наш случай, поскольку мы вставляем по одной записи из одного потока)
Избавимся от триггера и жестких разборов. Поскольку для более сложной обработки данных нам все равно понадобится хранимый код, перенесем вставку в пакет:
create or replace package test_package as
procedure addData( p_device in number
, p_parameter in number
, p_value in number );
end test_package;
/
show errors;
create or replace package body test_package as
procedure addData( p_device in number
, p_parameter in number
, p_value in number ) as
begin
insert into test_data(id, device_id, parameter_id, value)
values (test_data_seq.nextval, p_device, p_parameter, p_value);
end;
end test_package;
/
show errors;
package com.acme.ae.tests.jdbc;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class Test {
private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
private final static String USER_CONN = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
private final static String USER_NAME = "ais";
private final static String USER_PASS = "ais";
private final static boolean AUTO_COMMIT_MODE = true;
private final static String ADD_DATA_SQL =
"begin test_package.addData(?,?,?); end;";
private Connection c = null;
private void test() throws SQLException {
CallableStatement st = c.prepareCall(ADD_DATA_SQL);
try {
Long timestamp = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
st.setInt(1, i);
st.setInt(2, 1);
st.setString(3, "0");
st.execute();
}
System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
} finally {
st.close();
}
}
private void start() throws ClassNotFoundException, SQLException {
Class.forName(CLASS_NAME);
c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
c.setAutoCommit(AUTO_COMMIT_MODE);
}
private void stop() throws SQLException {
if (c != null) {
c.close();
}
}
public static void main(String[] args) {
Test t = new Test();
try {
try {
t.start();
t.test();
} finally {
t.stop();
}
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
Хочу обратить внимание, что теперь текст запроса заканчивается точкой с запятой. По ней Oracle определяет, что мы собираемся выполнять не SQL, а PL/SQL-код.
Запускаем на выполнение:
956
Лучше, но все равно мало.
Быстрее, выше, сильнее!
Что можно сделать чтобы ускорить вставку? Для начала стоит отключить auto commit. При работе с Oracle, слишком частая фиксация транзакций это не только накладные расходы, но и (в некоторых случаях) хороший шанс спровоцировать ошибку ORA-01555.
Разумеется, размер транзакции должен определяться бизнес логикой. Если бизнес-логика требует фиксировать каждую одиночную вставку в отдельной транзакции, то именно так и придется делать. Но (на мой взгляд) лучше управлять фиксацией транзакций самостоятельно чем отдавать этот важный вопрос на откуп JDBC.
Какого размера должна быть транзакция при массовой вставке данных? Универсального ответа, разумеется, нет. До определенного предела, действует правило — чем больше размер транзакции, тем быстрее сохраняются данные. В этой статье я не буду экспериментировать с размером транзакций, а просто выполню вставку всех строк в одной транзакции.
package com.acme.ae.tests.jdbc;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class Test {
private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
private final static String USER_CONN = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
private final static String USER_NAME = "ais";
private final static String USER_PASS = "ais";
private final static boolean AUTO_COMMIT_MODE = false;
private final static String ADD_DATA_SQL =
"begin test_package.addData(?,?,?); end;";
private Connection c = null;
private void test() throws SQLException {
CallableStatement st = c.prepareCall(ADD_DATA_SQL);
try {
Long timestamp = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
st.setInt(1, i);
st.setInt(2, 1);
st.setString(3, "0");
st.execute();
}
c.commit();
System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
} finally {
st.close();
}
}
private void start() throws ClassNotFoundException, SQLException {
Class.forName(CLASS_NAME);
c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
c.setAutoCommit(AUTO_COMMIT_MODE);
}
private void stop() throws SQLException {
if (c != null) {
c.close();
}
}
public static void main(String[] args) {
Test t = new Test();
try {
try {
t.start();
t.test();
} finally {
t.stop();
}
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
Результат улучшился почти в два раза:
1524
Чтобы улучшить его еще больше, можно вспомнить о временных таблицах:
drop table test_data;
create global temporary table test_data (
device_id number not null,
parameter_id number not null,
value varchar2(100),
event_date date default sysdate not null
) on commit delete rows;
Поскольку мы убрали из таблицы столбец id, внесем изменения в реализацию пакета:
create or replace package body test_package as
procedure addData( p_device in number
, p_parameter in number
, p_value in number ) as
begin
insert into test_data(device_id, parameter_id, value)
values (p_device, p_parameter, p_value);
end;
end test_package;
/
show errors;
Так как вставка данных во временную таблицу не фиксируется в REDO-логе (не требуется восстанавливать эти данные при сбое), это ведет к закономерному снижению накладных расходов:
1779
Если использовать DML-запросы вместо обращения к пакету, можно довести это значение до 2000. Неужели это предел?
Разумеется нет! Сейчас, когда мы практически свели на нет накладные расходы связанные с фиксацией транзакций, мы упираемся в другое бутылочное горлышко — в количество запросов, передаваемых по сети. Чтобы решить эту проблему, были разработаны bulk-запросы, позволяющие передавать несколько однотипных изменений в одном запросе.
Вносим необходимые изменения в код:
package com.acme.ae.tests.jdbc;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class Test {
private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
private final static String USER_CONN = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
private final static String USER_NAME = "ais";
private final static String USER_PASS = "ais";
private final static boolean AUTO_COMMIT_MODE = false;
private final static int BULK_SIZE = 10;
private final static String ADD_DATA_SQL =
"begin test_package.addData(?,?,?); end;";
private Connection c = null;
private void test() throws SQLException {
CallableStatement st = c.prepareCall(ADD_DATA_SQL);
try {
int bulkSize = BULK_SIZE;
Long timestamp = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
st.setInt(1, i);
st.setInt(2, 1);
st.setString(3, "0");
st.addBatch();
if (--bulkSize <= 0) {
st.executeBatch();
bulkSize = BULK_SIZE;
}
}
if (bulkSize < BULK_SIZE) {
st.executeBatch();
}
c.commit();
System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
} finally {
st.close();
}
}
private void start() throws ClassNotFoundException, SQLException {
Class.forName(CLASS_NAME);
c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
c.setAutoCommit(AUTO_COMMIT_MODE);
}
private void stop() throws SQLException {
if (c != null) {
c.close();
}
}
public static void main(String[] args) {
Test t = new Test();
try {
try {
t.start();
t.test();
} finally {
t.stop();
}
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
Запускаем на выполнение:
1779
И не замечаем никакой разницы. Почему?
Дело в том, что bulk-и работают только для DML-запросов (insert, update, delete). Если мы пытаемся вызывать bulk-запросом хранимую процедуру, JDBC эмулирует bulk, отсылая по сети ровно то-же самое количество запросов, что было бы без него.
Исправим этот досадный промах:
package com.acme.ae.tests.jdbc;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class Test {
private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
private final static String USER_CONN = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
private final static String USER_NAME = "ais";
private final static String USER_PASS = "ais";
private final static boolean AUTO_COMMIT_MODE = false;
private final static int BULK_SIZE = 10;
private final static String ADD_DATA_SQL =
"insert into test_data(device_id, parameter_id, value) values (?,?,?)";
private Connection c = null;
private void test() throws SQLException {
CallableStatement st = c.prepareCall(ADD_DATA_SQL);
try {
int bulkSize = BULK_SIZE;
Long timestamp = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
st.setInt(1, i);
st.setInt(2, 1);
st.setString(3, "0");
st.addBatch();
if (--bulkSize <= 0) {
st.executeBatch();
bulkSize = BULK_SIZE;
}
}
if (bulkSize < BULK_SIZE) {
st.executeBatch();
}
c.commit();
System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
} finally {
st.close();
}
}
private void start() throws ClassNotFoundException, SQLException {
Class.forName(CLASS_NAME);
c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
c.setAutoCommit(AUTO_COMMIT_MODE);
}
private void stop() throws SQLException {
if (c != null) {
c.close();
}
}
public static void main(String[] args) {
Test t = new Test();
try {
try {
t.start();
t.test();
} finally {
t.stop();
}
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
И запустим код на выполнение снова:
12658
Ха, мы определенно идем на рекорд! Доведя BULK_SIZE до 100, можно получить и вовсе волшебное значение:
31250
Итак, мы научились очень быстро вставлять данные во временную таблицу и… терять их при commit-е, так ничего с ними и не сделав.
Где логика?
Сейчас добавим. Для начала спроектируем схему данных:
Пусть у нас имеется список устройств test_device и список параметров test_parameter. Текущее значение каждого параметра будет храниться в test_state, а в test_history будем записывать хронологию изменения значений. Для упрощения кода, будем считать что параметры не могут принимать значение null.
Кроме того, мы будем обрабатывать значения двух типов, заданных в test_parameter_type. Тип параметра будет определять, в каком случае выполняется добавление записи в test_history:
- Тип 'default' будем использовать (в частности) для хранения оперативного состояния устройств, запись в test_history добавляется при любом изменении значения
- Тип 'uptime' используем для хранения времени безостановочной работы, значение в test_history добавляется из test_state в том случае, если новое значение меньше предыдущего (то есть устройство перезагружалось)
Разумеется, в реальной системе, типов параметров будет больше.
create table test_device (
id number not null,
name varchar2(100)
);
create unique index test_device_pk on test_device(id);
alter table test_device add
constraint pk_test_device primary key(id);
create table test_parameter_type (
id number not null,
name varchar2(100)
);
create unique index test_parameter_type_pk on test_parameter_type(id);
alter table test_parameter_type add
constraint pk_test_parameter_type primary key(id);
create table test_parameter (
id number not null,
type_id number not null,
name varchar2(100)
);
create unique index test_parameter_pk on test_parameter(id);
create index test_parameter_fk on test_parameter(type_id);
alter table test_parameter add
constraint pk_test_parameter primary key(id);
alter table test_parameter add
constraint fk_test_parameter foreign key (type_id)
references test_parameter_type(id);
create table test_state (
device_id number not null,
parameter_id number not null,
value varchar2(100) not null,
last_date date default sysdate not null
);
create unique index test_state_pk on test_state(device_id, parameter_id);
create index test_state_fk on test_state(parameter_id);
alter table test_state add
constraint pk_test_state primary key(device_id, parameter_id);
alter table test_state add
constraint fk_test_state foreign key (device_id)
references test_device(id);
alter table test_state add
constraint fk_test_state_parameter foreign key (parameter_id)
references test_parameter(id);
create sequence test_history_seq;
create table test_history (
id number not null,
device_id number not null,
parameter_id number not null,
value varchar2(100) not null,
event_date date default sysdate not null
);
create unique index test_history_pk on test_history(id);
create index test_history_device_fk on test_history(device_id);
create index test_history_parameter_fk on test_history(parameter_id);
alter table test_history add
constraint pk_test_history primary key(id);
alter table test_history add
constraint fk_test_history_device foreign key (device_id)
references test_device(id);
alter table test_history add
constraint fk_test_history_parameter foreign key (parameter_id)
references test_parameter(id);
Insert into TEST_PARAMETER_TYPE
(ID, NAME)
Values
(1, 'default');
Insert into TEST_PARAMETER_TYPE
(ID, NAME)
Values
(2, 'uptime');
COMMIT;
Insert into TEST_PARAMETER
(ID, TYPE_ID, NAME)
Values
(1, 1, 'status');
Insert into TEST_PARAMETER
(ID, TYPE_ID, NAME)
Values
(2, 2, 'uptime');
COMMIT;
insert into test_device(id, name)
select rownum, object_name
from all_objects;
commit;
Далее, добавим в пакет процедуру массовой обработки значений:
CREATE OR REPLACE package test_package as
procedure saveData;
end test_package;
/
show errors;
CREATE OR REPLACE package body test_package as
procedure saveData as
begin
-- Добавляем history для параметров типа default
insert into test_history(id, device_id, parameter_id, value, event_date)
select test_history_seq.nextval, a.device_id, a.parameter_id, a.value, a.event_date
from test_data a
inner join test_device b on ( b.id = a.device_id )
inner join test_parameter c on ( c.id = a.parameter_id )
inner join test_parameter_type d on ( d.id = c.type_id and d.name = 'default' )
left join test_state e on ( e.device_id = a.device_id and
e.parameter_id = a.parameter_id )
where e.value is null or e.value <> a.value;
-- Добавляем history для параметров типа uptime
insert into test_history(id, device_id, parameter_id, value, event_date)
select test_history_seq.nextval, a.device_id, a.parameter_id, e.value, e.last_date
from test_data a
inner join test_device b on ( b.id = a.device_id )
inner join test_parameter c on ( c.id = a.parameter_id )
inner join test_parameter_type d on ( d.id = c.type_id and d.name = 'uptime' )
left join test_state e on ( e.device_id = a.device_id and
e.parameter_id = a.parameter_id )
where e.value > a.value;
-- Обновляем state
merge into test_state a
using ( select c.device_id, c.parameter_id, c.value, c.event_date
from test_data c
inner join test_device d on ( d.id = c.device_id )
inner join test_parameter e on ( e.id = c.parameter_id )
) b
on ( b.device_id = a.device_id and b.parameter_id = a.parameter_id )
when matched then
update set a.value = b.value, a.last_date = b.event_date
when not matched then
insert (device_id, parameter_id, value, last_date)
values (b.device_id, b.parameter_id, b.value, b.event_date);
-- Сохраняем изменения, очищая временную таблицу
commit;
end;
end test_package;
/
show errors;
Можно заметить, что запросы получились не простые, но сложные запросы это именно то, что Oracle умеет выполнять лучше всего.
Внесем изменения в код:
package com.acme.ae.tests.jdbc;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class Test {
private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
private final static String USER_CONN = "jdbc:oracle:thin:@192.168.124.35:1521:orcl";
private final static String USER_NAME = "ais";
private final static String USER_PASS = "ais";
private final static boolean AUTO_COMMIT_MODE = false;
private final static int BULK_SIZE = 100;
private final static String ADD_DATA_SQL =
"insert into test_data(device_id, parameter_id, value) values (?,?,?)";
private final static String SAVE_DATA_SQL =
"begin test_package.saveData; end;";
private Connection c = null;
private void test() throws SQLException {
Long timestamp = System.currentTimeMillis();
CallableStatement st = c.prepareCall(ADD_DATA_SQL);
try {
int bulkSize = BULK_SIZE;
for (int i = 0; i < 1000; i++) {
st.setInt(1, i);
st.setInt(2, 1);
st.setString(3, "0");
st.addBatch();
if (--bulkSize <= 0) {
st.executeBatch();
bulkSize = BULK_SIZE;
}
}
if (bulkSize < BULK_SIZE) {
st.executeBatch();
}
} finally {
st.close();
}
st = c.prepareCall(SAVE_DATA_SQL);
try {
st.execute();
} finally {
st.close();
}
System.out.println(1000000L / (System.currentTimeMillis() - timestamp));
}
private void start() throws ClassNotFoundException, SQLException {
Class.forName(CLASS_NAME);
c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
c.setAutoCommit(AUTO_COMMIT_MODE);
}
private void stop() throws SQLException {
if (c != null) {
c.close();
}
}
public static void main(String[] args) {
Test t = new Test();
try {
try {
t.start();
t.test();
} finally {
t.stop();
}
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
и запустим его на выполнение:
5319
Увы, чуда не произошло. Необходимость сохранения данных в журналируемых таблицах сыграла свою роль. Но 5000 записей в секунду, в любом случае, гораздо лучше 600. Теперь мы знаем максимальную скорость, с которой мы можем сохранять данные в Oracle (естественно на выбранном для тестов сервере).
Если нам требуется более высокая скорость обработки или меньшее время обработки каждого запроса, имеет смысл смотреть с сторону InMemory DB. Но это тема для другого разговора.
Готовы ли вы потерять свои данные?
Можно заметить, что мы добились высокой скорости загрузки данных в обмен на риск потери той части данных, сохранение которой не зафиксировано. При проектировании приложения, этот риск можно свести к минимуму, но полностью избавиться от него нельзя.
Использование InMemory или NoSQL DB не изменит эту картину радикально. Хотя эти СУБД предусматривают защиту данных (например при помощи REDO-лога в случае Oracle TimesTen), добиться максимальной производительности можно только отключив эту защиту (такая возможность предоставляется очень часто).
Приходится выбирать, что важнее — обеспечить максимально возможную сохранность данных или максимальную производительность? Ответ на этот вопрос может дать только постановщик задачи. Мы же, со своей стороны, можем воплотить его требования в жизнь, добиваясь максимальной эффективности от того инструмента, который используем.
Автор: GlukKazan