Message Queues and RabbitMQ

Decouple services with message queues for reliable, asynchronous communication.

By EME. Published: February 20, 2025
Tags: message queues, rabbitmq, async, pubsub, distributed systems

A Simple Analogy

Message queues are like a post office. Instead of calling someone directly (slow, requires them to be available), you drop a letter in the mailbox. The post office delivers it eventually. Sender and receiver never need to interact directly.


What Are Message Queues?

Message queues enable asynchronous communication between services. Services send messages to a queue instead of calling each other directly, decoupling dependencies and enabling resilience.


Why Use Message Queues?

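  • Decoupling: Producers and consumers depend only on the message format, not on each other's location or availability
  • Resilience: A consumer can go offline without breaking the system; messages wait in the queue until it returns
  • Scalability: The queue buffers traffic spikes, and you can add consumers to drain it faster
  • Async: Long-running operations run in the background instead of blocking requests
  • Reliability: With durable queues and persistent messages, a message stays in the queue until a consumer acknowledges it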
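
To make the buffering idea concrete, here is a minimal in-process sketch. It uses System.Threading.Channels as a stand-in for a broker queue, so it involves no RabbitMQ and no persistence; the class name InMemoryQueueDemo and the simulated 5 ms of work are assumptions made for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class InMemoryQueueDemo
{
    // Enqueues a burst of messages, then returns them in the order the consumer handled them.
    public static async Task<List<string>> RunAsync(int messageCount)
    {
        // The channel plays the role of the queue: it buffers messages between the two sides.
        var queue = Channel.CreateUnbounded<string>();
        var processed = new List<string>();

        // Consumer: drains the queue at its own pace.
        var consumer = Task.Run(async () =>
        {
            await foreach (var message in queue.Reader.ReadAllAsync())
            {
                await Task.Delay(5);  // Simulate slow processing
                processed.Add(message);
            }
        });

        // Producer: writes every message without waiting for the consumer to finish any of them.
        for (var i = 1; i <= messageCount; i++)
        {
            await queue.Writer.WriteAsync($"order-{i}");
        }
        queue.Writer.Complete();  // Signal that no more messages will arrive

        await consumer;
        return processed;
    }
}
```

Calling `await InMemoryQueueDemo.RunAsync(3)` returns the three messages in the order they were sent. The producer loop finishes long before the consumer does, which is the property a real queue gives you across process boundaries.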

Key Concepts

| Concept | Meaning |
|---------|---------|
| Producer | Service sending messages |
| Consumer | Service processing messages |
| Queue | Message storage/buffer |
| Exchange | Routes messages to queues |
| Message | Data being communicated |


RabbitMQ Basics

# Start RabbitMQ
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3.12-management

# Access management UI
# http://localhost:15672 (guest/guest)

.NET with RabbitMQ

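// Install: RabbitMQ.Client 6.x (7.x replaces this synchronous API with an async one)
using System;
using System.Text;
using System.Text.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Producer: Send message
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// exclusive and autoDelete default to true in this overload; turn both off for a shared work queue
channel.QueueDeclare(queue: "orders", durable: true, exclusive: false, autoDelete: false);

var message = new { OrderId = 123, Amount = 99.99 };
var json = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(json);

// A durable queue only keeps messages across a broker restart if they are marked persistent
var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: "", routingKey: "orders", basicProperties: properties, body: body);
Console.WriteLine("Order queued");

// Consumer: Process message
channel.BasicQos(0, 1, false);  // Process one at a time

// Received is an event, so subscribe with += rather than an object initializer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var payload = Encoding.UTF8.GetString(ea.Body.ToArray());
    var order = JsonSerializer.Deserialize<Order>(payload);

    Console.WriteLine($"Processing order {order?.OrderId}");
    channel.BasicAck(ea.DeliveryTag, false);  // Ack only after processing succeeds
};
channel.BasicConsume(queue: "orders", autoAck: false, consumer: consumer);

Console.ReadLine();  // Keep the process alive while messages arrive

// Message shape used by the consumer (type declarations go after top-level statements)
public record Order(int OrderId, decimal Amount);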

Publish-Subscribe Pattern

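// Publisher
// Named "orders.fanout" because an exchange cannot be redeclared with a different type,
// and the practical example below declares "orders" as a topic exchange
channel.ExchangeDeclare(exchange: "orders.fanout", type: ExchangeType.Fanout);

// Subscriber 1: Email service (server-named, exclusive, auto-delete queue)
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "orders.fanout", routingKey: "");

// Subscriber 2: Notification service
var queueName2 = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName2, exchange: "orders.fanout", routingKey: "");

// Publish after the queues are bound: a fanout exchange drops messages when no queue is bound
channel.BasicPublish(exchange: "orders.fanout", routingKey: "", body: body);

// Both receive order created event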
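
If you want to see the fanout behaviour without a broker, the following is a rough in-memory model of it. The class InMemoryFanout is hypothetical and is not part of RabbitMQ.Client; it only mimics the rule that every bound queue gets its own copy of each message, and that a message published with no bound queues is lost.

```csharp
using System;
using System.Collections.Generic;

public sealed class InMemoryFanout
{
    private readonly List<Queue<string>> _boundQueues = new List<Queue<string>>();

    // Creates a new queue and binds it to this exchange (one queue per subscriber).
    public Queue<string> Bind()
    {
        var queue = new Queue<string>();
        _boundQueues.Add(queue);
        return queue;
    }

    // Copies the message into every bound queue; with no bindings the message goes nowhere.
    public void Publish(string message)
    {
        foreach (var queue in _boundQueues)
        {
            queue.Enqueue(message);
        }
    }
}
```

A short usage example: publish once before anyone is bound, then bind two subscribers and publish again.

```csharp
var exchange = new InMemoryFanout();
exchange.Publish("order-1 created");          // Dropped: nothing is bound yet

var emailQueue = exchange.Bind();
var notificationQueue = exchange.Bind();
exchange.Publish("order-2 created");          // Copied to both queues

Console.WriteLine(emailQueue.Count);          // 1
Console.WriteLine(notificationQueue.Count);   // 1
```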

Practical Example

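public class OrderEventHandler
{
    private readonly IConnection _connection;

    // Connections are long-lived and shared; inject one rather than creating it per publish
    public OrderEventHandler(IConnection connection) => _connection = connection;

    // The 6.x client API is synchronous, so return a completed task instead of marking this async
    public Task PublishOrderCreatedAsync(Order order)
    {
        using var channel = _connection.CreateModel();
        channel.ExchangeDeclare(exchange: "orders", type: "topic", durable: true);

        var message = new { order.Id, order.Status, order.CreatedAt };
        var json = JsonSerializer.Serialize(message);
        var body = Encoding.UTF8.GetBytes(json);

        channel.BasicPublish(
            exchange: "orders",
            routingKey: "order.created",
            body: body
        );
        return Task.CompletedTask;
    }
}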

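public class EmailService : IHostedService
{
    // AsyncEventingBasicConsumer needs a connection created with
    // ConnectionFactory { DispatchConsumersAsync = true }
    private readonly IConnection _connection;
    private IModel? _channel;

    public EmailService(IConnection connection) => _connection = connection;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var channel = _connection.CreateModel();
        _channel = channel;

        // Must match the publisher's declaration exactly, including durable: true
        channel.ExchangeDeclare(exchange: "orders", type: "topic", durable: true);

        var queueName = channel.QueueDeclare().QueueName;
        // "order.*" matches any two-word key starting with "order", such as "order.created"
        channel.QueueBind(queue: queueName, exchange: "orders", routingKey: "order.*");

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            var json = Encoding.UTF8.GetString(ea.Body.ToArray());
            var message = JsonSerializer.Deserialize<OrderCreated>(json);

            await SendConfirmationEmailAsync(message);
            channel.BasicAck(ea.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        return Task.CompletedTask;  // Return promptly; blocking here would stall host startup
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _channel?.Dispose();
        return Task.CompletedTask;
    }

    // Placeholder: replace with real email delivery
    private Task SendConfirmationEmailAsync(OrderCreated? message) => Task.CompletedTask;
}

// Placeholder message type; adjust the property types to match your Order
public record OrderCreated(int Id, string Status, DateTime CreatedAt);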
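
The `order.*` binding in the example relies on topic-exchange pattern matching. Routing keys are dot-separated words; in a binding pattern, `*` stands for exactly one word and `#` for zero or more words. The sketch below reimplements that rule in plain C# so you can check which keys a pattern would accept. TopicMatcher is a hypothetical helper written for this article, not a RabbitMQ.Client API, and the broker's own matching remains the source of truth.

```csharp
using System;

public static class TopicMatcher
{
    // Returns true if routingKey would be accepted by a binding with the given pattern.
    public static bool Matches(string pattern, string routingKey) =>
        Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);

    private static bool Match(string[] pattern, int p, string[] key, int k)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (p == pattern.Length) return k == key.Length;

        if (pattern[p] == "#")
        {
            // '#' may absorb zero or more words, so try every possible split point.
            for (var next = k; next <= key.Length; next++)
            {
                if (Match(pattern, p + 1, key, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to compare against.
        if (k == key.Length) return false;

        // '*' accepts any single word; a literal must match exactly.
        if (pattern[p] == "*" || pattern[p] == key[k])
        {
            return Match(pattern, p + 1, key, k + 1);
        }
        return false;
    }
}
```

For example, `TopicMatcher.Matches("order.*", "order.created")` is true, while `TopicMatcher.Matches("order.*", "order.created.eu")` is false because `*` covers only one word. Binding with `order.#` would accept both.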

Best Practices

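  1. Idempotent processing: A message can be delivered more than once, so make handlers safe to run twice
  2. Dead letter queues: Route messages that keep failing to a separate queue for inspection
  3. Acknowledgments: Use manual acks (autoAck: false) and acknowledge only after processing succeeds
  4. Serialization: Use JSON for portability
  5. Error handling: Log failures and retry a bounded number of times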
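
Several of these practices can be combined in one small wrapper around a message handler. The sketch below is broker-independent and makes some loud assumptions: IdempotentProcessor is a hypothetical class, every message is assumed to carry a unique ID, processed IDs are kept in memory (a real service would need a durable store shared by all consumers), and the DeadLetters list merely stands in for a dead letter queue. Retries happen immediately here; a delay between attempts is left out to keep the example short.

```csharp
using System;
using System.Collections.Generic;

public sealed class IdempotentProcessor
{
    // In-memory for illustration only; lost on restart and not shared between consumers.
    private readonly HashSet<string> _processedIds = new HashSet<string>();
    private readonly Action<string> _handler;
    private readonly int _maxAttempts;

    // Stands in for a dead letter queue.
    public List<string> DeadLetters { get; } = new List<string>();

    public IdempotentProcessor(Action<string> handler, int maxAttempts = 3)
    {
        _handler = handler;
        _maxAttempts = maxAttempts;
    }

    // Returns true if the message was handled (now or earlier) and can be acknowledged.
    public bool Process(string messageId, string payload)
    {
        if (_processedIds.Contains(messageId))
        {
            return true;  // Duplicate delivery: skip the work but still acknowledge
        }

        for (var attempt = 1; attempt <= _maxAttempts; attempt++)
        {
            try
            {
                _handler(payload);
                _processedIds.Add(messageId);
                return true;
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine(
                    $"Attempt {attempt}/{_maxAttempts} failed for {messageId}: {ex.Message}");
            }
        }

        DeadLetters.Add(messageId);  // Give up and set the message aside for inspection
        return false;
    }
}
```

In a RabbitMQ consumer, one way to wire this up would be to call BasicAck when Process returns true and BasicNack with requeue set to false when it returns false, so that a queue configured with a dead letter exchange can route the rejected message there.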

Related Concepts to Explore

  • Kafka (high-throughput streaming)
  • Topic-based routing with RabbitMQ
  • Message retry and backoff strategies
  • Event sourcing with message queues
  • Saga pattern for distributed transactions

Summary

Message queues decouple services and enable resilient, scalable systems. Use them for asynchronous processing and service-to-service communication to build robust distributed applications.