Введение в CDRS, Cassandra driver полностью написанный на Rust

в 12:39, , рубрики: cassandra, CQL, Rust

CDRS (Cassandra driver written in Rust) — это мой собственный open source проект, который я решился разрабатывать после того, как обнаружил, что в плане драйверов для Cassandra в Rust экосистеме образовался дефицит.

Конечно, я не скажу, что их совсем нет. Они есть, но одна часть это заброшенные в зачаточном состоянии Hello World пакеты, а вторая часть это, наверное, единственный binding к драйверу от DataStax, написанному на С++.

Что касается CDRS, то средствами Rust он полностью имплементирует спецификацию 4-й версии протокола.

cargo.toml

Чтобы включить драйвер в свой проект, как обычно, необходимо следующее.

Во-первых, добавить CDRS в секцию dependencies вашего cargo.toml файла:

[dependencies]
cdrs = "1.0.0-beta.1"

Это позволит использовать TCP соединение без шифрования.

Если вы намерены создавать SSL-шифрованное соединение со свое базой данных, то CDRS должен быть включен с фичей "ssl":

[dependencies]
openssl = "0.9.6"
[dependencies.cdrs]
version = "1.0.0-beta.1"
features = ["ssl"]

Во-вторых, добавить его в lib.rs

extern crate CDRS

Установка соединения

TCP соединение

Для установки не шифрованного соединения вам понадобятся следующие модули

use cdrs::client::CDRS;
use cdrs::authenticators::{NoneAuthenticator, PasswordAuthenticator};
use cdrs::transport::TransportPlain;

Если так случилось, что ваш кластер не требует авторизации паролем, то соединение может быть установлено следующим образом:

let authenticator = NoneAuthenticator;
let addr = "127.0.0.1:9042";
let tcp_transport = TransportPlain::new(addr).unwrap();

// pass authenticator and transport into CDRS' constructor
let client = CDRS::new(tcp_transport, authenticator);
use cdrs::compression;
// start session without compression
let mut session = try!(client.start(compression::None));

Для установки соединения, требующего авторизации паролем, вместо NoneAuthenticator нужно использовать PasswordAuthenticator:

let authenticator = PasswordAuthenticator::new("user", "pass");

TLS соединение

Установление TLS соединение во многом похоже на процесс, описанный в предыдущем шаге, за исключением того, что вам понадобится PEM сертификат для создания SSL транспорта.

use cdrs::client::CDRS;
use cdrs::authenticators::PasswordAuthenticator;
use cdrs::transport::TransportTls;
use openssl::ssl::{SslConnectorBuilder, SslMethod};
use std::path::Path;

let authenticator = PasswordAuthenticator::new("user", "pass");
let addr = "127.0.0.1:9042";

// here needs to be a path of your SSL certificate
let path = Path::new("./node0.cer.pem");
let mut ssl_connector_builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
ssl_connector_builder.builder_mut().set_ca_file(path).unwrap();
let connector = ssl_connector_builder.build();

let ssl_transport = TransportTls::new(addr, &connector).unwrap();

// pass authenticator and SSL transport into CDRS' constructor
let client = CDRS::new(ssl_transport, authenticator);

Connection pool

Для более простого управления существующими соединениям CDRS содержит ConnectionManager, который по своей сути есть адаптор для r2d2.

use cdrs::connection_manager::ConnectionManager;
//...
let config = r2d2::Config::builder()
    .pool_size(3)
    .build();
let transport = TransportPlain::new(ADDR).unwrap();
let authenticator = PasswordAuthenticator::new(USER, PASS);
let manager = ConnectionManager::new(transport, authenticator, Compression::None);

let pool = r2d2::Pool::new(config, manager).unwrap();

for _ in 0..20 {
    let pool = pool.clone();
    thread::spawn(move || {
        let conn = pool.get().unwrap();
        // use the connection
        // it will be returned to the pool when it falls out of scope.
    });
}

Сжатие — lz4 и snappy

Чтобы использовать lz4 и snappy сжатие, достаточно передать в конструктор сессии желаемый декодер:

// session without compression
let mut session_res = client.start(compression::None);
// session with lz4 compression
let mut session_res = client.start(compression::Lz4);
// session with snappy compression
let mut session_res = client.start(compression::Snappy);

Далее CDRS самостоятельно сообщит кластеру, что он готов принимать информацию в сжатом виде с выбранным декодером. Дальнейшая распаковка будет проходить автоматически и не требует каких-либо дальнейших действий от разработчика.

Выполнение запросов

Выполнение запросов к Cassandra серверу проходит исключительно в рамках существующей сессии, после выбора методов авторизации, сжатия, а также типа транспорта.

Для выполнения того или иного запроса необходимо создать объект Query, который с первого взгляда может показаться несколько избыточным для простых запросов, посколько содержит множество параметров, которые, вероятно, используются не так часто.

По этой причине был создан builder, который упрощает процесс конфигурирования запроса. Например, для простого 'USE my_namespace;' достаточно просто

let create_query: Query = QueryBuilder::new("USE my_namespace;").finalize();
let with_tracing = false;
let with_warnings = false;

