AMQP’s exchange types aren’t just routing mechanisms; they define how producers and consumers declare their intent to the broker. A producer states what a message is about (through a routing key or headers), a consumer states what it wants (through a queue binding), and the exchange type determines how the broker matches the two.
Let’s see this in action. Imagine a simple scenario: a system publishing "order created" events. The examples below use log messages as stand-in events so the routing behavior is easy to follow.
First, we need a broker. Let’s use RabbitMQ.
# Install RabbitMQ (example for Ubuntu)
sudo apt update
sudo apt install rabbitmq-server
sudo systemctl start rabbitmq-server
Now, let’s connect a producer and a consumer using pika, a Python AMQP 0-9-1 client (installable with pip install pika).
Direct Exchange:
This is the most straightforward type. A message is routed to every queue whose binding key exactly matches the message’s routing key.
Producer side (Python):
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declare a direct exchange named 'direct_logs'
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = 'info' # Or 'warning', 'error'
message = f'This is a {severity} message'
# Publish the message with the routing key matching the severity
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(f" [x] Sent '{message}' with routing key '{severity}'")
connection.close()
Consumer side (Python):
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# Declare a queue (let's name it based on severity for clarity)
result = channel.queue_declare(queue='log_info')
queue_name = result.method.queue
# Bind the queue to the exchange with a specific routing key
channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key='info')
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    # Print the payload together with the routing key it was published with
    print(f" [x] Received {body.decode()} (routing key: {method.routing_key})")
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)
channel.start_consuming()
If the producer sends a message with routing_key='info', only queues bound with routing_key='info' will receive it. If the producer sends routing_key='warning', a queue bound to 'info' won’t get it.
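The exact-match rule can be tried out without a running broker. The sketch below is a toy in-memory model, not part of pika or RabbitMQ; the class name DirectExchangeModel and the queue name log_problems are invented for illustration.

```python
from collections import defaultdict


class DirectExchangeModel:
    """Toy in-memory model of direct-exchange routing (hypothetical, not a pika API)."""

    def __init__(self):
        # binding key -> set of queue names bound with that key
        self._bindings = defaultdict(set)

    def bind(self, queue, binding_key):
        self._bindings[binding_key].add(queue)

    def route(self, routing_key):
        # Exact string comparison only: no wildcards, no prefix matching.
        return sorted(self._bindings.get(routing_key, set()))


exchange = DirectExchangeModel()
exchange.bind("log_info", "info")
exchange.bind("log_problems", "warning")
exchange.bind("log_problems", "error")  # one queue may hold several bindings

info_targets = exchange.route("info")
warning_targets = exchange.route("warning")
debug_targets = exchange.route("debug")  # no binding uses this key
print(info_targets, warning_targets, debug_targets)
```

A routing key that no binding uses yields an empty list here. On a real broker such a message is, by default, discarded unless the publisher sets the mandatory flag or the exchange has an alternate exchange configured.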
Fanout Exchange:
This exchange ignores routing keys. All messages published to a fanout exchange are routed to all queues bound to it. It’s like a broadcast.
Producer:
# ... (same connection setup as above)
channel.exchange_declare(exchange='fanout_logs', exchange_type='fanout')
message = 'This is a broadcast message!'
channel.basic_publish(exchange='fanout_logs',
                      routing_key='',  # Routing key is ignored for fanout
                      body=message)
print(f" [x] Sent '{message}'")
# ... (close connection)
Consumer 1:
# ... (same connection setup)
channel.exchange_declare(exchange='fanout_logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True) # Anonymous, temporary queue
queue_name = result.method.queue
channel.queue_bind(exchange='fanout_logs', queue=queue_name) # No routing key needed
print(' [*] Waiting for broadcast messages. To exit press CTRL+C')
# ... (callback and basic_consume as above)
Consumer 2:
# ... (repeat Consumer 1 setup, but with a different anonymous queue)
Now, if you run two instances of Consumer 1 (each creating its own anonymous queue bound to fanout_logs) and then publish a message from the producer, each consumer receives its own copy of the same message.
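The broadcast behavior can be illustrated with a small in-memory model. This is only a sketch: FanoutExchangeModel and the queue names are hypothetical, and nothing here talks to RabbitMQ.

```python
class FanoutExchangeModel:
    """Toy in-memory model of fanout routing (hypothetical, not a pika API)."""

    def __init__(self):
        self._queues = {}  # queue name -> list of bodies delivered so far

    def bind(self, queue):
        self._queues.setdefault(queue, [])

    def publish(self, body, routing_key=""):
        # The routing key is accepted but never consulted.
        for delivered in self._queues.values():
            delivered.append(body)

    def messages(self, queue):
        return list(self._queues[queue])


exchange = FanoutExchangeModel()
exchange.bind("consumer_1_queue")
exchange.bind("consumer_2_queue")
exchange.publish("This is a broadcast message!")
exchange.publish("Second broadcast", routing_key="ignored.key")

first = exchange.messages("consumer_1_queue")
second = exchange.messages("consumer_2_queue")
print(first == second, len(first))
```

Both queues end up with the same two messages, regardless of the routing key passed to publish.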
Topic Exchange:
This is the most flexible. It allows routing based on patterns. Routing keys are treated as a series of words separated by dots (e.g., news.uk.london). Bindings can use wildcards: * (matches exactly one word) and # (matches zero or more words).
Producer:
# ... (same connection setup)
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = 'kern.critical.error'
message = f'System error at {routing_key}'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(f" [x] Sent '{message}' with routing key '{routing_key}'")
# ... (close connection)
Consumer (binding for *.critical.*):
# ... (same connection setup)
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)  # Server-named queue, removed when this connection closes
queue_name = result.method.queue
# Bind to all messages where the second word is 'critical'
binding_key = '*.critical.*'
channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key=binding_key)
print(f' [*] Waiting for critical messages. To exit press CTRL+C. Bound with key: {binding_key}')
# ... (callback and basic_consume as above)
Consumer (binding for news.#):
# ... (same connection setup)
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)  # Server-named queue, removed when this connection closes
queue_name = result.method.queue
# Bind to all messages starting with 'news'
binding_key = 'news.#'
channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key=binding_key)
print(f' [*] Waiting for news messages. To exit press CTRL+C. Bound with key: {binding_key}')
# ... (callback and basic_consume as above)
If a message is published with kern.critical.error, the first consumer (bound to *.critical.*) will receive it. If a message is published with news.uk.london, the second consumer (bound to news.#) will receive it. A message with news.critical.error would be received by both consumers.
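To check which binding keys match which routing keys without starting a broker, the wildcard rules can be written as a small pure-Python function. This is a sketch of the matching rules as described above, not RabbitMQ's actual implementation; the name topic_matches is invented, and edge cases such as an empty routing key are not handled.

```python
def topic_matches(binding_key, routing_key):
    """Return True if a topic binding key matches a routing key.

    '*' stands for exactly one word, '#' for zero or more words.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero words, one word, ... up to all remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            # Pattern still expects a word but none are left.
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


print(topic_matches("*.critical.*", "kern.critical.error"))  # True
print(topic_matches("news.#", "news.uk.london"))             # True
print(topic_matches("*.critical.*", "news.critical.error"))  # True
print(topic_matches("news.#", "news.critical.error"))        # True
print(topic_matches("*.critical.*", "kern.critical"))        # False: '*' needs a word
print(topic_matches("news.#", "news"))                       # True: '#' matches zero words
```

The last two calls show the difference between the wildcards: the trailing * requires a third word, while the trailing # is satisfied by none.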
Headers Exchange:
This exchange routes messages based on header values rather than routing keys. Producers send messages with a headers dictionary, and consumers bind queues to the exchange with binding arguments: an x-match argument (either all or any) plus a dictionary of header key-value pairs to match.
Producer:
# ... (same connection setup)
channel.exchange_declare(exchange='headers_logs', exchange_type='headers')
properties = pika.BasicProperties(
    delivery_mode=2,  # Mark the message persistent (it survives a broker restart only if the queue is also durable)
    headers={
        'color': 'red',
        'size': 'big'
    }
)
message = 'This is a red, big message.'
channel.basic_publish(exchange='headers_logs',
                      routing_key='',  # Routing key is ignored for headers
                      properties=properties,
                      body=message)
print(f" [x] Sent '{message}' with headers {{'color': 'red', 'size': 'big'}}")
# ... (close connection)
Consumer (binding with x-match: all):
# ... (same connection setup)
channel.exchange_declare(exchange='headers_logs', exchange_type='headers')
result = channel.queue_declare(queue='', exclusive=True)  # Server-named queue, removed when this connection closes
queue_name = result.method.queue
# Bind to messages where ALL specified headers match
arguments = {
    'x-match': 'all',
    'color': 'red',
    'size': 'big'
}
channel.queue_bind(exchange='headers_logs',
                   queue=queue_name,
                   arguments=arguments)
print(f' [*] Waiting for red, big messages. To exit press CTRL+C. Bound with headers {arguments}')
# ... (callback and basic_consume as above)
Consumer (binding with x-match: any):
# ... (same connection setup)
channel.exchange_declare(exchange='headers_logs', exchange_type='headers')
result = channel.queue_declare(queue='', exclusive=True)  # Server-named queue, removed when this connection closes
queue_name = result.method.queue
# Bind to messages where ANY specified header matches
arguments = {
    'x-match': 'any',
    'color': 'blue',
    'priority': 'high'
}
channel.queue_bind(exchange='headers_logs',
                   queue=queue_name,
                   arguments=arguments)
print(f' [*] Waiting for blue OR high priority messages. To exit press CTRL+C. Bound with headers {arguments}')
# ... (callback and basic_consume as above)
If a message is published with headers={'color': 'red', 'size': 'big'}, the first consumer (bound with x-match: all) will receive it because both headers match; the second consumer will not, because neither color=blue nor priority=high is present. If a message is published with headers={'color': 'blue', 'size': 'small'}, the second consumer (bound with x-match: any) will receive it because the color header matches.
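The all/any rule can be expressed as a short pure-Python predicate, which makes it easy to test bindings before deploying them. This is a sketch under assumptions: headers_match is an invented name, it treats a missing x-match as all, and it skips binding arguments whose names start with x- when comparing, which is how the all and any modes are generally described for RabbitMQ.

```python
def headers_match(binding_args, message_headers):
    """Return True if message headers satisfy a headers-exchange binding (toy model)."""
    mode = binding_args.get("x-match", "all")
    # Arguments starting with "x-" (such as x-match itself) are control
    # arguments, not matching criteria.
    criteria = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [
        key in message_headers and message_headers[key] == value
        for key, value in criteria.items()
    ]
    return all(hits) if mode == "all" else any(hits)


all_binding = {"x-match": "all", "color": "red", "size": "big"}
any_binding = {"x-match": "any", "color": "blue", "priority": "high"}

print(headers_match(all_binding, {"color": "red", "size": "big"}))     # True
print(headers_match(any_binding, {"color": "red", "size": "big"}))     # False
print(headers_match(any_binding, {"color": "blue", "size": "small"}))  # True
print(headers_match(all_binding, {"color": "red"}))                    # False: size is missing
```

Extra headers on the message that the binding does not mention, such as size in the third call, do not prevent a match.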
Headers exchanges are often overlooked in favor of topic exchanges, even when they offer a more direct and potentially less complex way to route on arbitrary metadata. A topic exchange pattern-matches on a single dot-delimited string, whereas a headers exchange matches on any set of key-value pairs, which can be more expressive for complex, non-hierarchical metadata.
The next logical step is to explore how to make these messages resilient by looking into message acknowledgments and publisher confirms.