- 87.8k
- 14
- 104
- 325
So, today iToday I had to write a small tool to help me send HTTP requests in bulk. Rabbit was overloading my server, so iI decided to change my consumers to buffer the contents of the request, before sending. After changing my API, iI did this:
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <cpr/cpr.h>
typedef std::unordered_map<std::string, std::ostringstream> HttpBuffer;
const char* RabbitQueue = getenv("RABBIT_NAME");
char QueueAddress[128];
int main()
{
sprintf(QueueAddress, "amqp://%s:%s", getenv("RABBIT_HOST"), getenv("RABBIT_PORT"));
int MaxThreads = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < MaxThreads; i++) {
threads.push_back(std::thread(RabbitThread));
}
for (std::thread &thread : threads) {
thread.join();
}
threads.clear();
}
void RabbitThread()
{
struct ev_loop *loop = ev_loop_new(0);
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, AMQP::Address(QueueAddress));
AMQP::TcpChannel channel(&connection);
AMQP::MessageCallback onMessage = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
HttpBuffer[message.replyTo()] << message.message().c_str() << ",";
if (HttpBuffer[message.replyTo()].tellp() < 300000) {
channel.ack(deliveryTag);
return;
}
cpr::PostCallback(handle_response, cpr::Url{"http://localhost:8888"}, cpr::Body{HttpBuffer[message.replyTo()].str()});
HttpBuffer[message.replyTo()].str("");
channel.ack(deliveryTag);
};
channel.declareQueue(RabbitQueue);
channel.bindQueue("default", RabbitQueue, "default");
channel.consume(RabbitQueue).onReceived(onMessage);
ev_run(loop);
ev_loop_destroy(loop);
}
What do you guys think? How can iI improve this one? Thanks in advance!
So, today i had to write a small tool to help me send HTTP requests in bulk. Rabbit was overloading my server, so i decided to change my consumers to buffer the contents of the request, before sending. After changing my API, i did this:
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <cpr/cpr.h>
typedef std::unordered_map<std::string, std::ostringstream> HttpBuffer;
const char* RabbitQueue = getenv("RABBIT_NAME");
char QueueAddress[128];
int main()
{
sprintf(QueueAddress, "amqp://%s:%s", getenv("RABBIT_HOST"), getenv("RABBIT_PORT"));
int MaxThreads = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < MaxThreads; i++) {
threads.push_back(std::thread(RabbitThread));
}
for (std::thread &thread : threads) {
thread.join();
}
threads.clear();
}
void RabbitThread()
{
struct ev_loop *loop = ev_loop_new(0);
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, AMQP::Address(QueueAddress));
AMQP::TcpChannel channel(&connection);
AMQP::MessageCallback onMessage = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
HttpBuffer[message.replyTo()] << message.message().c_str() << ",";
if (HttpBuffer[message.replyTo()].tellp() < 300000) {
channel.ack(deliveryTag);
return;
}
cpr::PostCallback(handle_response, cpr::Url{"http://localhost:8888"}, cpr::Body{HttpBuffer[message.replyTo()].str()});
HttpBuffer[message.replyTo()].str("");
channel.ack(deliveryTag);
};
channel.declareQueue(RabbitQueue);
channel.bindQueue("default", RabbitQueue, "default");
channel.consume(RabbitQueue).onReceived(onMessage);
ev_run(loop);
ev_loop_destroy(loop);
}
What do you guys think? How can i improve this one? Thanks in advance!
Today I had to write a small tool to help me send HTTP requests in bulk. Rabbit was overloading my server, so I decided to change my consumers to buffer the contents of the request, before sending. After changing my API, I did this:
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <cpr/cpr.h>
typedef std::unordered_map<std::string, std::ostringstream> HttpBuffer;
const char* RabbitQueue = getenv("RABBIT_NAME");
char QueueAddress[128];
int main()
{
sprintf(QueueAddress, "amqp://%s:%s", getenv("RABBIT_HOST"), getenv("RABBIT_PORT"));
int MaxThreads = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < MaxThreads; i++) {
threads.push_back(std::thread(RabbitThread));
}
for (std::thread &thread : threads) {
thread.join();
}
threads.clear();
}
void RabbitThread()
{
struct ev_loop *loop = ev_loop_new(0);
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, AMQP::Address(QueueAddress));
AMQP::TcpChannel channel(&connection);
AMQP::MessageCallback onMessage = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
HttpBuffer[message.replyTo()] << message.message().c_str() << ",";
if (HttpBuffer[message.replyTo()].tellp() < 300000) {
channel.ack(deliveryTag);
return;
}
cpr::PostCallback(handle_response, cpr::Url{"http://localhost:8888"}, cpr::Body{HttpBuffer[message.replyTo()].str()});
HttpBuffer[message.replyTo()].str("");
channel.ack(deliveryTag);
};
channel.declareQueue(RabbitQueue);
channel.bindQueue("default", RabbitQueue, "default");
channel.consume(RabbitQueue).onReceived(onMessage);
ev_run(loop);
ev_loop_destroy(loop);
}
What do you guys think? How can I improve this one?
Bulk HTTP request queue consumer
So, today i had to write a small tool to help me send HTTP requests in bulk. Rabbit was overloading my server, so i decided to change my consumers to buffer the contents of the request, before sending. After changing my API, i did this:
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
#include <cpr/cpr.h>
typedef std::unordered_map<std::string, std::ostringstream> HttpBuffer;
const char* RabbitQueue = getenv("RABBIT_NAME");
char QueueAddress[128];
int main()
{
sprintf(QueueAddress, "amqp://%s:%s", getenv("RABBIT_HOST"), getenv("RABBIT_PORT"));
int MaxThreads = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < MaxThreads; i++) {
threads.push_back(std::thread(RabbitThread));
}
for (std::thread &thread : threads) {
thread.join();
}
threads.clear();
}
void RabbitThread()
{
struct ev_loop *loop = ev_loop_new(0);
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, AMQP::Address(QueueAddress));
AMQP::TcpChannel channel(&connection);
AMQP::MessageCallback onMessage = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
HttpBuffer[message.replyTo()] << message.message().c_str() << ",";
if (HttpBuffer[message.replyTo()].tellp() < 300000) {
channel.ack(deliveryTag);
return;
}
cpr::PostCallback(handle_response, cpr::Url{"http://localhost:8888"}, cpr::Body{HttpBuffer[message.replyTo()].str()});
HttpBuffer[message.replyTo()].str("");
channel.ack(deliveryTag);
};
channel.declareQueue(RabbitQueue);
channel.bindQueue("default", RabbitQueue, "default");
channel.consume(RabbitQueue).onReceived(onMessage);
ev_run(loop);
ev_loop_destroy(loop);
}
What do you guys think? How can i improve this one? Thanks in advance!