RabbitMQ’s AMQP 0-9-1 protocol is the lingua franca of message queuing, and amqp091-go is your direct line to it from Go. This library is essentially a Go-native implementation of the AMQP 0-9-1 specification, meaning it speaks the protocol directly without relying on external C libraries or intermediaries. This gives you fine-grained control and a deep understanding of how your Go application interacts with RabbitMQ.

Let’s see how a simple publisher and consumer work.

Publisher Example:

package main

import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqps://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	body := "Hello, world!"
	err = ch.PublishWithContext(ctx,
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s\n", body)
}

Consumer Example:

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", err, msg)
	}
}

func main() {
	conn, err := amqp.Dial("amqps://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer tag
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	var forever chan struct{}

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

In these examples, we establish a connection (amqp.Dial), open a channel (conn.Channel), declare a queue (ch.QueueDeclare), and then either publish a message (ch.PublishWithContext) or start consuming messages (ch.Consume). The amqp091-go library handles the underlying TCP connection and the AMQP protocol framing, presenting a clean Go API.

The core mental model revolves around these components:

  • Connection: A single TCP connection to a RabbitMQ node. All subsequent operations happen over a connection.
  • Channel: A logical connection within a connection. Channels are multiplexed over a single TCP connection. Most AMQP operations (like declaring queues, publishing, consuming) are performed on a channel. They’re lightweight and intended to be created and destroyed frequently.
  • Exchange: The "message router." Producers send messages to exchanges. Exchanges then route messages to queues based on rules (bindings). Common types include direct, topic, fanout, and headers.
  • Queue: The "message store." Messages are delivered to queues, and consumers read from queues. Queues are bound to exchanges.
  • Binding: The "rule" that tells an exchange how to route messages to a specific queue. It typically involves a routing key.

When you ch.PublishWithContext, you’re sending a message to an exchange. The routing key you provide is used by the exchange to decide which queue(s) to send it to. If you use an empty string for the exchange name, the message is sent to the default exchange, which is a direct exchange named "". This default exchange is pre-declared and is bound to every queue by default. The routing key in this case must be the exact name of the queue you want to send the message to.

The ch.Consume function returns a Go channel (<-chan amqp.Delivery). This channel will be populated with messages as they arrive in the queue. The auto-ack parameter in Consume determines whether RabbitMQ automatically acknowledges a message once it’s delivered to the consumer. Setting it to true is convenient for simple cases but means you lose the message if your consumer crashes before processing it. For reliable processing, you’d set auto-ack to false and manually acknowledge messages using d.Ack(false) after successful processing.

A subtle but powerful aspect of amqp091-go is how it handles errors and reconnections. The amqp.Dial function returns a *amqp.Connection. This connection object has a NotifyClose channel (conn.NotifyClose(make(chan *amqp.Error))) that you can listen to. If the underlying TCP connection breaks or the server closes it, an error will be sent on this channel. A robust application will listen on this channel and attempt to re-establish the connection and its channels and consumers.

When you’re dealing with message acknowledgments and retries, remember that d.Ack(false) acknowledges the delivery tag, but d.Ack(true) acknowledges all previously unacknowledged messages up to and including this delivery tag. This can be very efficient if you’re processing batches of messages.

Want structured learning?

Take the full Amqp course →