let switched = session.query(create_query, with_tracing, with_warnings).is_ok();

Создание новой таблицы

Чтобы создать новую таблицу в Cassandra кластере, как и раньше, необходимо вначале сконфигурировать Queryи после этого выполнить запрос:

use std::default::Default;
use cdrs::query::{Query, QueryBuilder};
use cdrs::consistency::Consistency;

let mut create_query: Query = QueryBuilder::new("CREATE TABLE keyspace.authors (
    id int,
    name text,
    messages list<text>,
    PRIMARY KEY (id)
    );")
    .consistency(Consistency::One)
    .finalize();
let with_tracing = false;
let with_warnings = false;

let table_created = session.query(create_query, with_tracing, with_warnings).is_ok();

Что касается самого CQL запроса создания новой таблицы, то за более полной информацией лучше обратиться к специализированным ресурсам, например DataStax.

SELECT запрос и маппинг результатов

Предположим, что в нашей базе данных существует таблица авторов, при чем каждый автор имеет список своих сообщений. Пусть эти сообщения хранятся внутри list-колонки. В терминах Rust автор должен иметь следующий вид:

struct Author {
    pub name: String,
    pub messages: Vec<String>
}

Сам запрос может быть выполнен через Session::query метод, как это было сделано в случае создания таблицы. Естественно, CQL должен быть в данном случае чем-то вроде 'SELECT * FROM keyspace.authors;'. Если таблица содержит данные о каких-то авторах, мы можем попытаться отобразить полученные данные в коллекцию Rust структур, типа 'Vec<Author>'

//...
use cdrs::error::{Result as CResult};
let res_body = parsed.get_body();
let rows = res_body.into_rows().unwrap();
let messages: Vec<Author> = rows
    .iter()
    .map(|row| {
        let name: String = row.get_by_name("name").unwrap();
        let messages: Vec<String> = row
            // unwrap Option<CResult<T>>, where T implements AsRust
            .get_by_name("messages").unwrap().unwrap()
            .as_rust().unwrap();
        return Author {
            author: name,
            text: messages
        };
    })
    .collect();

Во время отображения результатов следует обратить внимание на следующие трейты:

  1. IntoRustByName. Говоря простым языком, этот трейт применяется по отношению к сложным типам Cassandra таким, как row (которая, строго говоря не является отдельным типом, определенным в спецификации, но по своему внутреннему устройству может рассматриваться, как что-то близкое к User Defined Type) и UDT. Грубо говоря, get_by_name пытается отыскать "свойство" по его имени, и если находит, то возвращает результат преобразования этого свойства к Rust типу или к CDRS типам, таким как List, 'Map', UDT. Сами же эти типы есть отображение соответствующих типов данных определенных в спецификации.

  2. AsRust. Этот трейт предназначен для конечного отображения в Rust типы. Полный список имплиментаторов можно увидеть в приведенной ссылке.

Prepare & Execute

Иногда бывает удобным вначале единожды подготовить сложный запрос, а после этого выполнить его несколько раз с различными данными в разное время. Для этого прекрасно подходит Prepare & Execute.

// prepare just once
let insert_table_cql = " insert into user_keyspace.users (user_name, password, gender, session_token, state) values  (?, ?, ?, ?, ?)";

let prepared = session.prepare(insert_table_cql.to_string(), true, true)
    .unwrap()
    .get_body()
    .into_prepared()
    .unwrap();

// execute later and possible few times with different values
let v: Vec<Value> = vec![Value::new_normal(String::from("john").into_bytes()),
                             Value::new_normal(String::from("pwd").into_bytes()),
                             Value::new_normal(String::from("male").into_bytes()),
                             Value::new_normal(String::from("09000").into_bytes()),
                             Value::new_normal(String::from("FL").into_bytes())];
let execution_params = QueryParamsBuilder::new(Consistency::One).values(v).finalize();
// without tracing and warnings
let executed = session.execute(prepared.id, execution_params, false, false);

Также имеет смысл комбинировать Prepare & Batch для выполнения сразу нескольких подготовленных запросов. Простейший пример Batch также можно найти в примерах.

Cassandra events

Кроме всего вышеописанного, CDRS предоставляет возможность подписаться и следить за событиями, которые публикует сервер.

let (mut listener, stream) = session.listen_for(vec![SimpleServerEvent::SchemaChange]).unwrap();

thread::spawn(move || listener.start(&Compression::None).unwrap());

let topology_changes = stream
    // inspects all events in a stream
    .inspect(|event| println!("inspect event {:?}", event))
    // filter by event's type: topology changes
    .filter(|event| event == &SimpleServerEvent::TopologyChange)
    // filter by event's specific information: new node was added
    .filter(|event| {
        match event {
            &ServerEvent::TopologyChange(ref event) => {
                event.change_type == TopologyChangeType::NewNode
            },
            _ => false
        }
    });

println!("Start listen for server events");

for change in topology_changes {
    println!("server event {:?}", change);
}

Чтобы найти полный список событий лучше всего обратиться в саму спецификацию, а также к документации драйвера.

В будущем есть планы использовать события для "умного" load balancing.

Полезные ссылки

Автор: AlexPikalov

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js