#include <qpid/messaging/Address.h> #include <qpid/messaging/Connection.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/Message_io.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> #include <qpid/types/Variant.h> #include <iostream> #include <sstream> #include <vector> #include <ctime> #include "OptionParser.h" using namespace qpid::messaging; using namespace qpid::types; typedef std::vector<std::string> string_vector; struct Options : OptionParser { std::string url; std::string address; int timeout; bool durable; int count; std::string id; std::string replyto; string_vector properties; string_vector entries; std::string content; std::string connectionOptions; bool print; Options() : OptionParser("Usage: spout [OPTIONS] ADDRESS", "Send messages to the specified address"), url("127.0.0.1"), timeout(0), count(1), durable(false), print(false) { add("broker,b", url, "url of broker to connect to"); add("timeout,t", timeout, "exit after the specified time"); add("durable,d", durable, "make the message durable (def. transient)"); add("count,c", count, "stop after count messages have been sent, zero disables"); add("id,i", id, "use the supplied id instead of generating one"); add("reply-to", replyto, "specify reply-to address"); add("property,P", properties, "specify message property"); add("map,M", entries, "specify entry for map content"); add("content", content, "specify textual content"); add("connection-options", connectionOptions, "connection options string in the form {name1:value1, name2:value2}"); add("print", print, "print each message sent"); } static bool nameval(const std::string& in, std::string& name, std::string& value) { std::string::size_type i = in.find("="); if (i == std::string::npos) { name = in; return false; } else { name = in.substr(0, i); if (i+1 < in.size()) { value = in.substr(i+1); return true; } else { return false; } } } static void setProperty(Message& message, const std::string& property) { std::string name; std::string value; if (nameval(property, name, value)) { message.getProperties()[name] = value; message.getProperties()[name].setEncoding("utf8"); } else { message.getProperties()[name] = Variant(); } } void setProperties(Message& message) const { for (string_vector::const_iterator i = properties.begin(); i != properties.end(); ++i) { setProperty(message, *i); } } void setEntries(Variant::Map& content) const { for (string_vector::const_iterator i = entries.begin(); i != entries.end(); ++i) { std::string name; std::string value; if (nameval(*i, name, value)) { content[name] = value; } else { content[name] = Variant(); } } } bool checkAddress() { if (getArguments().empty()) { error("Address is required"); return false; } else { address = getArguments()[0]; return true; } } bool isDurable() const { return durable; } }; int main(int argc, char** argv) { Options options; if (options.parse(argc, argv) && options.checkAddress()) { Connection connection(options.url, options.connectionOptions); try { connection.open(); Session session = connection.createSession(); Sender sender = session.createSender(options.address); Message message; message.setDurable(options.isDurable()); options.setProperties(message); Variant& obj = message.getContentObject(); if (options.entries.size()) { Variant::Map content; options.setEntries(content); obj = content; } else if (options.content.size()) { obj = options.content; obj.setEncoding("utf8"); } std::time_t start = std::time(0); for (int count = 0; (count < options.count || options.count == 0) && (options.timeout == 0 || std::difftime(std::time(0), start) < options.timeout); count++) { if (!options.replyto.empty()) message.setReplyTo(Address(options.replyto)); std::string id = options.id.empty() ? Uuid(true).str() : options.id; std::stringstream spoutid; spoutid << id << ":" << count; message.getProperties()["spout-id"] = spoutid.str(); if (options.print) std::cout << message << std::endl; sender.send(message); } session.sync(); connection.close(); return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; connection.close(); } } return 1; }
Apache Qpid, Messaging built on AMQP; Copyright © 2015 The Apache Software Foundation; Licensed under the Apache License, Version 2.0; Apache Qpid, Qpid, Qpid Proton, Proton, Apache, the Apache feather logo, and the Apache Qpid project logo are trademarks of The Apache Software Foundation; All other marks mentioned may be trademarks or registered trademarks of their respective owners