Message Queue Patterns
Design reliable systems with message queues and async processing.
By EME
Published: February 20, 2025
Tags: message queues, async, rabbitmq, patterns, reliability
A Simple Analogy
Message queues are like a postal system. You drop a message into a queue, and it is held there until the recipient picks it up, even if the recipient is not available at the moment you send it.
Why Message Queues?
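- Decoupling: Producers and consumers don't depend on each other directly
- Scalability: Consumers process messages at their own pace
- Reliability: Messages persist in the queue until they are processed
- Async processing: Callers don't wait for slow operations
- Load balancing: Work is distributed across multiple consumers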
Publisher-Subscriber
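// Publisher
// AppDbContext is an assumed EF Core DbContext name; the snippet relies on _context.
public class OrderService
{
    private readonly AppDbContext _context;
    private readonly IMessagePublisher _publisher;

    public OrderService(AppDbContext context, IMessagePublisher publisher)
    {
        _context = context;
        _publisher = publisher;
    }

    public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
    {
        var order = new Order { /* ... */ };
        _context.Orders.Add(order);
        await _context.SaveChangesAsync();

        // Publish the event only after the order is saved. If publishing fails,
        // the order exists but no event was sent, so the caller should retry or log.
        await _publisher.PublishAsync(new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Total = order.Total
        });

        return order;
    }
}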
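// Subscriber
// AppDbContext is an assumed EF Core DbContext name; the snippet relies on _context.
public class EmailNotificationHandler : IMessageHandler<OrderCreatedEvent>
{
    private readonly AppDbContext _context;
    private readonly IEmailService _emailService;

    public EmailNotificationHandler(AppDbContext context, IEmailService emailService)
    {
        _context = context;
        _emailService = emailService;
    }

    public async Task HandleAsync(OrderCreatedEvent @event)
    {
        var customer = await _context.Customers.FindAsync(@event.CustomerId);
        if (customer is null)
        {
            // Fail loudly so the queue's retry or dead-letter handling can take over.
            throw new InvalidOperationException($"Customer {@event.CustomerId} not found.");
        }

        await _emailService.SendAsync(new EmailMessage
        {
            To = customer.Email,
            Subject = "Order Confirmation",
            Body = $"Order #{@event.OrderId} for ${@event.Total}"
        });
    }
}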
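The key property of publisher-subscriber is fan-out: every subscriber of an event type receives every event, and the publisher does not know who is listening. The following is a minimal in-memory sketch of that behavior. `InMemoryBus`, `Subscribe`, and `PublishAsync` are hypothetical names chosen for illustration, not the API of a real broker client, and the class is not thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory bus for illustration only (not thread-safe, no persistence).
public class InMemoryBus
{
    // Handlers are stored per event type so one publish reaches all subscribers of that type.
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();

    public void Subscribe<TEvent>(Func<TEvent, Task> handler) where TEvent : notnull
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<object, Task>>();
            _handlers[typeof(TEvent)] = list;
        }
        list.Add(e => handler((TEvent)e));
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : notnull
    {
        // No subscribers is not an error: the publisher does not depend on them.
        if (!_handlers.TryGetValue(typeof(TEvent), out var list)) return;

        foreach (var handler in list)
        {
            await handler(@event);
        }
    }
}
```

With this sketch, an email handler and an audit handler can both subscribe to the same event type, and a single `PublishAsync` call invokes both. A real broker would add persistence and deliver to subscribers running in other processes.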
Work Queue Pattern
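// Producer
public class ImageProcessingService
{
    private readonly IQueuePublisher _queue;

    public ImageProcessingService(IQueuePublisher queue) => _queue = queue;

    public async Task QueueImageProcessingAsync(string imageId)
    {
        await _queue.EnqueueAsync(new ImageProcessingJob
        {
            ImageId = imageId,
            CreatedAt = DateTime.UtcNow
        }, "image-processing-queue");
    }
}

// Consumer
public class ImageProcessingWorker : BackgroundService
{
    private readonly IQueueConsumer _consumer;
    // Dead-lettering needs a publisher: the consumer interface only reads from the queue.
    private readonly IQueuePublisher _publisher;
    private readonly IImageProcessor _processor;
    private readonly ILogger<ImageProcessingWorker> _logger;

    public ImageProcessingWorker(
        IQueueConsumer consumer,
        IQueuePublisher publisher,
        IImageProcessor processor,
        ILogger<ImageProcessingWorker> logger)
    {
        _consumer = consumer;
        _publisher = publisher;
        _processor = processor;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _consumer.ConsumeAsync<ImageProcessingJob>(
            "image-processing-queue",
            async job =>
            {
                try
                {
                    await _processor.ProcessAsync(job.ImageId);
                }
                catch (Exception ex)
                {
                    // Record the failure, then move the job to the dead letter queue
                    _logger.LogError(ex, "Failed to process image {ImageId}", job.ImageId);
                    await _publisher.EnqueueAsync(job, "image-processing-dlq");
                }
            },
            stoppingToken);
    }
}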
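What separates a work queue from publisher-subscriber is that each job goes to exactly one worker, so adding workers spreads the load. The sketch below shows this with competing consumers over an in-process `System.Threading.Channels` channel. `WorkQueueDemo` and `RunAsync` are hypothetical names, and the channel stands in for a real broker queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WorkQueueDemo
{
    // Starts `workerCount` competing consumers on one queue and returns
    // how many times each job was handled (expected: once per job).
    public static async Task<ConcurrentDictionary<string, int>> RunAsync(string[] jobs, int workerCount)
    {
        var channel = Channel.CreateUnbounded<string>();
        var handled = new ConcurrentDictionary<string, int>();

        // Each worker reads from the same channel; a given item is handed to only one reader.
        var workers = Enumerable.Range(0, workerCount).Select(async _ =>
        {
            await foreach (var job in channel.Reader.ReadAllAsync())
            {
                handled.AddOrUpdate(job, 1, (_, count) => count + 1);
                await Task.Yield(); // simulate asynchronous work
            }
        }).ToArray();

        foreach (var job in jobs)
        {
            await channel.Writer.WriteAsync(job);
        }
        channel.Writer.Complete(); // lets the workers' read loops finish

        await Task.WhenAll(workers);
        return handled;
    }
}
```

Running `RunAsync` with five distinct job IDs and two workers should report a count of one for every job, whichever worker picked it up.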
Request-Reply Pattern
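// Request
public class PricingService
{
    private readonly IMessageClient _client;

    public PricingService(IMessageClient client) => _client = client;

    public async Task<decimal> GetPriceAsync(string productId)
    {
        // The caller waits for the reply, so the timeout bounds how long
        // a slow or missing responder can block it.
        var response = await _client.RequestAsync<GetPriceRequest, GetPriceResponse>(
            new GetPriceRequest { ProductId = productId },
            timeout: TimeSpan.FromSeconds(5));
        return response.Price;
    }
}

// Reply
// IRequestHandler<TRequest, TResponse> and AppDbContext are assumed names. The handler
// returns a response, so it cannot use the one-way IMessageHandler<T> shape used for events.
public class ProductService : IRequestHandler<GetPriceRequest, GetPriceResponse>
{
    private readonly AppDbContext _context;

    public ProductService(AppDbContext context) => _context = context;

    public async Task<GetPriceResponse> HandleAsync(GetPriceRequest request)
    {
        var product = await _context.Products.FindAsync(request.ProductId);
        if (product is null)
        {
            throw new KeyNotFoundException($"Product {request.ProductId} not found.");
        }
        return new GetPriceResponse { Price = product.Price };
    }
}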
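Request-reply over a queue is usually built by tagging each request with a correlation ID and matching the reply to the waiting caller when it arrives. The sketch below shows one way a client such as `IMessageClient` might do that internally. `PendingReplies`, `Register`, and `Complete` are hypothetical names, and sending the messages themselves is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: tracks in-flight requests by correlation ID.
public class PendingReplies<TResponse>
{
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<TResponse>> _pending = new();

    // Called by the requester before sending. The returned ID travels with the request
    // so the responder can copy it onto the reply.
    public (Guid CorrelationId, Task<TResponse> Reply) Register(TimeSpan timeout)
    {
        var id = Guid.NewGuid();
        var tcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;
        return (id, AwaitWithTimeoutAsync(id, tcs.Task, timeout));
    }

    // Called when a reply arrives. Returns false for unknown or already-completed IDs,
    // for example a reply that shows up after the caller timed out.
    public bool Complete(Guid correlationId, TResponse response) =>
        _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(response);

    private async Task<TResponse> AwaitWithTimeoutAsync(Guid id, Task<TResponse> reply, TimeSpan timeout)
    {
        var finished = await Task.WhenAny(reply, Task.Delay(timeout));
        if (finished != reply)
        {
            _pending.TryRemove(id, out _); // stop tracking the abandoned request
            throw new TimeoutException($"No reply for request {id} within {timeout}.");
        }
        return await reply;
    }
}
```

The requester calls `Register`, publishes the request with the returned ID, and awaits `Reply`. The code that listens on the reply queue calls `Complete` with the ID carried by each incoming reply. Late replies are dropped rather than delivered to the wrong caller.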
Dead Letter Queue
public class QueueConsumer
{
    // IMessageQueue is an assumed interface exposing both DequeueAsync and EnqueueAsync;
    // a publisher-only interface would not be able to dequeue.
    private readonly IMessageQueue _queue;
    private const int MaxRetries = 3;

    public QueueConsumer(IMessageQueue queue) => _queue = queue;

    public async Task ConsumeWithRetryAsync<T>(string queueName, Func<T, Task> handler)
    {
        var message = await _queue.DequeueAsync<T>(queueName);
        var retryCount = message.Headers.Get("retry-count", 0);

        try
        {
            await handler(message.Body);
        }
        catch (Exception)
        {
            if (retryCount < MaxRetries)
            {
                // Retry with exponential backoff: 1s, 2s, then 4s.
                // Note: Task.Delay keeps this consumer idle while it waits.
                var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
                message.Headers.Set("retry-count", retryCount + 1);
                await Task.Delay(delay);
                await _queue.EnqueueAsync(message, queueName);
            }
            else
            {
                // Retries exhausted: move to dead letter queue
                await _queue.EnqueueAsync(message, $"{queueName}-dlq");
            }
        }
    }
}
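To see what the retry policy above costs in time, the delay formula `Math.Pow(2, retryCount)` can be pulled out and evaluated for each attempt. The `Backoff` class below is a hypothetical helper written for this illustration. It reuses the same formula and the same `retryCount < MaxRetries` condition.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that mirrors the delay calculation in ConsumeWithRetryAsync.
public static class Backoff
{
    // Delay before re-enqueueing a message that has already been retried `retryCount` times.
    public static TimeSpan DelayFor(int retryCount) =>
        TimeSpan.FromSeconds(Math.Pow(2, retryCount));

    // All delays a message goes through before it is dead-lettered.
    public static IReadOnlyList<TimeSpan> Schedule(int maxRetries)
    {
        var delays = new List<TimeSpan>();
        for (var retryCount = 0; retryCount < maxRetries; retryCount++)
        {
            delays.Add(DelayFor(retryCount));
        }
        return delays;
    }
}
```

With `MaxRetries = 3`, `Backoff.Schedule(3)` yields delays of 1, 2, and 4 seconds, so a message that keeps failing spends 7 seconds waiting before it reaches the dead letter queue. The handler runs four times in total: the first attempt plus three retries.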
Best Practices
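- Idempotent handlers: Make handlers safe to run twice, since many queues can deliver a message more than once
- Error handling: Retry transient failures and move messages that keep failing to a DLQ
- Monitoring: Track queue depth so growing backlogs are noticed early
- Ordering: Maintain message order when the workflow needs it
- TTL: Set message expiration so stale messages are not processed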
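One common way to get idempotent handlers is to record the ID of every processed message and skip any ID that has been seen before. The sketch below wraps an existing handler this way. `IdempotentHandler` and the `messageId` parameter are hypothetical. The in-memory set is an assumption made to keep the example short, since a real system would usually keep processed IDs in durable storage.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical wrapper: skips messages whose ID has already been processed.
public class IdempotentHandler<T>
{
    // In-memory for illustration only; this set is lost when the process restarts.
    private readonly ConcurrentDictionary<string, bool> _processed = new();
    private readonly Func<T, Task> _inner;

    public IdempotentHandler(Func<T, Task> inner) => _inner = inner;

    // Returns true if the message was processed, false if it was a duplicate.
    public async Task<bool> HandleAsync(string messageId, T message)
    {
        // TryAdd is atomic, so two concurrent deliveries of the same ID cannot both pass.
        if (!_processed.TryAdd(messageId, true))
        {
            return false;
        }

        try
        {
            await _inner(message);
            return true;
        }
        catch
        {
            // Forget the ID so a later retry of the failed message is not skipped.
            _processed.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

Delivering the same message ID twice runs the inner handler once. The second call returns `false` without doing any work.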
Related Concepts
- Event sourcing
- CQRS pattern
- Saga pattern
- Distributed transactions
Summary
Message queues enable scalable, decoupled systems. Implement reliable patterns with proper error handling, retries, and dead letter queues.