Isaac.

MassTransit Advanced

Master complex messaging patterns with MassTransit.

By EME · Published: February 20, 2025
Tags: messaging, masstransit, rabbitmq, distributed, patterns

A Simple Analogy

MassTransit is like a post office for your application. Messages are packages, RabbitMQ is the postal system, and MassTransit handles routing, delivery, and retries automatically.


Why MassTransit?

  • Abstraction: Swap transports (RabbitMQ, Azure Service Bus, etc.)
  • Sagas: Long-running processes with state
  • Retry logic: Automatic resilience
  • Scheduling: Delayed message delivery
  • Monitoring: Built-in observability

Basic Setup

// Program.cs — register MassTransit, its consumers, and the RabbitMQ transport.
builder.Services.AddMassTransit(registration =>
{
    // Consumers that should be hosted by this service.
    registration.AddConsumer<OrderCreatedConsumer>();
    registration.AddConsumer<OrderShippedConsumer>();

    registration.UsingRabbitMq((registrationContext, rabbit) =>
    {
        // Broker host name.
        rabbit.Host("localhost");

        // Let MassTransit configure the endpoints from the registrations above.
        rabbit.ConfigureEndpoints(registrationContext);
    });
});

// Consumer
/// <summary>
/// Consumes <see cref="OrderCreated"/> messages and sends an order
/// confirmation through the injected <see cref="IEmailService"/>.
/// </summary>
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly IEmailService _emailService;

    /// <summary>
    /// Creates the consumer. The dependency is supplied by the container
    /// when MassTransit resolves the consumer.
    /// </summary>
    /// <param name="emailService">Service used to send the confirmation.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="emailService"/> is <c>null</c>.
    /// </exception>
    public OrderCreatedConsumer(IEmailService emailService)
    {
        // Without this assignment the readonly field stays null and Consume
        // fails with a NullReferenceException on every message.
        _emailService = emailService ?? throw new ArgumentNullException(nameof(emailService));
    }

    /// <summary>
    /// Sends an order confirmation for the customer and order carried by the message.
    /// </summary>
    /// <param name="context">Consume context wrapping the <see cref="OrderCreated"/> message.</param>
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var message = context.Message;
        await _emailService.SendOrderConfirmation(message.CustomerId, message.OrderId);
    }
}

// Publish event
/// <summary>
/// Saves orders and publishes an <see cref="OrderCreated"/> event for each one.
/// </summary>
public class OrderService
{
    private readonly IPublishEndpoint _publishEndpoint;

    /// <summary>Creates the service with the endpoint used to publish events.</summary>
    /// <param name="publishEndpoint">MassTransit publish endpoint.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="publishEndpoint"/> is <c>null</c>.
    /// </exception>
    public OrderService(IPublishEndpoint publishEndpoint)
    {
        // Without this assignment the readonly field stays null and
        // CreateOrderAsync fails with a NullReferenceException at Publish.
        _publishEndpoint = publishEndpoint ?? throw new ArgumentNullException(nameof(publishEndpoint));
    }

    /// <summary>
    /// Saves <paramref name="order"/> and then publishes an
    /// <see cref="OrderCreated"/> event carrying its id, customer id and total.
    /// </summary>
    /// <param name="order">The order to save and announce.</param>
    public async Task CreateOrderAsync(Order order)
    {
        // Save order
        await SaveAsync(order);

        // Publish event
        // NOTE(review): save and publish are two separate operations; if the
        // publish fails after the save succeeds the event is lost. Consider a
        // transactional outbox if that matters here.
        await _publishEndpoint.Publish(new OrderCreated
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Total = order.Total
        });
    }
}

Sagas for Long-Running Processes

// Saga state
/// <summary>
/// Per-saga data for the order fulfillment state machine.
/// </summary>
public class OrderFulfillmentState : SagaStateMachineInstance
{
    /// <summary>Identifier of the saga instance.</summary>
    public Guid CorrelationId { get; set; }
    /// <summary>
    /// Current state of the instance; selected by the state machine via
    /// <c>InstanceState(x => x.State)</c>.
    /// </summary>
    public string State { get; set; }
    /// <summary>Order id copied from the submit message when the saga starts.</summary>
    public string OrderId { get; set; }
    /// <summary>Customer id copied from the submit message when the saga starts.</summary>
    public string CustomerId { get; set; }
    /// <summary>UTC time at which the state machine handled the submit message.</summary>
    public DateTime CreatedAt { get; set; }
}

// Saga state machine
/// <summary>
/// State machine for order fulfillment:
/// Initial -> PaymentProcessing -> Shipped -> Completed.
/// Each transition publishes the message that drives the next step.
/// </summary>
// MassTransitStateMachine<T> is the base class that provides InstanceState,
// Event, Initially, During and When. StateMachine<T> is only the interface.
public class OrderFulfillmentStateMachine : MassTransitStateMachine<OrderFulfillmentState>
{
    // NOTE(review): Submitted is declared but no transition in this class targets it.
    public State Submitted { get; private set; }
    public State PaymentProcessing { get; private set; }
    public State Shipped { get; private set; }
    public State Completed { get; private set; }

    public Event<OrderSubmitted> Submit { get; private set; }
    public Event<PaymentApproved> PaymentApproved { get; private set; }
    public Event<ShipmentReady> ShipmentReady { get; private set; }

