На сайте rabbitmq.com в разделе tutorials приведены примеры реализации на различных языках, но среди них нет C++. Под катом собраны ссылки на переведенные руководства, материалы и код под спойлером.
Кому удобнее просматривать код из под интерфейса GitHub, можно сразу перейти в репозиторий.
Данный материал использует реализацию клиента AMQP-CPP и POCO C++ для работы с сокетом.
«RabbitMQ tutorial 1 — Hello World»
receive.cpp
#include <iostream>
#include "SimplePocoHandler.h"
int main(void)
{
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareQueue("hello");
channel.consume("hello", AMQP::noack).onReceived(
[](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
std::cout <<" [x] Received "<<message.message() << std::endl;
});
std::cout << " [*] Waiting for messages. To exit press CTRL-Cn";
handler.loop();
return 0;
}
send.cpp
#include <iostream>
#include "SimplePocoHandler.h"
int main(void)
{
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.onReady([&]()
{
if(handler.connected())
{
channel.publish("", "hello", "Hello World!");
std::cout << " [x] Sent 'Hello World!'" << std::endl;
handler.quit();
}
});
handler.loop();
return 0;
}
«RabbitMQ tutorial 2 — Очередь задач»
worker.cpp
#include <iostream>
#include <algorithm>
#include <thread>
#include <chrono>
#include "SimplePocoHandler.h"
int main(void)
{
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareQueue("task_queue", AMQP::durable);
channel.consume("task_queue", AMQP::noack).onReceived(
[&channel](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
const auto body = message.message();
std::cout<<" [x] Received "<<body<<std::endl;
size_t count = 0;
std::for_each(body.cbegin(), body.cend(), [&](const char& ch)
{
if(ch =='.')
{
++count;
}
});
std::this_thread::sleep_for (std::chrono::seconds(count));
std::cout<<" [x] Done"<<std::endl;
channel.ack(deliveryTag);
});
channel.setQos(1);
std::cout << " [*] Waiting for messages. To exit press CTRL-Cn";
handler.loop();
return 0;
}
new_task.cpp
#include <iostream>
#include "SimplePocoHandler.h"
#include "tools.h"
int main(int argc, const char* argv[])
{
const std::string msg =
argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!";
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
auto callback =
[&](const std::string &name, int msgcount, int consumercount)
{
channel.publish("", "task_queue", msg);
std::cout<<" [x] Sent '"<<msg<<"'n";
handler.quit();
};
channel.declareQueue("task_queue", AMQP::durable).onSuccess(callback);
handler.loop();
return 0;
}
«RabbitMQ tutorial 3 — Публикация/Подписка»
receive_logs.cpp
#include <iostream>
#include "SimplePocoHandler.h"
int main(void)
{
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
auto receiveMessageCallback = [](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
std::cout <<" [x] "<<message.message() << std::endl;
};
auto callback =
[&](const std::string &name, int msgcount, int consumercount)
{
channel.bindQueue("logs", name,"");
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
};
channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()
{
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
});
std::cout << " [*] Waiting for messages. To exit press CTRL-Cn";
handler.loop();
return 0;
}
emit_log.cpp
#include <iostream>
#include "SimplePocoHandler.h"
#include "tools.h"
int main(int argc, const char* argv[])
{
const std::string msg =
argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!";
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()
{
channel.publish("logs", "", msg);
std::cout << " [x] Sent "<<msg<< std::endl;
handler.quit();
});
handler.loop();
return 0;
}
«RabbitMQ tutorial 4 — Роутинг»
receive_logs_direct.cpp
#include <iostream>
#include <algorithm>
#include "SimplePocoHandler.h"
int main(int argc, const char* argv[])
{
if(argc==1)
{
std::cout<<"Usage: "<<argv[0]<<" [info] [warning] [error]"<<std::endl;
return 1;
}
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("direct_logs", AMQP::direct);
auto receiveMessageCallback =
[](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
std::cout <<" [x] "
<<message.routingKey()
<<":"
<<message.message()
<< std::endl;
};
auto callback = [&](const std::string &name,
int msgcount,
int consumercount)
{
std::for_each(&argv[1],
&argv[argc],
[&](const char* severity)
{
channel.bindQueue("direct_logs","", severity);
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
});
};
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
std::cout << " [*] Waiting for messages. To exit press CTRL-Cn";
handler.loop();
return 0;
}
emit_log_direct.cpp
#include <iostream>
#include "SimplePocoHandler.h"
#include "tools.h"
int main(int argc, const char* argv[])
{
const std::string severity = argc > 2 ? argv[1] : "info";
const std::string msg =
argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!";
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]()
{
channel.publish("direct_logs", severity, msg);
std::cout << " [x] Sent "<<severity<<":"<<msg<< std::endl;
handler.quit();
});
handler.loop();
return 0;
}
«RabbitMQ tutorial 5 — Тематики»
receive_logs_topic.cpp
#include <iostream>
#include <algorithm>
#include "SimplePocoHandler.h"
int main(int argc, const char* argv[])
{
if(argc==1)
{
std::cout<<"Usage: "<<argv[0]<<" [binding_key]..."<<std::endl;
return 1;
}
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("topic_logs", AMQP::topic);
auto receiveMessageCallback =
[](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
std::cout <<" [x] "
<<message.routingKey()
<<":"
<<message.message()
<< std::endl;
};
auto callback = [&](const std::string &name,
int msgcount,
int consumercount)
{
std::for_each(&argv[1],
&argv[argc],
[&](const char* bindingKeys)
{
std::cout<<bindingKeys<<std::endl;
channel.bindQueue("topic_logs",name, bindingKeys);
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
});
};
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
std::cout << " [*] Waiting for messages. To exit press CTRL-Cn";
handler.loop();
return 0;
}
emit_log_topic.cpp
#include <iostream>
#include "SimplePocoHandler.h"
#include "tools.h"
int main(int argc, const char* argv[])
{
const std::string msg =
argc > 1 ? join(&argv[2], &argv[argc], " ") : "Hello World!";
const std::string routing_key = argc > 1 ? argv[1] : "anonymous.info";
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]()
{
channel.publish("topic_logs", routing_key, msg);
std::cout << " [x] Sent "<<routing_key<<":"<<msg<< std::endl;
handler.quit();
});
handler.loop();
return 0;
}
«RabbitMQ tutorial 6 — Удаленный вызов процедур»
rpc_server.cpp
#include <iostream>
#include <algorithm>
#include <thread>
#include <chrono>
#include "SimplePocoHandler.h"
int fib(int n)
{
switch (n)
{
case 0:
return 0;
case 1:
return 1;
default:
return fib(n - 1) + fib(n - 2);
}
}
int main(void)
{
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareQueue("rpc_queue");
channel.consume("").onReceived([&channel](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
const auto body = message.message();
std::cout<<" [.] fib("<<body<<")"<<std::endl;
AMQP::Envelope env(std::to_string(fib(std::stoi(body))));
env.setCorrelationID(message.correlationID());
channel.publish("", message.replyTo(), env);
channel.ack(deliveryTag);
});
channel.setQos(1);
std::cout << " [x] Awaiting RPC requests" << std::endl;
handler.loop();
return 0;
}
rpc_client.cpp
#include <iostream>
#include "tools.h"
#include "SimplePocoHandler.h"
int main(int argc, const char* argv[])
{
const std::string correlation(uuid());
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
auto callback = [&](const std::string &name,
int msgcount,
int consumercount)
{
AMQP::Envelope env("30");
env.setCorrelationID(correlation);
env.setReplyTo(name);
channel.publish("","rpc_queue",env);
std::cout<<" [x] Requesting fib(30)"<<std::endl;
};
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
auto receiveCallback = [&](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
if(message.correlationID() != correlation)
return;
std::cout<<" [.] Got "<<message.message()<<std::endl;
handler.quit();
};
channel.consume("", AMQP::noack).onReceived(receiveCallback);
handler.loop();
return 0;
}
Автор: RPG18