An AMQP exchange type is more than a routing mechanism: it defines how producers and consumers declare their intent to the broker (through routing keys, binding keys, or headers) and how the broker uses those declarations to decide which queues receive each message.

Let’s see this in action with a simple scenario: a system that publishes log events, and consumers that each want a different subset of them.

First, we need a broker. Let’s use RabbitMQ.

# Install RabbitMQ (example for Ubuntu)
sudo apt update
sudo apt install rabbitmq-server
sudo systemctl start rabbitmq-server

Now, let’s connect a producer and a consumer using pika, a Python client for AMQP 0-9-1.

Direct Exchange:

This is the most straightforward. Messages are routed to queues whose binding keys exactly match the message’s routing key.

Producer side (Python):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a direct exchange named 'direct_logs'
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = 'info' # Or 'warning', 'error'
message = f'This is a {severity} message'

# Publish the message with the routing key matching the severity
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(f" [x] Sent '{message}' with routing key '{severity}'")

connection.close()

Consumer side (Python):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Declare a queue (let's name it based on severity for clarity)
result = channel.queue_declare(queue='log_info')
queue_name = result.method.queue

# Bind the queue to the exchange with a specific routing key
channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key='info')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()} (routing key: {method.routing_key})")

channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()

If the producer sends a message with routing_key='info', only queues bound with routing_key='info' will receive it. If the producer sends routing_key='warning', a queue bound to 'info' won’t get it.
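
The exact-match rule can be sketched without a broker. The following is a toy in-memory model, not RabbitMQ’s implementation; the class name DirectExchangeModel, its bind and route methods, and the log_problems queue are hypothetical names chosen for illustration.

```python
from collections import defaultdict


class DirectExchangeModel:
    """Toy in-memory model of direct-exchange routing (not a real broker)."""

    def __init__(self):
        # binding key -> set of queue names bound with that key
        self._bindings = defaultdict(set)

    def bind(self, queue, binding_key):
        self._bindings[binding_key].add(queue)

    def route(self, routing_key):
        # Exact string comparison: no wildcards, no partial matches
        return sorted(self._bindings.get(routing_key, ()))


exchange = DirectExchangeModel()
exchange.bind('log_info', 'info')
exchange.bind('log_problems', 'warning')
exchange.bind('log_problems', 'error')  # one queue may hold several bindings

print(exchange.route('info'))     # ['log_info']
print(exchange.route('warning'))  # ['log_problems']
print(exchange.route('debug'))    # [] (no binding matches)
```

In this model an unmatched routing key simply yields an empty list. On a real broker, a message that matches no binding is dropped unless the publisher sets the mandatory flag or the exchange has an alternate exchange configured.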

Fanout Exchange:

This exchange ignores routing keys. All messages published to a fanout exchange are routed to all queues bound to it. It’s like a broadcast.

Producer:

# ... (same connection setup as above)
channel.exchange_declare(exchange='fanout_logs', exchange_type='fanout')
message = 'This is a broadcast message!'
channel.basic_publish(exchange='fanout_logs',
                      routing_key='', # Routing key is ignored for fanout
                      body=message)
print(f" [x] Sent '{message}'")
# ... (close connection)

Consumer 1:

# ... (same connection setup)
channel.exchange_declare(exchange='fanout_logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True) # Anonymous, temporary queue
queue_name = result.method.queue
channel.queue_bind(exchange='fanout_logs', queue=queue_name) # No routing key needed

print(' [*] Waiting for broadcast messages. To exit press CTRL+C')
# ... (callback and basic_consume as above)

Consumer 2:

# ... (repeat Consumer 1 setup, but with a different anonymous queue)

Now, if you run both consumers (each creating its own anonymous queue bound to fanout_logs) and then publish a message from the producer, each consumer receives its own copy of the same message.
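
The broadcast behavior can also be modeled in a few lines of plain Python. This is only a sketch of the semantics, assuming a hypothetical FanoutExchangeModel class with hypothetical queue names; it does not talk to RabbitMQ.

```python
class FanoutExchangeModel:
    """Toy in-memory model of fanout routing (not a real broker)."""

    def __init__(self):
        # queue name -> list of message bodies delivered to that queue
        self._queues = {}

    def bind(self, queue):
        self._queues.setdefault(queue, [])

    def publish(self, body, routing_key=''):
        # routing_key is accepted but deliberately never inspected
        for messages in self._queues.values():
            messages.append(body)

    def messages(self, queue):
        return list(self._queues[queue])


exchange = FanoutExchangeModel()
exchange.bind('consumer_1_queue')
exchange.bind('consumer_2_queue')
exchange.publish('This is a broadcast message!', routing_key='ignored')

print(exchange.messages('consumer_1_queue'))  # ['This is a broadcast message!']
print(exchange.messages('consumer_2_queue'))  # ['This is a broadcast message!']
```

Each bound queue gets its own copy, which is why every consumer needs its own queue: two consumers sharing one queue would split the messages between them instead.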

Topic Exchange:

This is the most flexible. It allows routing based on patterns. Routing keys are treated as a series of words separated by dots (e.g., news.uk.london). Bindings can use wildcards: * (matches exactly one word) and # (matches zero or more words).

Producer:

# ... (same connection setup)
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = 'kern.critical.error'
message = f'System error at {routing_key}'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(f" [x] Sent '{message}' with routing key '{routing_key}'")
# ... (close connection)

Consumer (binding for *.critical.*):

# ... (same connection setup)
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True) # Anonymous queue, deleted when this connection closes
queue_name = result.method.queue

# Bind to all messages where the second word is 'critical'
binding_key = '*.critical.*'
channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key=binding_key)

print(f' [*] Waiting for critical messages. To exit press CTRL+C. Bound with key: {binding_key}')
# ... (callback and basic_consume as above)

Consumer (binding for news.#):

# ... (same connection setup)
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True) # Anonymous queue, deleted when this connection closes
queue_name = result.method.queue

# Bind to all messages starting with 'news'
binding_key = 'news.#'
channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key=binding_key)

print(f' [*] Waiting for news messages. To exit press CTRL+C. Bound with key: {binding_key}')
# ... (callback and basic_consume as above)

If a message is published with kern.critical.error, the first consumer (bound to *.critical.*) will receive it. If a message is published with news.uk.london, the second consumer (bound to news.#) will receive it. A message with news.critical.error would be received by both consumers.
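
To check a binding pattern before deploying it, the wildcard rules can be expressed as a small matcher. This is a minimal sketch of the semantics described above (* for exactly one word, # for zero or more words), not the algorithm RabbitMQ uses internally; the function name topic_matches is hypothetical.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key satisfies the topic binding pattern."""
    pattern = binding_key.split('.')
    words = routing_key.split('.')

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed
            return w == len(words)
        if pattern[p] == '#':
            # '#' may absorb zero or more words: try every possible length
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            # Words exhausted but a '*' or literal still needs one
            return False
        if pattern[p] == '*' or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


for key in ('kern.critical.error', 'news.uk.london', 'news.critical.error'):
    print(key, topic_matches('*.critical.*', key), topic_matches('news.#', key))
# kern.critical.error True False
# news.uk.london False True
# news.critical.error True True
```

Note that 'news.#' also matches the bare key 'news', because # can stand for zero words, while '*.critical.*' rejects 'kern.critical' because each * needs exactly one word.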

Headers Exchange:

This exchange routes messages based on header values rather than routing keys. Producers attach a headers dictionary to each message, and consumers bind queues to the exchange with a dictionary of header key-value pairs plus an x-match argument: all requires every listed header to match, any requires at least one.

Producer:

# ... (same connection setup)
channel.exchange_declare(exchange='headers_logs', exchange_type='headers')

properties = pika.BasicProperties(
    delivery_mode=2, # Mark message persistent (survives a broker restart only if the queue is durable)
    headers={
        'color': 'red',
        'size': 'big'
    }
)
message = 'This is a red, big message.'
channel.basic_publish(exchange='headers_logs',
                      routing_key='', # Routing key is ignored for headers
                      properties=properties,
                      body=message)

print(f" [x] Sent '{message}' with headers {properties.headers}")

# ... (close connection)

Consumer (binding with x-match: all):

# ... (same connection setup)
channel.exchange_declare(exchange='headers_logs', exchange_type='headers')
result = channel.queue_declare(queue='', exclusive=True) # Anonymous queue, deleted when this connection closes
queue_name = result.method.queue

# Bind to messages where ALL specified headers match
arguments = {
    'x-match': 'all',
    'color': 'red',
    'size': 'big'
}
channel.queue_bind(exchange='headers_logs',
                   queue=queue_name,
                   arguments=arguments)

print(f' [*] Waiting for red, big messages. To exit press CTRL+C. Bound with headers {arguments}')
# ... (callback and basic_consume as above)

Consumer (binding with x-match: any):

# ... (same connection setup)
channel.exchange_declare(exchange='headers_logs', exchange_type='headers')
result = channel.queue_declare(queue='', exclusive=True) # Anonymous queue, deleted when this connection closes
queue_name = result.method.queue

# Bind to messages where ANY specified header matches
arguments = {
    'x-match': 'any',
    'color': 'blue',
    'priority': 'high'
}
channel.queue_bind(exchange='headers_logs',
                   queue=queue_name,
                   arguments=arguments)

print(f' [*] Waiting for blue OR high priority messages. To exit press CTRL+C. Bound with headers {arguments}')
# ... (callback and basic_consume as above)

If a message is published with headers={'color': 'red', 'size': 'big'}, the first consumer (bound with x-match: all) will receive it, but the second will not, because neither color: blue nor priority: high is present. If a message is published with headers={'color': 'blue', 'size': 'small'}, the second consumer (bound with x-match: any) will receive it because the color header matches, even though the message carries no priority header.
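
The all/any rule can be sketched as a plain function. This is an illustrative model under stated assumptions rather than the broker’s code: the name headers_match is hypothetical, a missing x-match is assumed to default to all, and binding keys beginning with x- are assumed to be options rather than match criteria.

```python
def headers_match(binding_args: dict, message_headers: dict) -> bool:
    """Return True if message_headers satisfy a headers-exchange binding."""
    # Assumption: an absent 'x-match' behaves like 'all'
    mode = binding_args.get('x-match', 'all')
    # Assumption: 'x-'-prefixed keys are binding options, not criteria
    criteria = {k: v for k, v in binding_args.items() if not k.startswith('x-')}
    hits = (
        key in message_headers and message_headers[key] == value
        for key, value in criteria.items()
    )
    return any(hits) if mode == 'any' else all(hits)


all_binding = {'x-match': 'all', 'color': 'red', 'size': 'big'}
any_binding = {'x-match': 'any', 'color': 'blue', 'priority': 'high'}

red_big = {'color': 'red', 'size': 'big'}
blue_small = {'color': 'blue', 'size': 'small'}

print(headers_match(all_binding, red_big))     # True
print(headers_match(any_binding, red_big))     # False
print(headers_match(all_binding, blue_small))  # False
print(headers_match(any_binding, blue_small))  # True
```

Extra headers on the message never hurt a match: only the keys listed in the binding are compared.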

The most surprising aspect of the headers exchange is how often it is overlooked in favor of the topic exchange, even when it offers a more direct way to route on arbitrary metadata. A topic exchange pattern-matches on a single dot-delimited string, so the position of each word matters; a headers exchange matches on any set of key-value pairs, which can be more expressive for complex, non-hierarchical metadata.

The next logical step is to explore how to make these messages resilient by looking into message acknowledgments and publisher confirms.
