It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.
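To make the matching rule concrete, here is a minimal in-memory sketch of how a direct exchange picks queues. It is an illustration only, not RabbitMQ client or broker code: the class `DirectExchangeSketch` and its `bind` and `route` methods are hypothetical names, and the `orange` binding is assumed purely for contrast.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative model only; the real routing happens inside the broker.
public class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    // Record a binding between this exchange and a queue.
    // Binding the same queue twice with the same key has no extra effect.
    public void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    // Return every queue whose binding key equals the routing key exactly.
    public List<String> route(String routingKey) {
        return new ArrayList<>(
                bindings.getOrDefault(routingKey, Collections.<String>emptySet()));
    }

    public static void main(String[] args) {
        DirectExchangeSketch x = new DirectExchangeSketch();
        x.bind("Q2", "black");
        x.bind("Q1", "orange"); // assumed binding, for contrast only
        x.bind("Q1", "black");  // the extra binding discussed above

        System.out.println(x.route("black"));  // both queues match
        System.out.println(x.route("orange")); // only Q1 matches
        System.out.println(x.route("purple")); // nothing matches
    }
}
```

In this model a routing key with no matching binding returns an empty list, which mirrors the way a direct exchange discards a message that no queue is bound to receive.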

Emitting logs

We'll use this model for our logging system. Instead of fanout we'll send messages to a direct exchange. We will supply the log severity as a routing key. That way the receiving program will be able to select the severity it wants to receive. Let's focus on emitting logs first.

As always, we need to create an exchange first:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

And we're ready to send a message:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));

To simplify things we will assume that 'severity' can be one of info, warning, or error.

Subscribing

Receiving messages will work just like in the previous tutorial, with one exception - we're going to create a new binding for each severity we're interested in.

String queueName = channel.queueDeclare().getQueue();

for (String severity : argv) {
    channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
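The broker accepts any string as a binding key, so a misspelled severity such as `warn` would be bound without complaint and would simply never match a message. A receiver may therefore want to check its arguments before binding. The sketch below is one way to do that, not part of the tutorial's code: the class `SeverityArgs`, the method `bindingKeys`, and the decision to reject unknown values are all assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: validates command-line severities before they are bound.
public class SeverityArgs {

    // The three severities this tutorial assumes.
    public static final List<String> KNOWN = Arrays.asList("info", "warning", "error");

    // Return the distinct severities in argv, in order; reject anything unknown.
    public static List<String> bindingKeys(String[] argv) {
        List<String> keys = new ArrayList<>();
        for (String severity : argv) {
            if (!KNOWN.contains(severity)) {
                throw new IllegalArgumentException("Unknown severity: " + severity);
            }
            if (!keys.contains(severity)) {
                keys.add(severity); // skip duplicates, one binding per key is enough
            }
        }
        return keys;
    }

    public static void main(String[] argv) {
        // Each returned key would then be passed to channel.queueBind(...).
        System.out.println(bindingKeys(argv));
    }
}
```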

Putting it all together

The code for the EmitLogDirect.java class:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // try-with-resources closes the channel and connection after publishing
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            String severity = getSeverity(argv);
            String message = getMessage(argv);

            // The severity doubles as the routing key
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }
    //..
}
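The listing elides the `getSeverity` and `getMessage` helpers behind `//..`. One plausible shape for them, given the command lines used in this tutorial (severity first, message after), is sketched below. The wrapper class `EmitLogArgs` and the fallback values `info` and `Hello World!` are assumptions made for illustration, so treat this as a stand-in and not as the elided code itself.

```java
import java.util.Arrays;

// Hypothetical stand-ins for the helpers hidden behind "//.." in the listing.
public class EmitLogArgs {

    // First argument is the severity; fall back to "info" (assumed default).
    public static String getSeverity(String[] argv) {
        return argv.length < 1 ? "info" : argv[0];
    }

    // Remaining arguments form the message; fall back to a placeholder (assumed).
    public static String getMessage(String[] argv) {
        if (argv.length < 2) {
            return "Hello World!";
        }
        return String.join(" ", Arrays.copyOfRange(argv, 1, argv.length));
    }

    public static void main(String[] argv) {
        System.out.println(getSeverity(argv) + ": " + getMessage(argv));
    }
}
```

Joining the trailing arguments with spaces means an unquoted multi-word message on the command line still arrives as a single string.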

The code for ReceiveLogsDirect.java:

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // The connection stays open so the consumer keeps receiving messages
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        // One binding per severity given on the command line
        for (String severity : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

Compile as usual (see tutorial one for compilation and classpath advice). For convenience we'll use an environment variable $CP (that's %CP% on Windows) for the classpath when running examples.

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

If you want to save only 'warning' and 'error' (and not 'info') log messages to a file, just open a console and type:

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

If you'd like to see all the log messages on your screen, open a new terminal and do:

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for messages. To exit press CTRL+C

And, for example, to emit an error log message just type:

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(Full source code: EmitLogDirect.java and ReceiveLogsDirect.java)

Move on to tutorial 5 to find out how to listen for messages based on a pattern.
