It is perfectly legal to bind multiple queues with the same binding
key. In our example we could add a binding between X and Q1 with
binding key black. In that case, the direct exchange will behave
like fanout and will broadcast the message to all the matching
queues. A message with routing key black will be delivered to both
Q1 and Q2.
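The matching rule described above can be modelled in a few lines of plain Java. This is only an in-memory sketch of how a direct exchange picks queues, not the RabbitMQ client API; the class name DirectRoutingSketch and its bind and route methods are hypothetical names chosen for illustration, and it assumes Q2 is already bound with the key black.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of direct-exchange routing (illustration only, no broker involved).
class DirectRoutingSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    // Hypothetical helper: record a binding between the exchange and a queue.
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    // A queue receives the message when its binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        Set<String> matches = bindings.getOrDefault(routingKey, Collections.emptySet());
        return new ArrayList<>(matches);
    }

    public static void main(String[] args) {
        DirectRoutingSketch x = new DirectRoutingSketch();
        x.bind("Q2", "black"); // assumed to exist already
        x.bind("Q1", "black"); // the extra binding discussed above
        System.out.println(x.route("black")); // both queues match
        System.out.println(x.route("white")); // no binding matches
    }
}
```

Because the lookup is an exact string comparison, binding several queues with the same key is all it takes to get fanout-like behaviour for that one key.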
We'll use this model for our logging system. Instead of fanout we'll
send messages to a direct exchange. We will supply the log severity as
a routing key. That way the receiving program will be able to select
the severity it wants to receive. Let's focus on emitting logs
first.
As always, we need to create an exchange first:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
And we're ready to send a message:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
To simplify things we will assume that 'severity' can be one of
info, warning, or error.
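Since the routing key is an arbitrary string, a mistyped severity would be published without complaint and simply match no binding. A producer could guard against that with a small check like the one below. This is a sketch under the assumption stated above that only three severities exist; SeverityCheck and requireValid are hypothetical names, not part of the tutorial code or the RabbitMQ client.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative guard for the three severities assumed in this tutorial.
class SeverityCheck {

    static final List<String> ALLOWED = Arrays.asList("info", "warning", "error");

    // Hypothetical helper: returns the severity unchanged, or rejects unknown values.
    static String requireValid(String severity) {
        if (!ALLOWED.contains(severity)) {
            throw new IllegalArgumentException("Unknown severity: " + severity);
        }
        return severity;
    }

    public static void main(String[] args) {
        System.out.println(requireValid("warning"));
        try {
            requireValid("debug");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```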
Receiving messages will work just like in the previous tutorial, with one exception - we're going to create a new binding for each severity we're interested in.
String queueName = channel.queueDeclare().getQueue();
for (String severity : argv) {
    channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
The code for the EmitLogDirect.java class:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String severity = getSeverity(argv);
            String message = getMessage(argv);
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }
    //..
}
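The listing above elides the getSeverity and getMessage helpers. One plausible shape for them is sketched below, purely as an assumption: the first argument is taken as the severity, the remaining arguments are joined into the message, and the defaults "info" and "Hello World!" are illustrative guesses rather than values taken from the listing. The wrapper class EmitLogArgs is hypothetical as well.

```java
import java.util.Arrays;

// Hypothetical argument-parsing helpers; the elided originals may differ.
class EmitLogArgs {

    // Assumption: the first argument is the severity, defaulting to "info".
    static String getSeverity(String[] argv) {
        return argv.length < 1 ? "info" : argv[0];
    }

    // Assumption: everything after the severity is the message, joined with spaces.
    static String getMessage(String[] argv) {
        if (argv.length < 2) {
            return "Hello World!";
        }
        return String.join(" ", Arrays.copyOfRange(argv, 1, argv.length));
    }

    public static void main(String[] args) {
        String[] argv = {"error", "Run. Run. Or it will explode."};
        System.out.println(getSeverity(argv) + " -> " + getMessage(argv));
    }
}
```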
The code for ReceiveLogsDirect.java:
import com.rabbitmq.client.*;
public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }
        for (String severity : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
Compile as usual (see tutorial one for compilation and classpath advice). For convenience we'll use an environment variable $CP (that's %CP% on Windows) for the classpath when running examples.
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
If you want to save only 'warning' and 'error' (and not 'info') log messages to a file, just open a console and type:
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
If you'd like to see all the log messages on your screen, open a new terminal and do:
java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for messages. To exit press CTRL+C
And, for example, to emit an error log message just type:
java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
(Full source code: EmitLogDirect.java and ReceiveLogsDirect.java.)
Move on to tutorial 5 to find out how to listen for messages based on a pattern.