    /// <summary>
    /// Declares the state property, the events and the transitions.
    /// </summary>
    public OrderFulfillmentStateMachine()
    {
        // The current state is stored in OrderFulfillmentState.State.
        InstanceState(x => x.State);

        // Submit is matched to a saga instance by the message's OrderId.
        Event(() => Submit, x => x.CorrelateById(context => context.Message.OrderId));

        // NOTE(review): these two events have no explicit correlation. That
        // presumably only works if the message types carry their own
        // correlation id (e.g. CorrelatedBy<Guid>); otherwise add a
        // CorrelateById expression like the one above. Confirm against the
        // message contracts.
        Event(() => PaymentApproved);
        Event(() => ShipmentReady);

        Initially(
            When(Submit)
                .Then(context =>
                {
                    // Saga/Message replace the deprecated Instance/Data accessors.
                    // NOTE(review): CorrelateById above treats Message.OrderId as a
                    // Guid while the saga's OrderId is a string — confirm the types.
                    context.Saga.OrderId = context.Message.OrderId;
                    context.Saga.CustomerId = context.Message.CustomerId;
                    context.Saga.CreatedAt = DateTime.UtcNow;
                })
                .TransitionTo(PaymentProcessing)
                .Publish(context => new ProcessPayment { OrderId = context.Message.OrderId })
        );

        During(PaymentProcessing,
            When(PaymentApproved)
                .TransitionTo(Shipped)
                .Publish(context => new ShipOrder { OrderId = context.Saga.OrderId })
        );

        During(Shipped,
            When(ShipmentReady)
                .TransitionTo(Completed)
                .Publish(context => new SendDeliveryNotification { OrderId = context.Saga.OrderId })
        );
    }
}

Message Configuration

// Register MassTransit with an explicitly configured receive endpoint:
// retry, circuit breaker, prefetch limit and the consumer binding.
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderCreatedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq-host");

        cfg.ReceiveEndpoint("order-service", e =>
        {
            // Automatic retry: up to 3 retries, 5 seconds apart.
            e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

            // Circuit breaker. The configurator has no FailureThreshold member;
            // the trip condition is expressed with TripThreshold (percentage of
            // failed messages) and ActiveThreshold (minimum number of messages
            // in the tracking period before the breaker may trip).
            e.UseCircuitBreaker(cb =>
            {
                cb.TrackingPeriod = TimeSpan.FromMinutes(1);
                cb.TripThreshold = 15;   // NOTE(review): tune for the workload
                cb.ActiveThreshold = 5;
                cb.ResetInterval = TimeSpan.FromSeconds(30);
            });

            // Prefetch limit
            e.PrefetchCount = 16;

            // Bind the consumer registered above so the configuration attached
            // to its registration is applied on this endpoint.
            e.ConfigureConsumer<OrderCreatedConsumer>(context);
        });
    });
});

Request/Reply Pattern

// Request: send a GetOrder message and await the typed OrderResponse.
var getOrder = new GetOrder { OrderId = orderId };
var response = await _requestClient.GetResponse<OrderResponse>(getOrder);

// The response wraps the reply message; print its total.
var orderTotal = response.Message.Total;
Console.WriteLine($"Order total: {orderTotal}");

// Handler
/// <summary>
/// Answers <see cref="GetOrder"/> requests with an <see cref="OrderResponse"/>
/// built from the order loaded through <see cref="IOrderRepository"/>.
/// </summary>
public class GetOrderConsumer : IConsumer<GetOrder>
{
    private readonly IOrderRepository _repository;

    /// <summary>Creates the consumer with the repository it reads orders from.</summary>
    /// <param name="repository">Order repository.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="repository"/> is <c>null</c>.
    /// </exception>
    public GetOrderConsumer(IOrderRepository repository)
    {
        // Without this assignment the readonly field stays null and Consume
        // fails with a NullReferenceException on every request.
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
    }

    /// <summary>
    /// Loads the requested order and responds with its id and total.
    /// </summary>
    /// <param name="context">Consume context wrapping the <see cref="GetOrder"/> request.</param>
    public async Task Consume(ConsumeContext<GetOrder> context)
    {
        // NOTE(review): if GetAsync can return null for an unknown id, the
        // property reads below will throw — confirm the repository contract.
        var order = await _repository.GetAsync(context.Message.OrderId);

        await context.RespondAsync(new OrderResponse
        {
            OrderId = order.Id,
            Total = order.Total
        });
    }
}

Scheduled Messages

// Both messages below are scheduled for 30 minutes from now (UTC).
var scheduledTime = DateTime.UtcNow.AddMinutes(30);

// Schedule an event to be published at that time
var reminder = new OrderReminder { OrderId = orderId };
await _scheduledBus.SchedulePublish<OrderReminder>(scheduledTime, reminder);

// Or schedule a command to be sent at that time
var command = new ProcessOrder { OrderId = orderId };
await _scheduledBus.ScheduleSend<ProcessOrder>(scheduledTime, command);

Best Practices

  1. Handle failures: Configure retries and dead-letter (error) queues
  2. Correlate messages: Track related messages
  3. Design idempotently: Handle duplicate messages
  4. Monitor: Track message flow and errors
  5. Version messages: Support multiple versions

Related Concepts

  • Distributed transactions
  • Event sourcing
  • CQRS pattern
  • Service bus alternatives

Summary

MassTransit provides robust messaging with sagas, retries, and scheduling. Build resilient, event-driven systems that handle failures gracefully.