It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like a fanout exchange and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.
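
To make the matching rule concrete, here is a minimal in-memory sketch of how a direct exchange picks queues. It does not talk to RabbitMQ at all: ToyDirectExchange and its methods are hypothetical names made up for this illustration, and the sketch assumes Q2 is already bound with key black, as the paragraph above implies.

```python
from collections import defaultdict


class ToyDirectExchange:
    """In-memory stand-in for a direct exchange (illustration only, not pika)."""

    def __init__(self):
        # binding key -> names of the queues bound with that key
        self.bindings = defaultdict(list)
        # queue name -> message bodies delivered to that queue
        self.queues = defaultdict(list)

    def bind(self, queue, binding_key):
        # Binding the same queue twice with the same key has no extra effect.
        if queue not in self.bindings[binding_key]:
            self.bindings[binding_key].append(queue)

    def publish(self, routing_key, body):
        # Exact match only: the routing key must equal the binding key.
        matched = list(self.bindings.get(routing_key, []))
        for queue in matched:
            self.queues[queue].append(body)
        return matched


exchange = ToyDirectExchange()
exchange.bind("Q2", "black")  # assumed existing binding
exchange.bind("Q1", "black")  # the extra binding described above

delivered_to = exchange.publish("black", "hello")
print(delivered_to)  # ['Q2', 'Q1']

unmatched = exchange.publish("orange", "nobody is bound to this key")
print(unmatched)  # []
```

Both queues receive the message because both bindings match the routing key exactly; a key with no matching binding reaches no queue.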

Emitting logs

We'll use this model for our logging system. Instead of a fanout exchange, we'll send messages to a direct exchange. We will supply the log severity as a routing key. That way the receiving script will be able to select the severity it wants to receive. Let's focus on emitting logs first.

As always, we need to create an exchange first:

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

And we're ready to send a message:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

To simplify things we will assume that 'severity' can be one of info, warning, or error.
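
Since the severity doubles as the routing key, a mistyped value would match no binding and the message would reach no queue. The following is one possible sketch of checking the value before publishing. The names ALLOWED_SEVERITIES and pick_severity are hypothetical helpers, not part of pika or of the tutorial scripts, and lowercasing the input is an assumption made here for convenience.

```python
ALLOWED_SEVERITIES = ("info", "warning", "error")


def pick_severity(argv, default="info"):
    """Return the severity given in argv[1], or the default if none was given."""
    severity = argv[1].lower() if len(argv) > 1 else default
    if severity not in ALLOWED_SEVERITIES:
        raise ValueError(
            f"unknown severity {severity!r}; expected one of {ALLOWED_SEVERITIES}")
    return severity


print(pick_severity(["emit_log_direct.py"]))           # info
print(pick_severity(["emit_log_direct.py", "ERROR"]))  # error
```

In the emitting script this could replace the plain sys.argv lookup, with sys.argv passed in as argv.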

Subscribing

Receiving messages will work just like in the previous tutorial, with one exception - we're going to create a new binding for each severity we're interested in.

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
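
The binding loop can also be wrapped in a small function, which makes it easy to try out without a running broker. This is only a sketch: bind_severities and RecordingChannel are hypothetical names introduced here, and RecordingChannel merely records the keyword arguments it receives instead of contacting RabbitMQ. With a real pika channel the same function would issue one queue_bind call per severity.

```python
def bind_severities(channel, queue_name, severities, exchange="direct_logs"):
    """Create one binding per distinct severity and return the keys that were bound."""
    bound = []
    # dict.fromkeys drops duplicates while keeping the original order.
    for severity in dict.fromkeys(severities):
        channel.queue_bind(
            exchange=exchange, queue=queue_name, routing_key=severity)
        bound.append(severity)
    return bound


class RecordingChannel:
    """Stand-in for a channel that only remembers queue_bind calls."""

    def __init__(self):
        self.calls = []

    def queue_bind(self, **kwargs):
        self.calls.append(kwargs)


fake_channel = RecordingChannel()
bound_keys = bind_severities(
    fake_channel, "my_queue", ["warning", "error", "warning"])
print(bound_keys)  # ['warning', 'error']
```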

Putting it all together

emit_log_direct.py (source)

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(f" [x] Sent {severity}:{message}")
connection.close()

receive_logs_direct.py (source)

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
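
With pika on Python 3 the body argument arrives as bytes, so the callback above prints it with a b'...' prefix. If plain text is preferred, a helper along the following lines could be called from the callback. This is a sketch only: format_log is a hypothetical name, and it assumes the messages are UTF-8 encoded.

```python
def format_log(routing_key, body):
    """Render one log line; body is the raw bytes payload of the message."""
    # errors="replace" keeps the consumer alive if a payload is not valid UTF-8.
    text = body.decode("utf-8", errors="replace")
    return f" [x] {routing_key}:{text}"


line = format_log("error", b"Run. Run. Or it will explode.")
print(line)
```

Inside the callback this would read print(format_log(method.routing_key, body)).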

If you want to save only 'warning' and 'error' (and not 'info') log messages to a file, just open a console and type:

# -u is used to avoid buffering of the messages printed to the standard output
python -u receive_logs_direct.py warning error > logs_from_rabbit.log

If you'd like to see all the log messages on your screen, open a new terminal and do:

python receive_logs_direct.py info warning error
# => [*] Waiting for logs. To exit press CTRL+C

And, for example, to emit an error log message just type:

python emit_log_direct.py error "Run. Run. Or it will explode."
# => [x] Sent error:Run. Run. Or it will explode.

Move on to tutorial 5 to find out how to listen for messages based on a pattern.
