RabbitMQ Message Patterns
Implement reliable messaging with RabbitMQ patterns.
By EMEPublished: February 20, 2025
rabbitmqmessagingpatternsreliabilityevent-driven
A Simple Analogy
RabbitMQ is like a telephone switchboard for your services. Messages (calls) are routed correctly, queued if needed, and delivered reliably.
Why RabbitMQ?
- Reliability: Messages persist until consumed
- Routing: Flexible message routing
- Scaling: Handle thousands of messages/sec
- Decoupling: Services don't need to know each other
- Patterns: Publish-subscribe, work queues, RPC
Basic Setup
// Connection
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Declare queue
channel.QueueDeclare(queue: "orders", durable: true, exclusive: false);
// Publish message
var message = "Order123";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "orders", body: body);
Console.WriteLine(" [x] Sent {0}", message);
Publisher/Subscriber
// Publisher
channel.ExchangeDeclare(exchange: "orders-exchange", type: "fanout");
var message = "Order created: 123";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "orders-exchange", routingKey: "", body: body);
// Subscriber 1 (Email service)
channel.ExchangeDeclare(exchange: "orders-exchange", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "orders-exchange", routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(" [x] {0}", message);
// Send email
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
// Subscriber 2 (Notification service)
// Receives same message independently
Routing Pattern
// Publish with routing key
channel.ExchangeDeclare(exchange: "orders", type: "direct");
channel.BasicPublish(
exchange: "orders",
routingKey: "order.created", // Routing key
body: body);
// Subscriber interested only in created orders
channel.QueueBind(queue: queueName, exchange: "orders", routingKey: "order.created");
// Subscriber interested only in cancelled orders
channel.QueueBind(queue: queueName2, exchange: "orders", routingKey: "order.cancelled");
// Topic pattern: can use wildcards
channel.ExchangeDeclare(exchange: "orders", type: "topic");
channel.BasicPublish(exchange: "orders", routingKey: "order.us.created", body: body);
channel.BasicPublish(exchange: "orders", routingKey: "order.eu.cancelled", body: body);
// Subscribe to pattern
channel.QueueBind(queue: queueName, exchange: "orders", routingKey: "order.us.*");
channel.QueueBind(queue: queueName2, exchange: "orders", routingKey: "order.*");
Work Queue with Multiple Consumers
// Publisher
channel.QueueDeclare(queue: "tasks", durable: true);
channel.BasicQos(0, 1, false); // Fair dispatch
var message = JsonSerializer.Serialize(new Task { Id = 123, Work = "process" });
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "tasks", body: body);
// Consumer 1 (worker)
channel.BasicQos(0, 1, false); // Process one at a time
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var task = JsonSerializer.Deserialize<Task>(message);
// Process task
ProcessTask(task);
// Acknowledge when done
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: "tasks", autoAck: false, consumer: consumer);
// Consumer 2 (another worker)
// Automatically handles different task
Dead Letter Queue
// Main queue
channel.QueueDeclare(
queue: "main-queue",
durable: true,
arguments: new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "dlx-exchange" },
{ "x-dead-letter-routing-key", "dead-letter-queue" },
{ "x-message-ttl", 60000 } // 60s before dead letter
});
// Dead letter queue
channel.QueueDeclare(queue: "dead-letter-queue", durable: true);
channel.QueueBind(queue: "dead-letter-queue", exchange: "dlx-exchange", routingKey: "dead-letter-queue");
// Messages that fail/expire go to DLQ
RPC Pattern
// Client
var replyQueueName = channel.QueueDeclare().QueueName;
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var message = "5";
channel.BasicPublish(exchange: "", routingKey: "rpc_queue", body: body, basicProperties: props);
// Receive response
var consumer = new EventingBasicConsumer(channel);
var response = null;
consumer.Received += (model, ea) =>
{
if (ea.BasicProperties.CorrelationId == corrId)
{
response = Encoding.UTF8.GetString(ea.Body.ToArray());
}
};
channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer);
// Server
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var n = int.Parse(message);
var response = Fibonacci(n).ToString();
var props = channel.CreateBasicProperties();
props.CorrelationId = ea.BasicProperties.CorrelationId;
channel.BasicPublish(
exchange: "",
routingKey: ea.BasicProperties.ReplyTo,
basicProperties: props,
body: Encoding.UTF8.GetBytes(response));
};
channel.BasicConsume(queue: "rpc_queue", autoAck: true, consumer: consumer);
Best Practices
- Make messages idempotent: Handle duplicates
- Acknowledge properly: Only after processing
- Use dead letter queues: Catch failed messages
- Monitor queue depth: Track backlog
- Set message TTL: Clean up old messages
Related Concepts
- Apache Kafka for streaming
- Azure Service Bus
- AWS SQS/SNS
- Event-driven architectures
Summary
RabbitMQ provides reliable messaging with flexible routing patterns. Use publish-subscribe, work queues, and RPC patterns to decouple services and build event-driven systems.