The topic exchange is powerful and can behave like other exchanges.

When a queue is bound with the # (hash) binding key, it receives all messages regardless of the routing key, just like a fanout exchange.

When the special characters * (star) and # (hash) aren't used in bindings, the topic exchange behaves just like a direct one.
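
To make these rules concrete, here is a minimal sketch of topic-style matching in plain Java. It is not the broker's implementation; the class name TopicMatchSketch and the method matches are hypothetical names introduced only for illustration. It assumes the usual topic rules: keys are lists of words separated by dots, * stands for exactly one word, and # stands for zero or more words.

```java
class TopicMatchSketch {

    // Returns true if routingKey satisfies bindingKey under the assumed topic rules.
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    // Recursive helper: compares pattern[i..] against words[j..].
    private static boolean matches(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: match only if all words were consumed too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // "#" may absorb zero or more words; try every possible split.
            for (int k = j; k <= words.length; k++) {
                if (matches(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            // Pattern still expects a word, but none are left.
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            // "*" matches any single word; otherwise the words must be equal.
            return matches(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("#", "kern.critical"));             // true (fanout-like)
        System.out.println(matches("kern.critical", "kern.critical")); // true (direct-like)
        System.out.println(matches("kern.critical", "kern.info"));     // false
        System.out.println(matches("kern.*", "kern.critical"));        // true
    }
}
```

With a binding key of # every routing key matches, and with no special characters the check reduces to plain equality, which is why the topic exchange can stand in for a fanout or a direct exchange.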

Putting it all together

We're going to use a topic exchange in our logging system. We'll start off with a working assumption that the routing keys of logs will have two words: <facility>.<severity>.

The code is almost the same as in the previous tutorial.

The code for EmitLogTopic.java:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // try-with-resources closes the channel and connection on exit
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // Declare the topic exchange the logs are published to
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = getRouting(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }
    //..
}
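
The listing above elides its helper methods (the //.. line). As a rough sketch only, and not necessarily what the full source does, the two helpers could look like the following. It assumes the first command-line argument is the routing key and the remaining arguments are joined into the message, which is consistent with the emit command shown later in this tutorial. The class name EmitArgsSketch and both fallback values are assumptions made for illustration.

```java
import java.util.Arrays;

class EmitArgsSketch {

    // Hypothetical helper: the first argument is taken as the routing key.
    // The fallback "anonymous.info" is an assumed default.
    static String getRouting(String[] argv) {
        if (argv.length < 1) {
            return "anonymous.info";
        }
        return argv[0];
    }

    // Hypothetical helper: all arguments after the first are joined with
    // spaces to form the message. The fallback text is an assumed default.
    static String getMessage(String[] argv) {
        if (argv.length < 2) {
            return "Hello World!";
        }
        return String.join(" ", Arrays.copyOfRange(argv, 1, argv.length));
    }
}
```

Joining the trailing arguments lets a message be passed either as one quoted string or as several unquoted words.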

The code for ReceiveLogsTopic.java:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // Let the server create a queue and generate its name
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        // Bind the queue once per binding key given on the command line
        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        // autoAck is true, so messages are acknowledged as soon as they are delivered
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
}

Compile and run the examples, including the classpath as in Tutorial 1. On Windows, use %CP% instead of $CP.

To compile:

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

To receive all the logs:

java -cp $CP ReceiveLogsTopic "#"

To receive all logs from the facility kern:

java -cp $CP ReceiveLogsTopic "kern.*"

Or if you want to hear only about critical logs:

java -cp $CP ReceiveLogsTopic "*.critical"

You can create multiple bindings:

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

And to emit a log with the routing key kern.critical, type:

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

Have fun playing with these programs. Note that the code doesn't make any assumptions about the routing or binding keys, so you may want to experiment with routing keys that have more than two words.

(Full source code for EmitLogTopic.java and ReceiveLogsTopic.java)

Next, find out how to do a round-trip message as a remote procedure call in tutorial 6.
