Недавно мне довелось поработать над приложением, которое должно было контролировать скорость своих исходящих подключений. Например, подключаясь к одному URL приложение должно было ограничить себя, скажем, 200KiB/sec. А подключаясь к другому URL — всего 30KiB/sec.
Самым интересным моментом здесь оказалось тестирование этих самых ограничений. Мне потребовался HTTP-сервер, который бы отдавал трафик с какой-то заданной скоростью, например, 512KiB/sec. Тогда бы я мог видеть, действительно ли приложение выдерживает скорость 200KiB/sec или же оно срывается на более высокие скорости.
Но где взять такой HTTP-сервер?
Поскольку я имею некоторое отношение к встраиваемому в С++ приложения HTTP-серверу RESTinio, то не придумал ничего лучше, чем быстренько набросать на коленке простой тестовый HTTP-сервер, который способен отдавать клиенту длинный поток исходящих данных.
О том, насколько это было просто и хотелось бы рассказать в статье. Заодно узнать в комментариях, действительно ли это просто или же я сам себя обманываю. В принципе, данную статью можно рассматривать как продолжение предыдущей статьи про RESTinio под названием "RESTinio — это асинхронный HTTP-сервер. Асинхронный". Посему, если кому-то интересно прочитать о реальном, пусть и не очень серьезном применении RESTinio, то милости прошу под кат.
Общая идея
Общая идея упомянутого выше тестового сервера очень проста: когда клиент подключается к серверу и выполняет HTTP GET запрос, то взводится таймер, срабатывающий раз в секунду. Когда таймер срабатывает, то клиенту отсылается очередной блок данных заданного размера.
Но все несколько сложнее
Если клиент вычитывает данные с меньшим темпом, нежели отсылает сервер, то просто отсылать по N килобайт раз в секунду не есть хорошая идея. Поскольку данные начнут скапливаться в сокете и ни к чему хорошему это не приведет.
Поэтому при отсылке данных желательно на стороне HTTP-сервера контролировать готовность сокета к записи. Пока сокет готов (т.е. в нем не скопилось еще слишком много данных), то новую порцию отсылать можно. А вот если не готов, то нужно подождать пока сокет не перейдет в состояние готовности к записи.
Звучит разумно, но ведь операции ввода-вывода скрыты в потрохах RESTinio… Как тут узнать, можно ли записывать следующую порцию данных или нет?
Из данной ситуации можно выйти, если использовать after-write нотификаторы, которые есть в RESTinio. Например, мы можем написать так:
void request_handler(restinio::request_handle_t req) {
req->create_response() // Начинаем формировать ответ.
... // Наполняем ответ содержимым.
.done([](const auto & ec) {
... // Вот этот код будет вызван когда запись ответа закончится.
});
}
Лямбда, переданная в метод done()
будет вызвана когда RESTinio завершит запись исходящих данных. Соответственно, если сокет какое-то время был не готов к записи, то лямбда будет вызвана не сразу, а после того, как сокет придет в должное состояние и примет все исходящие данные.
За счет использования after-write нотификаторов логика работы тестового сервера будет такой:
- отсылаем очередную порцию данных, вычисляем время, когда нам нужно было бы отослать следующую порцию при нормальном развитии событий;
- вешаем after-write нотификатор на очередную порцию данных;
- когда after-write нотификатор вызывается, мы проверяем, наступило ли время отсылки следующей порции. Если наступило, то сразу же инициируем отсылку следующей порции. Если не наступило, то взводим таймер.
В результате получится, что как только запись начнет притормаживать, отсылка новых данных приостановится. И возобновиться когда сокет будет готов принимать новые исходящие данные.
И еще немного сложного: chunked_output
RESTinio поддерживает три способа формирования ответа на HTTP-запрос. Самый простой способ, который применяется по умолчанию, в данном случае не подходит, т.к. мне требуется практически бесконечный поток исходящих данных. И такой поток, естественно, нельзя отдать в единственный вызов метода set_body
.
Поэтому в описываемом тестовом сервере используется т.н. chunked_output. Т.е. при создании ответа я указываю RESTinio, что ответ будет формироваться частями. После чего просто периодически вызываю методы append_chunk
для добавления к ответу очередной части и flush
для записи накопленных частей в сокет.
А давайте уже посмотрим в код!
Пожалуй, достаточно уже вступительных слов и пора перейти к рассмотрению самого кода, который можно найти в этом репозитории. Начнем с функции request_processor
, которая вызывается для обработки каждого корректного HTTP-запроса. При этом углубимся в те функции, которые из request_processor
вызываются. Ну а затем уже посмотрим, как именно request_processor
ставится в соответствие тому или иному входящему HTTP-запросу.
Функция request_processor и её подручные
Функция request_processor
вызывается для обработки нужных мне HTTP GET запросов. Ей в качестве аргументов передаются:
- Asio-шный io_context, на котором ведется вся работа (он потребуется, например, для взведения таймеров);
- размер одной части ответа. Т.е. если мне нужно отдавать исходящий поток с темпом в 512KiB/sec, то в качестве этого параметра будет передано значение 512KiB;
- количество частей в ответе. На случай, если поток должен иметь какую-то ограниченную длину. Например, если нужно отдавать поток с темпом 512KiB/sec в течении 5 минут, то в качестве этого параметра будет передано значение 300 (60 блоков в минуту в течении 5 минут);
- ну и сам входящий запрос для обработки.
Внутри request_processor
создается объект с информацией о запросе и параметрах его обработки, после чего эта самая обработка и начинается:
void request_processor(
asio_ns::io_context & ctx,
std::size_t chunk_size,
std::size_t count,
restinio::request_handle_t req) {
auto data = std::make_shared<response_data>(
ctx,
chunk_size,
req->create_response<output_t>(),
count);
data->response_
.append_header(restinio::http_field::server, "RESTinio")
.append_header_date_field()
.append_header(
restinio::http_field::content_type,
"text/plain; charset=utf-8")
.flush();
send_next_portion(data);
}
Тип response_data
, содержащий все относящиеся к запросу параметры, выглядит следующим образом:
struct response_data {
asio_ns::io_context & io_ctx_;
std::size_t chunk_size_;
response_t response_;
std::size_t counter_;
response_data(
asio_ns::io_context & io_ctx,
std::size_t chunk_size,
response_t response,
std::size_t counter)
: io_ctx_{io_ctx}
, chunk_size_{chunk_size}
, response_{std::move(response)}
, counter_{counter}
{}
};
Тут нужно заметить, что одна из причин появления структуры response_data
состоит в том, что объект типа restinio::response_builder_t<restinio::chunked_output_t>
(а именно этот тип спрятан за коротким псевдонимом response_t
) является moveable-, но не copyable-типом (по аналогии с std::unique_ptr
). Поэтому этот объект нельзя просто так захватить в лямбда-функции, которая затем оборачивается в std::function
. Но если объект-response поместить в динамически созданный экземпляр response_data
, то умный указатель на экземпляр reponse_data
уже можно без проблем захватывать в лямбда-функции с последующим сохранением этой лямбды в std::function
.
Функция send_next_portion
Функция send_next_portion
вызывается каждый раз, когда требуется отослать клиенту очередную часть ответа. Ничего сложного в ней не происходит, поэтому выглядит она достаточно просто и лаконично:
void send_next_portion(response_data_shptr data) {
data->response_.append_chunk(make_buffer(data->chunk_size_));
if(1u == data->counter_) {
data->response_.flush();
data->response_.done();
}
else {
data->counter_ -= 1u;
data->response_.flush(make_done_handler(data));
}
}
Т.е. отсылаем очередную часть. И, если эта часть была последней, то завершаем обработку запроса. А если не последняя, то в метод flush
передается after-write нотификатор, который создается, пожалуй, наиболее сложной функцией данного примера.
Функция make_done_handler
Функция make_done_handler
отвечает за создание лямбды, которая будет передана в RESTinio в качестве after-write нотификатора. Этот нотификатор должен проверить, завершилась ли запись очередной части ответа успешно. Если да, то нужно разобраться, следует ли следующую часть отослать сразу же (т.е. были "тормоза" в сокете и темп отсылки выдерживать не получается), либо же после некоторой паузы. Если нужна пауза, то она обеспечивается через взведение таймера.
В общем-то, несложные действия, но в коде получается лямбда внутри лямбды, что может смутить людей, не привыкших к "современному" С++. Которому не так уж и мало лет чтобы называться современным ;)
auto make_done_handler(response_data_shptr data) {
const auto next_timepoint = steady_clock::now() + 1s;
return [=](const auto & ec) {
if(!ec) {
const auto now = steady_clock::now();
if(now < next_timepoint) {
auto timer = std::make_shared<asio_ns::steady_timer>(data->io_ctx_);
timer->expires_after(next_timepoint - now);
timer->async_wait([timer, data](const auto & ec) {
if(!ec)
send_next_portion(data);
});
}
else
data->io_ctx_.post([data] { send_next_portion(data); });
}
};
}
На мой взгляд, основная сложность в этом коде проистекает из-за особенностей создания и "взвода" таймеров в Asio. По-моему, получается как-то слишком уж многословно. Но тут уж что есть, то есть. Зато не нужно никаких дополнительных библиотек привлекать.
Подключение express-like роутера
Показанные выше request_processor
, send_next_portion
и make_done_handler
в общем-то и составляли самую первую версию моего тестового сервера, написанного буквально за 15 или 20 минут.
Но через пару дней использования этого тестового сервера оказалось, что в нем есть серьезный недостаток: он всегда отдает ответный поток с одинаковой скоростью. Скомпилировал со скоростью 512KiB/sec — отдает всем 512KiB/sec. Перекомпилировал со скоростью 20KiB/sec — будет отдавать всем 20KiB/sec и никак иначе. Что было неудобно, т.к. стало нужно иметь возможность получать ответы разной "толщины".
Тогда и появилась идея: а что, если скорость отдачи будет запрашиваться прямо в URL? Например, сделали запрос на localhost:8080/
и получили ответ с заранее заданной скоростью. А если сделали запрос на localhost:8080/128K
, то стали получать ответ со скоростью 128KiB/sec.
Потом мысль пошла еще дальше: в URL также можно задавать и количество отдельных частей в ответе. Т.е. запрос localhost:8080/128K/3000
приведет к выдаче потока из 3000 частей со скоростью 128KiB/sec.
Нет проблем. В RESTinio есть возможность использовать маршрутизатор запросов, сделанный под влиянием ExpressJS. В итоге появилась вот такая функция описания обработчиков входящих HTTP-запросов:
auto make_router(asio_ns::io_context & ctx) {
auto router = std::make_unique<router_t>();
router->http_get("/", [&ctx](auto req, auto) {
request_processor(ctx, 100u*1024u, 10000u, std::move(req));
return restinio::request_accepted();
});
router->http_get(
R"(/:value(d+):multiplier([MmKkBb]?))",
[&ctx](auto req, auto params) {
const auto chunk_size = extract_chunk_size(params);
if(0u != chunk_size) {
request_processor(ctx, chunk_size, 10000u, std::move(req));
return restinio::request_accepted();
}
else
return restinio::request_rejected();
});
router->http_get(
R"(/:value(d+):multiplier([MmKkBb]?)/:count(d+))",
[&ctx](auto req, auto params) {
const auto chunk_size = extract_chunk_size(params);
const auto count = restinio::cast_to<std::size_t>(params["count"]);
if(0u != chunk_size && 0u != count) {
request_processor(ctx, chunk_size, count, std::move(req));
return restinio::request_accepted();
}
else
return restinio::request_rejected();
});
return router;
}
Здесь формируются обработчики HTTP GET запросов для URL трех типов:
- вида
http://localhost/
; - вида
http://localhost/<speed>[<U>]/
; - вида
http://localhost/<speed>[<U>]/<count>/
Где speed
— это число, определяющее скорость, а U
— это опциональный мультипликатор, который указывает, в каких единицах задана скорость. Так 128
или 128b
означает скорость в 128 байт в секунду. А 128k
— 128 килобайт в секунду.
На каждый URL вешается своя лямбда функция, которая разбирается с полученными параметрами, если все нормально, вызывает уже показанную выше функцию request_processor
.
Вспомогательная функция extract_chunk_size
выглядит следующим образом:
std::size_t extract_chunk_size(const restinio::router::route_params_t & params) {
const auto multiplier = [](const auto sv) noexcept -> std::size_t {
if(sv.empty() || "B" == sv || "b" == sv) return 1u;
else if("K" == sv || "k" == sv) return 1024u;
else return 1024u*1024u;
};
return restinio::cast_to<std::size_t>(params["value"]) *
multiplier(params["multiplier"]);
}
Здесь C++ная лямбда используется для эмуляции локальных функций из других языков программирования.
Функция main
Осталось посмотреть, как все это запускается в функции main:
using router_t = restinio::router::express_router_t<>;
...
int main() {
struct traits_t : public restinio::default_single_thread_traits_t {
using logger_t = restinio::single_threaded_ostream_logger_t;
using request_handler_t = router_t;
};
asio_ns::io_context io_ctx;
restinio::run(
io_ctx,
restinio::on_this_thread<traits_t>()
.port(8080)
.address("localhost")
.write_http_response_timelimit(60s)
.request_handler(make_router(io_ctx)));
return 0;
}
Что здесь происходит:
- Поскольку мне нужен не обычный штатный роутер запросов (который вообще ничего делать сам не может и перекладывает всю работу на плечи программиста), то я определяю новые свойства для своего HTTP-сервера. Для этого беру штатные свойства однопоточного HTTP-сервера (тип
restinio::default_single_thread_traits_t
) и указываю, что в качестве обработчика запросов будет использоваться экземпляр express-like роутера. Заодно, чтобы контролировать, что происходит внутри, указываю, чтобы HTTP-сервер использовал настоящий логгер (по умолчанию используетсяnull_logger_t
который вообще ничего не логирует). - Поскольку мне нужно взводить таймеры внутри after-write нотификаторов, то мне нужен экземпляр io_context, с которым я смог бы работать. Поэтому я его создаю сам. Это дает мне возможность передать ссылку на мой io_context в функцию
make_router
. - Остается только запустить HTTP-сервер в однопоточном варианте на ранее созданном мной io_context-е. Функция
restinio::run
вернет управление только когда HTTP-сервер завершит свою работу.
Заключение
В статье не был показан полный код моего тестового сервера, только его основные моменты. Полный код, которого чуть-чуть больше из-за дополнительных typedef-ов и вспомогательных функций, несколько подлиннее. Увидеть его можно здесь. На момент написания статьи это 185 строк, включая пустые строки и комментарии. Ну и написаны эти 185 строк за пару-тройку подходов суммарной длительностью вряд ли более часа.
Мне такой результат понравился и задача оказалась интересной. В практическом плане быстро был получен нужный мне вспомогательный инструмент. И в плане дальнейшего развития RESTinio появились кое-какие мысли.
В общем, если кто-то еще не пробовал RESTinio, то я приглашаю попробовать. Сам проект живет на BitBucket, есть зеркало на GitHub. Задать вопрос или высказать свои предложения можно в Google-группе или прямо здесь, в комментариях.
Автор: Евгений Охотников