It is perfectly legal to bind multiple queues with the same binding
key. In our example we could add a binding between X and Q1 with
binding key black. In that case, the direct exchange will behave
like fanout and will broadcast the message to all the matching
queues. A message with routing key black will be delivered to both
Q1 and Q2.
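To make the matching rule concrete, here is a minimal in-memory sketch of how a direct exchange picks queues. It is a toy model, not pika or RabbitMQ code: the `DirectExchangeSketch` class and its `bind` and `route` methods are hypothetical names made up for this illustration, and the routing key `unbound-key` is just a placeholder.

```python
from collections import defaultdict


class DirectExchangeSketch:
    """Toy model of direct-exchange routing (hypothetical, not a pika API)."""

    def __init__(self):
        # binding key -> names of the queues bound with that key
        self._bindings = defaultdict(list)

    def bind(self, queue, binding_key):
        # Binding the same queue twice with the same key has no extra effect.
        if queue not in self._bindings[binding_key]:
            self._bindings[binding_key].append(queue)

    def route(self, routing_key):
        # Deliver to every queue whose binding key equals the routing key
        # exactly. A message that matches no binding is discarded.
        return list(self._bindings.get(routing_key, []))


exchange = DirectExchangeSketch()
exchange.bind("Q2", "black")  # binding that already exists in the example
exchange.bind("Q1", "black")  # the additional binding described above

print(exchange.route("black"))        # ['Q2', 'Q1']
print(exchange.route("unbound-key"))  # []
```

With both queues bound to `black`, one published message reaches both of them, which is the fanout-like behaviour described above.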
We'll use this model for our logging system. Instead of fanout we'll
send messages to a direct exchange. We will supply the log severity as
a routing key. That way the receiving script will be able to select
the severity it wants to receive. Let's focus on emitting logs
first.
As always, we need to create an exchange first:
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
And we're ready to send a message:
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
To simplify things we will assume that 'severity' can be one of
info, warning, or error.
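The tutorial only assumes these three values; nothing in the code enforces them. If you would rather reject anything else before publishing, one possible sketch is shown here. The names `ALLOWED_SEVERITIES` and `parse_severity` are hypothetical helpers introduced for this example and are not part of the tutorial scripts.

```python
# Hypothetical helper: the tutorial itself does not validate the severity.
ALLOWED_SEVERITIES = ("info", "warning", "error")


def parse_severity(argv, default="info"):
    """Return the severity given as the first argument, or the default."""
    severity = argv[1] if len(argv) > 1 else default
    if severity not in ALLOWED_SEVERITIES:
        raise ValueError(
            f"unknown severity {severity!r}; "
            f"expected one of {', '.join(ALLOWED_SEVERITIES)}")
    return severity


print(parse_severity(["emit_log_direct.py"]))                     # info
print(parse_severity(["emit_log_direct.py", "warning", "disk"]))  # warning
```

Checking on the sending side matters because a direct exchange silently drops a message whose routing key matches no binding.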
Receiving messages will work just like in the previous tutorial, with one exception: we're going to create a new binding for each severity we're interested in.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
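To see which bindings that loop creates without running a broker, the sketch below swaps the real channel for a small recording stub. `RecordingChannel`, `bind_severities`, and the queue name `amq.gen-example` are all hypothetical, made up for this illustration; only the keyword arguments mirror the `queue_bind` call shown above.

```python
class RecordingChannel:
    """Hypothetical stand-in for a pika channel; it only records bind calls."""

    def __init__(self):
        self.bindings = []

    def queue_bind(self, exchange, queue, routing_key):
        self.bindings.append((exchange, queue, routing_key))


def bind_severities(channel, queue_name, severities, exchange="direct_logs"):
    # dict.fromkeys drops repeated severities while keeping the given order.
    for severity in dict.fromkeys(severities):
        channel.queue_bind(
            exchange=exchange, queue=queue_name, routing_key=severity)


channel = RecordingChannel()
bind_severities(channel, "amq.gen-example", ["warning", "error", "warning"])
for binding in channel.bindings:
    print(binding)
# ('direct_logs', 'amq.gen-example', 'warning')
# ('direct_logs', 'amq.gen-example', 'error')
```

One queue ends up with one binding per severity, so it receives every message whose routing key matches any of them.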
emit_log_direct.py (source)
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(f" [x] Sent {severity}:{message}")
connection.close()
receive_logs_direct.py (source)
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
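One detail worth knowing about the receiver: under Python 3, pika hands the message `body` to the callback as `bytes`, so printing it directly shows a `b'...'` wrapper. If you prefer plain text in your log file, a possible variation of the callback is sketched here. The `format_log` helper is a hypothetical name, and UTF-8 is an assumption about how the sender encoded the message.

```python
def format_log(routing_key, body):
    """Build the log line, decoding the raw message bytes to text."""
    # Assumes UTF-8; undecodable bytes are replaced instead of raising.
    text = body.decode("utf-8", errors="replace")
    return f" [x] {routing_key}:{text}"


def callback(ch, method, properties, body):
    # Same signature as the tutorial's callback, usable with basic_consume.
    print(format_log(method.routing_key, body))


print(format_log("error", b"Run. Run. Or it will explode."))
#  [x] error:Run. Run. Or it will explode.
```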
If you want to save only 'warning' and 'error' (and not 'info') log messages to a file, just open a console and type:
# -u is used to avoid buffering of the messages printed to the standard output
python -u receive_logs_direct.py warning error > logs_from_rabbit.log
If you'd like to see all the log messages on your screen, open a new terminal and do:
python receive_logs_direct.py info warning error
# => [*] Waiting for logs. To exit press CTRL+C
And, for example, to emit an error log message just type:
python emit_log_direct.py error "Run. Run. Or it will explode."
# => [x] Sent error:Run. Run. Or it will explode.
Move on to tutorial 5 to find out how to listen for messages based on a pattern.