
Message Queue Patterns

Design reliable systems with message queues and async processing.

By EME
Published: February 20, 2025
Tags: message queues, async, rabbitmq, patterns, reliability

A Simple Analogy

Message queues work like a postal system. You drop a message into the queue, and the queue holds it until the recipient is ready to collect it, even if the recipient is unavailable when you send it.


Why Message Queues?

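  • Decoupling: Producers and consumers don't call each other directly; they only share the queue
  • Scalability: Consumers process messages at their own pace, and more consumers can be added when the backlog grows
  • Reliability: Messages can persist in the queue until a consumer has processed them
  • Async processing: Callers don't wait for slow operations to finish
  • Load balancing: Work is distributed across multiple consumers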

Publisher-Subscriber

// Publisher
public class OrderService
{
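    private readonly IMessagePublisher _publisher;
    private readonly AppDbContext _context; // assumed EF Core DbContext; constructor injection omitted for brevity
    
    public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
    {
        var order = new Order { /* ... */ };
        _context.Orders.Add(order); // Add is enough here; AddAsync only matters for async value generators
        await _context.SaveChangesAsync();
        
        // Publish the event only after the order has been saved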
        await _publisher.PublishAsync(new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Total = order.Total
        });
        
        return order;
    }
}

// Subscriber
public class EmailNotificationHandler : IMessageHandler<OrderCreatedEvent>
{
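    private readonly IEmailService _emailService;
    private readonly AppDbContext _context; // assumed EF Core DbContext; constructor injection omitted for brevity
    
    public async Task HandleAsync(OrderCreatedEvent @event)
    {
        var customer = await _context.Customers.FindAsync(@event.CustomerId);
        if (customer is null)
        {
            // Nothing to notify; a real handler would log this or dead-letter the message
            return;
        }
        
        await _emailService.SendAsync(new EmailMessage
        {
            To = customer.Email,
            Subject = "Order Confirmation",
            Body = $"Order #{@event.OrderId} for ${@event.Total}"
        });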
    }
}
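
To make the fan-out behaviour of publish-subscribe concrete, here is a minimal in-memory sketch. The `InMemoryBus` and `RecordingHandler` classes are hypothetical and exist only for illustration. The `IMessageHandler<TMessage>` interface and the trimmed-down `OrderCreatedEvent` are assumptions modelled on the snippets above; a real broker such as RabbitMQ would sit behind a similar interface and deliver messages across processes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumed shape of the handler interface used in the article's snippets.
public interface IMessageHandler<TMessage>
{
    Task HandleAsync(TMessage message);
}

// Hypothetical in-process bus: every subscriber of a message type
// receives each published message of that type.
public class InMemoryBus
{
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();

    public void Subscribe<TMessage>(IMessageHandler<TMessage> handler)
    {
        if (!_handlers.TryGetValue(typeof(TMessage), out var list))
        {
            list = new List<Func<object, Task>>();
            _handlers[typeof(TMessage)] = list;
        }
        list.Add(message => handler.HandleAsync((TMessage)message));
    }

    public async Task PublishAsync<TMessage>(TMessage message)
    {
        if (message == null) throw new ArgumentNullException(nameof(message));
        if (!_handlers.TryGetValue(typeof(TMessage), out var list)) return;

        // Deliver to subscribers one after another, in subscription order.
        foreach (var handle in list)
        {
            await handle(message);
        }
    }
}

// Simplified event; the ID type is an assumption.
public class OrderCreatedEvent
{
    public int OrderId { get; set; }
    public decimal Total { get; set; }
}

// Test double that records which orders it was notified about.
public class RecordingHandler : IMessageHandler<OrderCreatedEvent>
{
    public List<int> Seen { get; } = new();

    public Task HandleAsync(OrderCreatedEvent message)
    {
        Seen.Add(message.OrderId);
        return Task.CompletedTask;
    }
}
```

Subscribing two `RecordingHandler` instances and publishing one `OrderCreatedEvent` results in both handlers seeing the same order ID. The publisher never references either handler, which is the decoupling the pattern is meant to provide.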

Work Queue Pattern

// Producer
public class ImageProcessingService
{
    private readonly IQueuePublisher _queue;
    
    public async Task QueueImageProcessingAsync(string imageId)
    {
        await _queue.EnqueueAsync(new ImageProcessingJob
        {
            ImageId = imageId,
            CreatedAt = DateTime.UtcNow
        }, "image-processing-queue");
    }
}

// Consumer
public class ImageProcessingWorker : BackgroundService
{
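    private readonly IQueueConsumer _queue;
    private readonly IQueuePublisher _publisher; // EnqueueAsync lives on the publisher interface (see the producer)
    private readonly IImageProcessor _processor;
    
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _queue.ConsumeAsync<ImageProcessingJob>(
            "image-processing-queue",
            async job =>
            {
                try
                {
                    await _processor.ProcessAsync(job.ImageId);
                }
                catch (Exception)
                {
                    // Move the failed job to the dead letter queue so it
                    // doesn't block the rest of the work; log the exception in real code
                    await _publisher.EnqueueAsync(job, "image-processing-dlq");
                }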
            },
            stoppingToken);
    }
}
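
The defining property of a work queue is that several consumers compete for jobs and each job is handled by exactly one of them. The sketch below illustrates that with an in-memory `Channel<T>` from `System.Threading.Channels` standing in for the broker. `WorkQueueDemo` and its parameters are hypothetical names, and the integer job IDs are a simplification of `ImageProcessingJob`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WorkQueueDemo
{
    // Starts `workerCount` competing consumers over a single in-memory queue
    // and returns one (JobId, WorkerId) entry per processed job.
    public static async Task<ConcurrentBag<(int JobId, int WorkerId)>> RunAsync(
        int jobCount, int workerCount)
    {
        var channel = Channel.CreateUnbounded<int>();
        var handled = new ConcurrentBag<(int JobId, int WorkerId)>();

        var workers = Enumerable.Range(0, workerCount)
            .Select(workerId => Task.Run(async () =>
            {
                // Each item read from the channel goes to exactly one reader.
                await foreach (var jobId in channel.Reader.ReadAllAsync())
                {
                    await Task.Delay(1); // simulate slow work
                    handled.Add((jobId, workerId));
                }
            }))
            .ToArray();

        for (var jobId = 0; jobId < jobCount; jobId++)
        {
            await channel.Writer.WriteAsync(jobId);
        }

        // Signal that no more jobs are coming so the workers can finish.
        channel.Writer.Complete();

        await Task.WhenAll(workers);
        return handled;
    }
}
```

Calling `WorkQueueDemo.RunAsync(20, 3)` yields 20 entries with 20 distinct job IDs. How the jobs are split between the three workers varies from run to run.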

Request-Reply Pattern

// Request
public class PricingService
{
    private readonly IMessageClient _client;
    
    public async Task<decimal> GetPriceAsync(string productId)
    {
        var response = await _client.RequestAsync<GetPriceRequest, GetPriceResponse>(
            new GetPriceRequest { ProductId = productId },
            timeout: TimeSpan.FromSeconds(5));
        
        return response.Price;
    }
}

// Reply
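public class ProductService : IMessageHandler<GetPriceRequest, GetPriceResponse> // assumed two-parameter variant for handlers that return a reply
{
    private readonly AppDbContext _context; // assumed EF Core DbContext; constructor injection omitted for brevity
    
    public async Task<GetPriceResponse> HandleAsync(GetPriceRequest request)
    {
        var product = await _context.Products.FindAsync(request.ProductId);
        if (product is null)
        {
            throw new KeyNotFoundException($"Product '{request.ProductId}' not found.");
        }
        return new GetPriceResponse { Price = product.Price };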
    }
}
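
Request-reply over a queue usually relies on a correlation ID: the requester tags each request, and the reply carries the same tag so it can be matched to the waiting caller. The sketch below shows one way a client such as `IMessageClient` might track outstanding requests internally. `PendingReplies<TResponse>` is a hypothetical helper, not part of any specific library, and sending the actual messages is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class PendingReplies<TResponse>
{
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<TResponse>> _pending = new();

    // Called before sending a request. Returns the correlation ID to attach
    // to the outgoing message and a task that completes with the reply.
    public (Guid CorrelationId, Task<TResponse> Reply) Register(TimeSpan timeout)
    {
        var id = Guid.NewGuid();
        var tcs = new TaskCompletionSource<TResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;
        return (id, WaitAsync(id, tcs, timeout));
    }

    // Called when a reply arrives. Returns false for unknown, late,
    // or duplicate correlation IDs.
    public bool Complete(Guid correlationId, TResponse response) =>
        _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(response);

    private async Task<TResponse> WaitAsync(
        Guid id, TaskCompletionSource<TResponse> tcs, TimeSpan timeout)
    {
        var finished = await Task.WhenAny(tcs.Task, Task.Delay(timeout));
        if (finished != tcs.Task)
        {
            // Forget the request so a late reply is ignored.
            _pending.TryRemove(id, out _);
            throw new TimeoutException($"No reply for request {id} within {timeout}.");
        }
        return await tcs.Task;
    }
}
```

The timeout matters because the replying service may be down or the reply may be lost. Without it, the caller would wait forever and the pending entry would never be cleaned up.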

Dead Letter Queue

public class QueueConsumer
{
    private readonly IQueueClient _queue; // assumed interface exposing both DequeueAsync and EnqueueAsync
    private const int MaxRetries = 3;
    
    public async Task ConsumeWithRetryAsync<T>(string queueName, Func<T, Task> handler)
    {
        var message = await _queue.DequeueAsync<T>(queueName);
        var retryCount = message.Headers.Get("retry-count", 0);
        
        try
        {
            await handler(message.Body);
        }
        catch (Exception) // the exception is not inspected here; log it in real code
        {
            if (retryCount < MaxRetries)
            {
                // Retry with exponential backoff: 1s, 2s, 4s for retry counts 0, 1, 2
                var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
                message.Headers.Set("retry-count", retryCount + 1);
                
                await Task.Delay(delay);
                await _queue.EnqueueAsync(message, queueName);
            }
            else
            {
                // Move to dead letter queue
                await _queue.EnqueueAsync(message, $"{queueName}-dlq");
            }
        }
    }
}
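
The retry decision in the consumer above can be isolated into a small pure function, which makes the schedule easy to check. The sketch below mirrors the same rule (base-2 backoff in seconds, at most three retries); `RetryPolicy` and `NextDelay` are hypothetical names introduced for illustration.

```csharp
using System;

public static class RetryPolicy
{
    public const int MaxRetries = 3;

    // Returns the delay before the next attempt, or null when the message
    // has used up its retries and should go to the dead letter queue.
    public static TimeSpan? NextDelay(int retryCount)
    {
        if (retryCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(retryCount));
        }
        if (retryCount >= MaxRetries)
        {
            return null;
        }
        return TimeSpan.FromSeconds(Math.Pow(2, retryCount));
    }
}
```

With these values a message is attempted at most four times (the first delivery plus three retries), waiting 1, 2, and 4 seconds between attempts, before it is moved to the dead letter queue.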

Best Practices

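  1. Idempotent handlers: Handlers must be safe to run twice on the same message, because retries and redelivery can produce duplicates
  2. Error handling: Retry transient failures and route messages that keep failing to a DLQ
  3. Monitoring: Track queue depth; a steadily growing backlog means consumers can't keep up
  4. Ordering: Preserve message order where the business logic depends on it
  5. TTL: Set message expiration so stale messages don't pile up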
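
As an illustration of the first practice, one common way to get idempotency is to remember the IDs of messages that were already handled and skip repeats. The sketch below keeps those IDs in memory, which is an assumption made for brevity; a production system would typically store them in a database or cache shared by all consumers. `IdempotentHandler<TMessage>` and the string message ID are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class IdempotentHandler<TMessage>
{
    private readonly ConcurrentDictionary<string, bool> _processed = new();
    private readonly Func<TMessage, Task> _inner;

    public IdempotentHandler(Func<TMessage, Task> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    // Returns true if the inner handler ran, false if the message
    // was a duplicate and was skipped.
    public async Task<bool> HandleAsync(string messageId, TMessage message)
    {
        // TryAdd is atomic, so only one caller can claim a given ID.
        if (!_processed.TryAdd(messageId, true))
        {
            return false;
        }

        try
        {
            await _inner(message);
            return true;
        }
        catch
        {
            // Release the ID so a later retry can process the message.
            _processed.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

Delivering the same message ID twice runs the inner handler once; the second call returns `false` without side effects.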

Related Concepts

  • Event sourcing
  • CQRS pattern
  • Saga pattern
  • Distributed transactions

Summary

Message queues enable scalable, decoupled systems. Implement reliable patterns with proper error handling, retries, and dead letter queues.