Isaac.

RabbitMQ Message Patterns

Implement reliable messaging with RabbitMQ patterns.

By EME
Published: February 20, 2025
Tags: rabbitmq, messaging, patterns, reliability, event-driven

A Simple Analogy

RabbitMQ is like a telephone switchboard for your services. Messages (calls) are routed correctly, queued if needed, and delivered reliably.


Why RabbitMQ?

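  • Reliability: Durable queues and persistent messages survive broker restarts, and a message stays queued until a consumer acknowledges it
  • Routing: Flexible message routing through direct, fanout, and topic exchanges
  • Scaling: Handle thousands of messages/sec
  • Decoupling: Services don't need to know about each other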
  • Patterns: Publish-subscribe, work queues, RPC

Basic Setup

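// Assumes the synchronous RabbitMQ.Client 6.x API (version 7 replaced it with an async API)
using System.Text;
using RabbitMQ.Client;

// Connection
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Declare a durable queue; autoDelete is set explicitly because it defaults to true
channel.QueueDeclare(queue: "orders", durable: true, exclusive: false, autoDelete: false);

// Mark the message persistent so it survives a broker restart along with the queue
var props = channel.CreateBasicProperties();
props.Persistent = true;

// Publish message through the default exchange, which routes by queue name
var message = "Order123";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "orders", basicProperties: props, body: body);

Console.WriteLine(" [x] Sent {0}", message);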

Publisher/Subscriber

// Publisher (runs in its own process)
channel.ExchangeDeclare(exchange: "orders-exchange", type: "fanout");

var message = "Order created: 123";
var body = Encoding.UTF8.GetBytes(message);
// A fanout exchange ignores the routing key and copies the message to every bound queue.
// Messages published while no queue is bound are dropped.
channel.BasicPublish(exchange: "orders-exchange", routingKey: "", body: body);

// Subscriber 1 (Email service, separate process; needs using RabbitMQ.Client.Events)
channel.ExchangeDeclare(exchange: "orders-exchange", type: "fanout");
// Server-named, exclusive queue that is deleted when this connection closes
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "orders-exchange", routingKey: "");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var received = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine(" [x] {0}", received);
    // Send email
    channel.BasicAck(ea.DeliveryTag, false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

// Subscriber 2 (Notification service)
// Declares its own queue bound to the same exchange, so it receives its own copy of every message

Routing Pattern

// Direct exchange: a queue receives a message only when its binding key equals the routing key
channel.ExchangeDeclare(exchange: "orders", type: "direct");

channel.BasicPublish(
    exchange: "orders",
    routingKey: "order.created",  // Routing key
    body: body);

// Subscriber interested only in created orders
channel.QueueBind(queue: queueName, exchange: "orders", routingKey: "order.created");

// Subscriber interested only in cancelled orders
channel.QueueBind(queue: queueName2, exchange: "orders", routingKey: "order.cancelled");

// Topic exchange: binding keys can use wildcards.
// It needs its own name, because redeclaring "orders" with a different type fails.
channel.ExchangeDeclare(exchange: "orders-topic", type: "topic");
channel.BasicPublish(exchange: "orders-topic", routingKey: "order.us.created", body: body);
channel.BasicPublish(exchange: "orders-topic", routingKey: "order.eu.cancelled", body: body);

// Subscribe to pattern: * matches exactly one word, # matches zero or more words
channel.QueueBind(queue: queueName, exchange: "orders-topic", routingKey: "order.us.*");  // every US order event
channel.QueueBind(queue: queueName2, exchange: "orders-topic", routingKey: "order.#");    // every order event in any region
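Topic wildcards are easy to get wrong: * stands for exactly one dot-separated word, while # stands for zero or more, so a binding key of order.* does not match order.us.created. The sketch below re-implements that matching rule locally so binding keys can be checked without a broker. TopicMatcher is a hypothetical helper written for this article, not part of RabbitMQ.Client, and it is only meant to mirror the documented wildcard semantics.

```csharp
using System;

// Hypothetical helper: checks a routing key against a topic binding key.
public static class TopicMatcher
{
    // '*' matches exactly one word; '#' matches zero or more words.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // Let '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to consume.
        if (ki == k.Length) return false;
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);
        return false;
    }
}
```

For example, TopicMatcher.Matches("order.us.*", "order.us.created") and TopicMatcher.Matches("order.#", "order.eu.cancelled") both return true, while TopicMatcher.Matches("order.*", "order.us.created") returns false.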

Work Queue with Multiple Consumers

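// Publisher
// exclusive and autoDelete default to true, which would hide the queue from other workers
channel.QueueDeclare(queue: "tasks", durable: true, exclusive: false, autoDelete: false);

var props = channel.CreateBasicProperties();
props.Persistent = true;  // Keep queued tasks across broker restarts

// WorkItem is an illustrative class with Id and Work properties
// (the name Task would clash with System.Threading.Tasks.Task)
var message = JsonSerializer.Serialize(new WorkItem { Id = 123, Work = "process" });
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "tasks", basicProperties: props, body: body);

// Consumer 1 (worker, separate process)
// Fair dispatch: at most one unacknowledged task per worker
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var json = Encoding.UTF8.GetString(ea.Body.ToArray());
    var task = JsonSerializer.Deserialize<WorkItem>(json);

    // Process task
    ProcessTask(task);

    // Acknowledge only when done, so a crash mid-task causes redelivery
    channel.BasicAck(ea.DeliveryTag, false);
};

channel.BasicConsume(queue: "tasks", autoAck: false, consumer: consumer);

// Consumer 2 (another worker)
// Runs the same code; RabbitMQ delivers each task to only one of the workers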

Dead Letter Queue

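// Dead letter exchange; it must exist before the dead letter queue can be bound to it
channel.ExchangeDeclare(exchange: "dlx-exchange", type: "direct", durable: true);

// Main queue
channel.QueueDeclare(
    queue: "main-queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: new Dictionary<string, object>
    {
        { "x-dead-letter-exchange", "dlx-exchange" },
        { "x-dead-letter-routing-key", "dead-letter-queue" },
        { "x-message-ttl", 60000 }  // Unconsumed messages are dead-lettered after 60 s
    });

// Dead letter queue
channel.QueueDeclare(queue: "dead-letter-queue", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "dead-letter-queue", exchange: "dlx-exchange", routingKey: "dead-letter-queue");

// Messages go to the DLQ when a consumer rejects or nacks them with requeue: false,
// when their TTL expires, or when the queue exceeds a configured length limit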
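A consumer that reads from the dead letter queue often wants to know how many times a message has already been dead-lettered, for instance to stop retrying after a few attempts. RabbitMQ records this in the x-death message header. The sketch below reads that header from a plain dictionary, as exposed by ea.BasicProperties.Headers. It rests on assumptions about the header layout (a list of tables with "queue" and "count" fields, with strings possibly arriving as byte[]), so verify it against your broker and client versions. DeathInfo is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: reads dead-letter counts from message headers.
public static class DeathInfo
{
    // Sums the "count" fields of the x-death entries recorded for the given queue.
    // Returns 0 when the header is missing or has an unexpected shape.
    public static long CountFor(IDictionary<string, object> headers, string queue)
    {
        if (headers == null || !headers.TryGetValue("x-death", out var raw)) return 0;
        if (!(raw is IEnumerable<object> entries)) return 0;

        long total = 0;
        foreach (var entry in entries)
        {
            if (!(entry is IDictionary<string, object> table)) continue;
            if (!table.TryGetValue("queue", out var q) || AsString(q) != queue) continue;
            if (table.TryGetValue("count", out var c)) total += Convert.ToInt64(c);
        }
        return total;
    }

    // Assumption: header strings may arrive as byte[] rather than string.
    private static string AsString(object value)
    {
        return value is byte[] bytes ? Encoding.UTF8.GetString(bytes) : value?.ToString();
    }
}
```

A DLQ consumer could then compare DeathInfo.CountFor(ea.BasicProperties.Headers, "main-queue") with a retry limit of its own choosing before republishing the message or parking it for manual inspection.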

RPC Pattern

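// Client
var replyQueueName = channel.QueueDeclare().QueueName;
var corrId = Guid.NewGuid().ToString();

// Start listening for the reply before sending the request
string response = null;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    // Ignore replies that belong to other requests
    if (ea.BasicProperties.CorrelationId == corrId)
    {
        response = Encoding.UTF8.GetString(ea.Body.ToArray());
    }
};

channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer);

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;

var message = "5";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: body);
// response is set on the consumer thread once the reply arrives; real code should wait with a timeout

// Server (separate process)
channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    var n = int.Parse(message);
    var response = Fibonacci(n).ToString();  // Fibonacci is assumed to be defined elsewhere

    // Echo the correlation ID so the client can match the reply to its request
    var props = channel.CreateBasicProperties();
    props.CorrelationId = ea.BasicProperties.CorrelationId;

    channel.BasicPublish(
        exchange: "",
        routingKey: ea.BasicProperties.ReplyTo,
        basicProperties: props,
        body: Encoding.UTF8.GetBytes(response));

    // Acknowledge only after the reply has been sent
    channel.BasicAck(ea.DeliveryTag, false);
};

channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);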
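The client snippet stores the reply in a plain variable, which leaves open how the caller waits for it and what happens when no reply ever arrives. One common approach, sketched here under assumptions, is to keep a map from correlation ID to a TaskCompletionSource and complete the matching entry from the reply handler. PendingReplies is a hypothetical helper, not a RabbitMQ.Client type, and the timeout handling is deliberately minimal.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: tracks in-flight RPC calls by correlation ID.
public sealed class PendingReplies
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    // Call before publishing the request; await the returned task for the reply.
    public Task<string> Register(string correlationId, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_pending.TryAdd(correlationId, tcs))
        {
            throw new InvalidOperationException("Duplicate correlation ID: " + correlationId);
        }

        // If no reply arrives in time, forget the entry and fail the waiting caller.
        Task.Delay(timeout).ContinueWith(t =>
        {
            if (_pending.TryRemove(correlationId, out var expired))
            {
                expired.TrySetException(new TimeoutException("No reply for " + correlationId));
            }
        });

        return tcs.Task;
    }

    // Call from the reply consumer's Received handler.
    // Returns false for unknown or already completed correlation IDs.
    public bool Complete(string correlationId, string response)
    {
        return _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(response);
    }
}
```

In the client, the Received handler would call Complete(ea.BasicProperties.CorrelationId, text), and the caller would await the task returned by Register(corrId, timeout) after publishing. Because entries are keyed by correlation ID, one reply queue can serve many concurrent requests.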

Best Practices

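  1. Make consumers idempotent: RabbitMQ can redeliver a message, so processing it twice must be safe
  2. Acknowledge properly: Ack only after processing succeeds
  3. Use dead letter queues: Catch rejected and expired messages
  4. Monitor queue depth: A growing backlog means consumers are falling behind
  5. Set message TTL: Expire stale messages instead of letting them pile up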
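The first practice can be sketched as a small wrapper that remembers which message IDs it has already handled. This is a minimal in-memory illustration, assuming the publisher sets a unique MessageId on each message's properties. IdempotentHandler is a hypothetical name, and a real service would keep the processed IDs in durable storage shared by all consumer instances.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper that runs a handler at most once per message ID.
public sealed class IdempotentHandler
{
    private readonly HashSet<string> _seen = new HashSet<string>();
    private readonly Action<string> _handler;
    private readonly object _lock = new object();

    public IdempotentHandler(Action<string> handler)
    {
        _handler = handler;
    }

    // Returns true if the handler ran, false if the message ID was a duplicate.
    public bool Handle(string messageId, string payload)
    {
        lock (_lock)
        {
            if (_seen.Contains(messageId)) return false;

            // Record the ID only after the handler succeeds, so a message
            // whose handler threw is processed again on redelivery.
            _handler(payload);
            _seen.Add(messageId);
            return true;
        }
    }
}
```

Inside a Received handler, this would be called as Handle(ea.BasicProperties.MessageId, text) and followed by BasicAck in both cases, since a duplicate has already been processed and only needs to be acknowledged.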

Related Concepts

  • Apache Kafka for streaming
  • Azure Service Bus
  • AWS SQS/SNS
  • Event-driven architectures

Summary

RabbitMQ provides reliable messaging with flexible routing patterns. Use publish-subscribe, work queues, and RPC patterns to decouple services and build event-driven systems.