Today I had to write a small tool to help me send HTTP requests in bulk. Rabbit was overloading my server, so I changed my consumers to buffer the contents of the requests before sending them. After changing my API, I wrote this:
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <cpr/cpr.h>

typedef std::unordered_map<std::string, std::ostringstream> HttpBuffer;
const char* RabbitQueue = getenv("RABBIT_NAME");
char QueueAddress[128];

int main()
{
    sprintf(QueueAddress, "amqp://%s:%s", getenv("RABBIT_HOST"), getenv("RABBIT_PORT"));
    int MaxThreads = std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    for (int i = 0; i < MaxThreads; i++) {
        threads.push_back(std::thread(RabbitThread));
    }
    for (std::thread &thread : threads) {
        thread.join();
    }
    threads.clear();
}

void RabbitThread()
{
    struct ev_loop *loop = ev_loop_new(0);
    AMQP::LibEvHandler handler(loop);
    AMQP::TcpConnection connection(&handler, AMQP::Address(QueueAddress));
    AMQP::TcpChannel channel(&connection);
    AMQP::MessageCallback onMessage = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
        HttpBuffer[message.replyTo()] << message.message().c_str() << ",";
        if (HttpBuffer[message.replyTo()].tellp() < 300000) {
            channel.ack(deliveryTag);
            return;
        }
        cpr::PostCallback(handle_response, cpr::Url{"http://localhost:8888"}, cpr::Body{HttpBuffer[message.replyTo()].str()});
        HttpBuffer[message.replyTo()].str("");
        channel.ack(deliveryTag);
    };
    channel.declareQueue(RabbitQueue);
    channel.bindQueue("default", RabbitQueue, "default");
    channel.consume(RabbitQueue).onReceived(onMessage);
    ev_run(loop);
    ev_loop_destroy(loop);
}
What do you guys think? How can I improve this one?
-
Seems strange to run an event loop in each thread. Can you give us some more description of what is happening? (Loki Astari, Jun 9, 2016 at 16:19)
-
Well, I want to have more consumers in a single process. So I spawn the maximum number of threads that the machine can handle and tell them to listen to the queue. (vinnylinux, Jun 9, 2016 at 20:15)
2 Answers
I don't know Rabbit and have only small nitpicks regarding C++.
emplace
You might consider constructing threads in place instead of moving them into the vector.
Instead of:
threads.push_back(std::thread(RabbitThread));
I would try:
threads.emplace_back(RabbitThread);
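To see the difference in isolation, here is a small sketch using a hypothetical move-only Tracker type as a stand-in for std::thread; it counts constructor and move-constructor calls. With reserve() so the vector never reallocates, push_back(Tracker(id)) should record one construction plus one move, while emplace_back(id) records only the construction.

```cpp
#include <vector>

// Hypothetical move-only type standing in for std::thread.
// It counts how many times each constructor runs.
struct Tracker {
    static int constructions;  // calls to Tracker(int)
    static int moves;          // calls to the move constructor

    explicit Tracker(int value) : id(value) { ++constructions; }
    Tracker(Tracker&& other) noexcept : id(other.id) { ++moves; }
    Tracker(const Tracker&) = delete;             // move-only, like std::thread
    Tracker& operator=(const Tracker&) = delete;

    int id;
};

int Tracker::constructions = 0;
int Tracker::moves = 0;

void resetCounts() {
    Tracker::constructions = 0;
    Tracker::moves = 0;
}

// push_back: a temporary Tracker is built first, then moved into the vector.
void addWithPushBack(std::vector<Tracker>& v, int id) {
    v.push_back(Tracker(id));
}

// emplace_back: the arguments are forwarded and the Tracker is built
// directly in the vector's storage, so no move is needed.
void addWithEmplaceBack(std::vector<Tracker>& v, int id) {
    v.emplace_back(id);
}
```

For std::thread the extra move is cheap, so this is mostly about expressing intent, not performance.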
clear()
Is threads.clear(); in main() necessary?
struct and null pointer
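This looks like C-style use of the struct keyword. It is very rare to see in C++ (not saying it is wrong).
struct ev_loop *loop = ev_loop_new(0);
This might also be OK and looks more familiar:
ev_loop *loop = ev_loop_new(0);
(Check your libev build first, though: with libev's default EV_COMPAT3 setting, ev_loop is also the name of a compatibility function, and then C++ needs the struct keyword to name the type.)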
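One more thing: is this zero meant to be a NULL pointer?
struct ev_loop *loop = ev_loop_new(0);
It isn't. ev_loop_new() takes an unsigned int flags argument, and 0 is the value of EVFLAG_AUTO (let libev choose the backend), so ev_loop_new(nullptr) would not even compile. Naming the flag makes the intent clearer:
struct ev_loop *loop = ev_loop_new(EVFLAG_AUTO);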
hardware_concurrency()
I have no idea where the bottleneck lies in your use case, but I suspect it is not CPU-bound, so you might be able to use even more threads than this:
int MaxThreads = std::thread::hardware_concurrency();
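If the consumers spend most of their time waiting on the network, the thread count is a tuning knob rather than a hardware limit. A minimal sketch, assuming the desired count comes from some configuration you already have (the requested parameter is hypothetical), that treats hardware_concurrency() only as a default; the standard allows it to return 0 when the value is not computable, so there is a fallback.

```cpp
#include <thread>

// Picks how many consumer threads to start.
// `requested` is a hypothetical override (e.g. read from configuration);
// 0 means "no override, use the hardware hint".
unsigned consumerThreadCount(unsigned requested) {
    if (requested > 0) {
        return requested;
    }
    // hardware_concurrency() is only a hint and may be 0 if unknown.
    unsigned hw = std::thread::hardware_concurrency();
    return hw > 0 ? hw : 1;
}
```

Measuring throughput with a few different values is the only reliable way to choose the override.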
-
I'm not sure I understand emplace. What is the difference? About clear()... don't I have to free the resources allocated by the threads themselves? (vinnylinux, Jun 17, 2016 at 0:11)
-
Without hardware_concurrency(), how can I know the maximum number of threads to run? (vinnylinux, Jun 17, 2016 at 0:12)
-
emplace_back() means only the std::thread constructor is called for each element. push_back() means the std::thread constructor and the std::thread move constructor are both called for each element. hardware_concurrency() is definitely not the maximum number of threads! It is the number of concurrent threads the hardware supports (and not guaranteed to be exact). On Linux you can have thousands of logical threads on a single core. See stackoverflow.com/questions/344203/… (Jan Korous, Jun 17, 2016 at 7:56)
I've never used the Rabbit libraries, so I'll only comment on the C++.
#include what you use
I had to add
#include <unordered_map>
#include <stdlib.h>
#include <string.h>
#include <thread>
#include <vector>
before I could compile. Don't omit the essentials from code reviews!
Error handling
Did you leave out all the error handling to simplify the review? If so, please tell us this in your question. Otherwise, it looks like you've not thought about it at all!
sprintf
sprintf(QueueAddress, "amqp://%s:%s", getenv("RABBIT_HOST"), getenv("RABBIT_PORT"));
There's no bounds checking here. You could use snprintf(QueueAddress, sizeof QueueAddress, ...), but I'd lean more towards building a std::string (with + or with a std::ostringstream), and using its c_str() if you need a C-style string:
const auto QueueAddress = std::string("amqp://") + getenv("RABBIT_HOST") + ':' + getenv("RABBIT_PORT");
....
auto address = AMQP::Address(QueueAddress.c_str());
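One gap in both the original and the snippet above: getenv() returns a null pointer when a variable is unset, and passing a null pointer to sprintf's %s or to std::string's operator+ is undefined behaviour. A sketch of a checked version, using hypothetical helper names requireEnv and makeQueueAddress and reporting a missing variable with an exception:

```cpp
#include <cstdlib>
#include <stdexcept>
#include <string>

// Returns the value of an environment variable, or throws if it is unset,
// instead of letting a null pointer reach string code.
std::string requireEnv(const char* name) {
    const char* value = std::getenv(name);
    if (value == nullptr) {
        throw std::runtime_error(std::string("environment variable not set: ") + name);
    }
    return value;
}

// Builds "amqp://host:port" from RABBIT_HOST and RABBIT_PORT.
std::string makeQueueAddress() {
    return "amqp://" + requireEnv("RABBIT_HOST") + ':' + requireEnv("RABBIT_PORT");
}
```

main() could call makeQueueAddress() once, before starting any threads, so that a missing variable is reported immediately rather than surfacing later as a connection failure inside every thread.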
threads.push_back
Generally, we prefer emplace_back() when creating new objects in a vector. This saves us from having to think about whether the class has an efficient move constructor.
threads.clear()
I'm not sure about clearing the vector immediately before it goes out of scope. I can see that it demonstrates a desire to clean up correctly - but perhaps it shows a lack of confidence in scoped variables. It's obviously good practice in garbage-collected environments, but in C++ it just adds noise to the program.
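On the memory-leak worry raised in the comments: a std::vector destroys its elements when it goes out of scope, so clear() right before the end of main() frees nothing extra. The sketch below uses a hypothetical Resource type that counts live instances; the count is back to zero once the function returns, with no clear() anywhere. For std::thread specifically, what matters is join() (or detach()): destroying a thread that is still joinable calls std::terminate, and the original code already joins every thread.

```cpp
#include <vector>

// Hypothetical resource type that counts how many instances are alive,
// standing in for anything that owns memory or handles.
struct Resource {
    static int alive;
    Resource() { ++alive; }
    Resource(const Resource&) { ++alive; }
    ~Resource() { --alive; }
};

int Resource::alive = 0;

// Creates `count` resources in a vector and never calls clear().
// The return value is computed while they are still alive; the vector's
// destructor then destroys every element as the function exits.
int useResources(int count) {
    std::vector<Resource> resources(count);
    return Resource::alive;
}
```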
magic constants
What's that 300000 doing in the middle of the program? I can't tell the significance of it, or how it was chosen; it looks like a policy choice mixed in with the implementation. I'd suggest creating a named constant so that we can understand why that test is there. I think it's the minimum buffered data to trigger a reply, so I'll call it BUFFER_THRESHOLD.
naming
Naming standards can always be contentious, and if you have existing conventions, then you should stick with them. But I find the use of PascalCase for variables jarring; C++ code generally uses snake_case or camelCase here. I note that the Rabbit classes use PascalCase for classes and camelCase for members, so I recommend doing likewise, especially as your single-word variables (threads, channel, loop) are not capitalised.
This actually is an issue here, as you create a type alias HttpBuffer but then refer to a variable of that name. It's not clear whether the typedef should be a declaration instead, or whether you meant to use the type later to declare a local. It appears that you meant the former, but a consistent naming scheme would have helped.
HttpBuffer[message.replyTo()]
It's probably worth keeping a local for the result of this call in onMessage, as it's used four times:
AMQP::MessageCallback onMessage = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool /*unused*/) {
    auto& reply = HttpBuffer[message.replyTo()];
    reply << message.message().c_str() << ",";
    if (reply.tellp() < BUFFER_THRESHOLD) {
        channel.ack(deliveryTag);
        return;
    }
    cpr::PostCallback(handle_response, cpr::Url{"http://localhost:8888"}, cpr::Body{reply.str()});
    reply.str("");
    channel.ack(deliveryTag);
};
(And what's the type of message.message()? If it's a std::string, then there's no need to convert to a C string for operator<<.)
inverted condition
In the callback, you have a condition of the form:
if (c) {
    foo();
    return;
}
...;
foo();
}
It's probably clearer to use the opposite condition to decide whether to take action:
if (!c) {
    ...;
}
foo();
}
This gives you:
AMQP::MessageCallback onMessage = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool /*unused*/) {
    const auto& reply_to = message.replyTo();
    auto& reply = HttpBuffer[reply_to];
    reply << message.message().c_str() << ",";
    if (reply.tellp() >= BUFFER_THRESHOLD) {
        cpr::PostCallback(handle_response, cpr::Url{"http://localhost:8888"}, cpr::Body{reply.str()});
        reply.str("");
    }
    channel.ack(deliveryTag);
};
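Taking this one step further, the buffering policy can be pulled out of the callback entirely. The sketch below is a hypothetical BatchBuffer class (C++17, for std::optional) that owns the per-destination streams and the threshold, so the 300000 gets a name where the buffer is constructed. One design note, offered as an assumption about the intent: HttpBuffer in the question appears to be a single global map used by every consumer thread without a lock, which would be a data race. One BatchBuffer per thread, for instance a local in RabbitThread captured by reference in the lambda, avoids that, because each thread's callbacks run on that thread's own event loop.

```cpp
#include <optional>
#include <sstream>
#include <string>
#include <unordered_map>

// Hypothetical helper holding the buffering policy in one place.
class BatchBuffer {
public:
    explicit BatchBuffer(std::streamoff threshold) : threshold_(threshold) {}

    // Appends `message` (plus a comma) to the batch for `replyTo`.
    // Returns the whole batch and empties it once its size reaches the
    // threshold; otherwise returns std::nullopt.
    std::optional<std::string> add(const std::string& replyTo, const std::string& message) {
        std::ostringstream& batch = batches_[replyTo];
        batch << message << ',';
        if (static_cast<std::streamoff>(batch.tellp()) < threshold_) {
            return std::nullopt;
        }
        std::string payload = batch.str();
        batch.str("");  // start the next batch from empty
        return payload;
    }

private:
    std::streamoff threshold_;
    std::unordered_map<std::string, std::ostringstream> batches_;
};
```

Inside onMessage, the callback would call add() with the reply-to key and the message text, post the returned payload only when one comes back, and ack the delivery unconditionally.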
Final flushing
It's not clear to me whether one or more threads may still have unflushed data at program exit. I can't see any code to ensure that all replies get transmitted.
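A sketch of a final flush, assuming the buffers are a map like HttpBuffer owned by the thread and that the HTTP call is supplied by the caller (the send callback is hypothetical, standing in for the cpr::PostCallback call). One place to run it would be in RabbitThread after ev_run(loop) returns, provided the loop is stopped deliberately (for example with ev_break) rather than the process simply being killed.

```cpp
#include <functional>
#include <sstream>
#include <string>
#include <unordered_map>

using Batches = std::unordered_map<std::string, std::ostringstream>;

// Sends every non-empty batch through `send` and empties it, so nothing
// buffered is left behind when the thread exits.
// Returns the number of batches sent.
int flushAll(Batches& batches,
             const std::function<void(const std::string&, const std::string&)>& send) {
    int sent = 0;
    for (auto& entry : batches) {
        std::string payload = entry.second.str();
        if (!payload.empty()) {
            send(entry.first, payload);
            entry.second.str("");
            ++sent;
        }
    }
    return sent;
}
```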
-
I haven't added error handling. There aren't many errors to handle here, at least not that I've seen. (vinnylinux, Jun 17, 2016 at 12:53)
-
How can I build a std::string with sprintf? (vinnylinux, Jun 17, 2016 at 12:54)
-
As for the cleanup code, I was just trying to avoid memory leaks. (vinnylinux, Jun 17, 2016 at 12:56)
-
Oh, I also just noticed a bug. If I stop getting messages, HttpBuffer will be left holding zombie data. Any ideas on how to solve that? (vinnylinux, Jun 17, 2016 at 13:09)
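One way to handle idle buffers is to flush anything that has not received data for a while. This sketch (hypothetical names TimedBatch, appendTimed and flushIdle) records the time of the last append per destination and takes the current time as a parameter, which keeps it testable. In the real program it would need to be called periodically, for example from a repeating libev ev_timer on each thread's loop; that wiring is not shown here.

```cpp
#include <chrono>
#include <functional>
#include <sstream>
#include <string>
#include <unordered_map>

using Clock = std::chrono::steady_clock;

// Hypothetical per-destination batch that remembers when it last got data.
struct TimedBatch {
    std::ostringstream data;
    Clock::time_point lastAppend{};
};

using TimedBatches = std::unordered_map<std::string, TimedBatch>;

// Appends a message and records when the append happened.
void appendTimed(TimedBatches& batches, const std::string& replyTo,
                 const std::string& message, Clock::time_point now) {
    TimedBatch& batch = batches[replyTo];
    batch.data << message << ',';
    batch.lastAppend = now;
}

// Sends and empties every non-empty batch that has been idle for at least
// maxIdle. Returns the number of batches sent.
int flushIdle(TimedBatches& batches, Clock::time_point now, Clock::duration maxIdle,
              const std::function<void(const std::string&, const std::string&)>& send) {
    int sent = 0;
    for (auto& entry : batches) {
        TimedBatch& batch = entry.second;
        if (now - batch.lastAppend < maxIdle) {
            continue;  // still receiving data; let it keep filling
        }
        std::string payload = batch.data.str();
        if (!payload.empty()) {
            send(entry.first, payload);
            batch.data.str("");
            ++sent;
        }
    }
    return sent;
}
```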
-
You don't need sprintf() to build the string; I've edited the answer to show how. (Toby Speight, Jun 17, 2016 at 13:12)
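To answer the literal question anyway: if printf-style formatting is really wanted, snprintf can be called once with a null buffer to measure the output, then again to fill a correctly sized buffer. A sketch with a hypothetical formatString helper (C++20's std::format is the standard alternative where it is available):

```cpp
#include <cstdio>
#include <stdexcept>
#include <string>
#include <vector>

// Formats like sprintf(), but returns a std::string sized to fit.
// The first snprintf call (null buffer, size 0) only measures the length.
template <typename... Args>
std::string formatString(const char* format, Args... args) {
    int length = std::snprintf(nullptr, 0, format, args...);
    if (length < 0) {
        throw std::runtime_error("formatting error");
    }
    std::vector<char> buffer(static_cast<std::size_t>(length) + 1);  // +1 for '\0'
    std::snprintf(buffer.data(), buffer.size(), format, args...);
    return std::string(buffer.data(), static_cast<std::size_t>(length));
}
```

Only pass arguments that snprintf understands (for example s.c_str() rather than a std::string), because they are forwarded unchecked to the C function.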