MassTransit Advanced
Master complex messaging patterns with MassTransit.
By EMEPublished: February 20, 2025
messagingmasstransitrabbitmqdistributedpatterns
A Simple Analogy
MassTransit is like a post office for your application. Messages are packages, RabbitMQ is the postal system, and MassTransit handles routing, delivery, and retries automatically.
Why MassTransit?
- Abstraction: Swap transports (RabbitMQ, Azure Service Bus, etc.)
- Sagas: Long-running processes with state
- Retry logic: Automatic resilience
- Scheduling: Delayed message delivery
- Monitoring: Built-in observability
Basic Setup
// Program.cs
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderCreatedConsumer>();
x.AddConsumer<OrderShippedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost");
cfg.ConfigureEndpoints(context);
});
});
// Consumer
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
private readonly IEmailService _emailService;
public async Task Consume(ConsumeContext<OrderCreated> context)
{
var message = context.Message;
await _emailService.SendOrderConfirmation(message.CustomerId, message.OrderId);
}
}
// Publish event
public class OrderService
{
private readonly IPublishEndpoint _publishEndpoint;
public async Task CreateOrderAsync(Order order)
{
// Save order
await SaveAsync(order);
// Publish event
await _publishEndpoint.Publish(new OrderCreated
{
OrderId = order.Id,
CustomerId = order.CustomerId,
Total = order.Total
});
}
}
Sagas for Long-Running Processes
// Saga state
public class OrderFulfillmentState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string State { get; set; }
public string OrderId { get; set; }
public string CustomerId { get; set; }
public DateTime CreatedAt { get; set; }
}
// Saga state machine
public class OrderFulfillmentStateMachine : StateMachine<OrderFulfillmentState>
{
public State Submitted { get; private set; }
public State PaymentProcessing { get; private set; }
public State Shipped { get; private set; }
public State Completed { get; private set; }
public Event<OrderSubmitted> Submit { get; private set; }
public Event<PaymentApproved> PaymentApproved { get; private set; }
public Event<ShipmentReady> ShipmentReady { get; private set; }
public OrderFulfillmentStateMachine()
{
InstanceState(x => x.State);
Event(() => Submit, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => PaymentApproved);
Event(() => ShipmentReady);
Initially(
When(Submit)
.Then(context =>
{
context.Instance.OrderId = context.Data.OrderId;
context.Instance.CustomerId = context.Data.CustomerId;
context.Instance.CreatedAt = DateTime.UtcNow;
})
.TransitionTo(PaymentProcessing)
.Publish(context => new ProcessPayment { OrderId = context.Data.OrderId })
);
During(PaymentProcessing,
When(PaymentApproved)
.TransitionTo(Shipped)
.Publish(context => new ShipOrder { OrderId = context.Instance.OrderId })
);
During(Shipped,
When(ShipmentReady)
.TransitionTo(Completed)
.Publish(context => new SendDeliveryNotification { OrderId = context.Instance.OrderId })
);
}
}
Message Configuration
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderCreatedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq-host");
cfg.ReceiveEndpoint("order-service", e =>
{
// Automatic retry
e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
// Circuit breaker
e.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(1);
cb.FailureThreshold = 5;
cb.ResetInterval = TimeSpan.FromSeconds(30);
});
// Prefetch limit
e.PrefetchCount = 16;
// Configure consumer
e.Consumer<OrderCreatedConsumer>(context);
});
});
});
Request/Reply Pattern
// Request
var response = await _requestClient.GetResponse<OrderResponse>(
new GetOrder { OrderId = orderId });
Console.WriteLine($"Order total: {response.Message.Total}");
// Handler
public class GetOrderConsumer : IConsumer<GetOrder>
{
private readonly IOrderRepository _repository;
public async Task Consume(ConsumeContext<GetOrder> context)
{
var order = await _repository.GetAsync(context.Message.OrderId);
await context.RespondAsync(new OrderResponse
{
OrderId = order.Id,
Total = order.Total
});
}
}
Scheduled Messages
// Send message after delay
var scheduledTime = DateTime.UtcNow.AddMinutes(30);
await _scheduledBus.SchedulePublish<OrderReminder>(
scheduledTime,
new OrderReminder { OrderId = orderId });
// Or schedule a command
await _scheduledBus.ScheduleSend<ProcessOrder>(
scheduledTime,
new ProcessOrder { OrderId = orderId });
Best Practices
- Handle failures: Implement retry and dead-letter queues
- Correlate messages: Track related messages
- Design idempotently: Handle duplicate messages
- Monitor: Track message flow and errors
- Version messages: Support multiple versions
Related Concepts
- Distributed transactions
- Event sourcing
- CQRS pattern
- Service bus alternatives
Summary
MassTransit provides robust messaging with sagas, retries, and scheduling. Build resilient, event-driven systems that handle failures gracefully.