Messaging
Messaging lets parts of your application publish messages and subscribe to them using the pub/sub pattern. Foundatio provides several message bus implementations behind the IMessageBus interface.
The IMessageBus Interface
public interface IMessageBus : IMessagePublisher, IMessageSubscriber, IDisposable
{
}
public interface IMessagePublisher
{
Task PublishAsync(Type messageType, object message,
MessageOptions options = null,
CancellationToken cancellationToken = default);
}
public interface IMessageSubscriber
{
Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler,
CancellationToken cancellationToken = default) where T : class;
}
Implementations
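Every implementation in this section exposes the same contract, so it can help to see how little that contract demands. The following is a minimal sketch, not Foundatio's code: TinyBus is a hypothetical class that mirrors the two method signatures above, keeps handlers in a list, and delivers each published message to every handler whose subscribed type is assignable from the message type. Sequential delivery and the absence of serialization, message options, and error handling are simplifying assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Usage: one subscriber on the concrete type, one on an interface.
var bus = new TinyBus();
var received = new List<string>();

await bus.SubscribeAsync<OrderCreated>((msg, ct) =>
{
    received.Add($"exact:{msg.OrderId}");
    return Task.CompletedTask;
});
await bus.SubscribeAsync<IOrderEvent>((msg, ct) =>
{
    received.Add($"interface:{msg.OrderId}");
    return Task.CompletedTask;
});

await bus.PublishAsync(typeof(OrderCreated), new OrderCreated { OrderId = 123 });
Console.WriteLine(string.Join(", ", received)); // exact:123, interface:123

public interface IOrderEvent { int OrderId { get; } }
public record OrderCreated : IOrderEvent { public int OrderId { get; init; } }

// Hypothetical teaching aid; it only illustrates the shape of the contract.
public sealed class TinyBus
{
    private readonly List<(Type Type, Func<object, CancellationToken, Task> Handler)> _subscribers = new();
    private readonly object _gate = new();

    public Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler,
        CancellationToken cancellationToken = default) where T : class
    {
        // Wrap the typed handler so handlers of different types share one list.
        lock (_gate)
            _subscribers.Add((typeof(T), (message, ct) => handler((T)message, ct)));
        return Task.CompletedTask;
    }

    public async Task PublishAsync(Type messageType, object message,
        CancellationToken cancellationToken = default)
    {
        // Snapshot matching handlers under the lock, then invoke them outside it.
        var matches = new List<Func<object, CancellationToken, Task>>();
        lock (_gate)
        {
            foreach (var (type, handler) in _subscribers)
                if (type.IsAssignableFrom(messageType))
                    matches.Add(handler);
        }
        foreach (var handler in matches)
            await handler(message, cancellationToken);
    }
}
```

Matching on assignable types is what lets a single subscription to an interface or base type observe several concrete message types.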
InMemoryMessageBus
An in-memory message bus for development and testing:
using Foundatio.Messaging;
var messageBus = new InMemoryMessageBus();
// Subscribe to messages
await messageBus.SubscribeAsync<OrderCreated>(async msg =>
{
Console.WriteLine($"Order created: {msg.OrderId}");
});
// Publish a message
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 });
RedisMessageBus
Distributed messaging using Redis pub/sub (separate package):
// dotnet add package Foundatio.Redis
using Foundatio.Redis.Messaging;
using StackExchange.Redis;
var redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
var messageBus = new RedisMessageBus(o => o.Subscriber = redis.GetSubscriber());
RabbitMQMessageBus
Messaging using RabbitMQ (separate package):
// dotnet add package Foundatio.RabbitMQ
using Foundatio.RabbitMQ.Messaging;
var messageBus = new RabbitMQMessageBus(o => {
o.ConnectionString = "amqp://guest:guest@localhost:5672";
});
KafkaMessageBus
Messaging using Apache Kafka (separate package):
// dotnet add package Foundatio.Kafka
using Foundatio.Kafka.Messaging;
var messageBus = new KafkaMessageBus(o => {
o.BootstrapServers = "localhost:9092";
});
AzureServiceBusMessageBus
Messaging using Azure Service Bus (separate package):
// dotnet add package Foundatio.AzureServiceBus
using Foundatio.AzureServiceBus.Messaging;
var messageBus = new AzureServiceBusMessageBus(o => {
o.ConnectionString = "...";
o.Topic = "events";
});
Basic Usage
Publishing Messages
var messageBus = new InMemoryMessageBus();
// Simple publish
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 });
// With options
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 }, new MessageOptions
{
CorrelationId = "request-abc",
DeliveryDelay = TimeSpan.FromSeconds(30),
Properties = new Dictionary<string, string>
{
["source"] = "order-service"
}
});
// Delayed publish (extension method)
await messageBus.PublishAsync(
new OrderReminder { OrderId = 123 },
TimeSpan.FromHours(1)
);
Subscribing to Messages
var messageBus = new InMemoryMessageBus();
// Simple subscription
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
Console.WriteLine($"Processing order: {order.OrderId}");
});
// With cancellation token
await messageBus.SubscribeAsync<OrderCreated>(
async (order, ct) =>
{
await ProcessOrderAsync(order, ct);
},
cancellationToken
);
// Synchronous handler
await messageBus.SubscribeAsync<OrderCreated>(order =>
{
Console.WriteLine($"Order: {order.OrderId}");
});
Multiple Subscribers
Every subscriber to a message type receives each message of that type:
var messageBus = new InMemoryMessageBus();
// Handler 1: Logging
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
_logger.LogInformation("Order {OrderId} created", order.OrderId);
});
// Handler 2: Notification
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
await _notificationService.SendAsync(order.CustomerId, "Order placed!");
});
// Handler 3: Analytics
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
await _analytics.TrackAsync("order_created", order.OrderId);
});
// All three handlers receive this message
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 });
Message Types
Define Your Messages
// Simple message
public record OrderCreated
{
public int OrderId { get; init; }
public DateTime CreatedAt { get; init; }
public string CustomerId { get; init; }
}
// Message with interface for grouping
public interface IOrderEvent { int OrderId { get; } }
public record OrderShipped : IOrderEvent
{
public int OrderId { get; init; }
public string TrackingNumber { get; init; }
}
public record OrderDelivered : IOrderEvent
{
public int OrderId { get; init; }
public DateTime DeliveredAt { get; init; }
}
Subscribe to Interface
Subscribe to all messages implementing an interface:
// Receives OrderShipped, OrderDelivered, and any other IOrderEvent
await messageBus.SubscribeAsync<IOrderEvent>(async orderEvent =>
{
_logger.LogInformation("Order event: {Type} for {OrderId}",
orderEvent.GetType().Name, orderEvent.OrderId);
});
IMessage Interface
Use the built-in IMessage interface for raw message access:
await messageBus.SubscribeAsync(async (IMessage message, CancellationToken ct) =>
{
Console.WriteLine($"Type: {message.Type}");
Console.WriteLine($"Correlation ID: {message.CorrelationId}");
// Deserialize the data
var order = message.GetBody<OrderCreated>();
});
Common Patterns
Event-Driven Architecture
Decouple services with events:
// Order Service
public class OrderService
{
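private readonly IMessageBus _messageBus;
// IOrderRepository is a hypothetical persistence abstraction, not a Foundatio type
private readonly IOrderRepository _repository;
public OrderService(IMessageBus messageBus, IOrderRepository repository)
{
_messageBus = messageBus;
_repository = repository;
}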
public async Task CreateOrderAsync(CreateOrderRequest request)
{
var order = await _repository.CreateAsync(request);
// Publish event for other services
await _messageBus.PublishAsync(new OrderCreated
{
OrderId = order.Id,
CustomerId = request.CustomerId,
CreatedAt = DateTime.UtcNow
});
}
}
// Inventory Service (separate process/service)
public class InventoryService
{
public InventoryService(IMessageBus messageBus)
{
messageBus.SubscribeAsync<OrderCreated>(async order =>
{
await ReserveInventoryAsync(order.OrderId);
});
}
}
// Notification Service (separate process/service)
public class NotificationService
{
public NotificationService(IMessageBus messageBus)
{
messageBus.SubscribeAsync<OrderCreated>(async order =>
{
await SendConfirmationEmailAsync(order.CustomerId);
});
}
}
Cache Invalidation
Keep local caches consistent across application instances:
public class CacheInvalidationService
{
private readonly IMessageBus _messageBus;
private readonly ICacheClient _localCache;
public CacheInvalidationService(IMessageBus messageBus, ICacheClient localCache)
{
_messageBus = messageBus;
_localCache = localCache;
// Listen for invalidation messages
_messageBus.SubscribeAsync<CacheInvalidated>(async msg =>
{
await _localCache.RemoveAsync(msg.Key);
});
}
public async Task InvalidateAsync(string key)
{
// Remove locally
await _localCache.RemoveAsync(key);
// Notify other instances
await _messageBus.PublishAsync(new CacheInvalidated { Key = key });
}
}
public record CacheInvalidated { public string Key { get; init; } }
Real-Time Updates
Push updates to clients:
// Server-side
public class NotificationHub
{
private readonly IMessageBus _messageBus;
public NotificationHub(IMessageBus messageBus)
{
_messageBus = messageBus;
// Forward bus messages to SignalR/WebSocket
_messageBus.SubscribeAsync<UserNotification>(async notification =>
{
await _hubContext.Clients
.User(notification.UserId)
.SendAsync("notification", notification);
});
}
}
// When something happens
await messageBus.PublishAsync(new UserNotification
{
UserId = "user-123",
Message = "Your order has shipped!"
});
Saga/Process Manager
Coordinate multi-step processes:
public class OrderSaga
{
private readonly IMessageBus _messageBus;
public OrderSaga(IMessageBus messageBus)
{
_messageBus = messageBus;
// Step 1: Order created -> Reserve inventory
_messageBus.SubscribeAsync<OrderCreated>(async order =>
{
await ReserveInventoryAsync(order.OrderId);
await _messageBus.PublishAsync(new InventoryReserved { OrderId = order.OrderId });
});
// Step 2: Inventory reserved -> Process payment
_messageBus.SubscribeAsync<InventoryReserved>(async evt =>
{
await ProcessPaymentAsync(evt.OrderId);
await _messageBus.PublishAsync(new PaymentProcessed { OrderId = evt.OrderId });
});
// Step 3: Payment processed -> Ship order
_messageBus.SubscribeAsync<PaymentProcessed>(async evt =>
{
await ShipOrderAsync(evt.OrderId);
});
}
}
Message Options
Configure message delivery:
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 }, new MessageOptions
{
// Unique message identifier
UniqueId = Guid.NewGuid().ToString(),
// For tracing across services
CorrelationId = Activity.Current?.Id,
// Delayed delivery
DeliveryDelay = TimeSpan.FromMinutes(5),
// Custom properties
Properties = new Dictionary<string, string>
{
["source"] = "order-service",
["version"] = "1.0"
}
});
Dependency Injection
Basic Registration
// In-memory (development)
services.AddSingleton<IMessageBus, InMemoryMessageBus>();
// Redis (production)
services.AddSingleton<IMessageBus>(sp =>
{
var redis = sp.GetRequiredService<IConnectionMultiplexer>();
return new RedisMessageBus(o => o.Subscriber = redis.GetSubscriber());
});
Subscribe at Startup
public class MessageSubscriber : IHostedService
{
private readonly IMessageBus _messageBus;
private readonly IServiceProvider _services;
public MessageSubscriber(IMessageBus messageBus, IServiceProvider services)
{
_messageBus = messageBus;
_services = services;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _messageBus.SubscribeAsync<OrderCreated>(async (msg, ct) =>
{
using var scope = _services.CreateScope();
var handler = scope.ServiceProvider.GetRequiredService<IOrderHandler>();
await handler.HandleAsync(msg, ct);
}, cancellationToken);
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
// Register
services.AddHostedService<MessageSubscriber>();
Error Handling
In Subscribers
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
try
{
await ProcessOrderAsync(order);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process order {OrderId}", order.OrderId);
// Optionally publish failure event
await messageBus.PublishAsync(new OrderProcessingFailed
{
OrderId = order.OrderId,
Error = ex.Message
});
}
});
With Retry
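The snippet in this section calls _resiliencePolicy.ExecuteAsync without showing how that policy is configured. As a rough stand-in for experimenting with the pattern, here is a hand-rolled retry helper with exponential backoff. The Retry class, its parameters, and its defaults are hypothetical and are not part of Foundatio; a real resilience policy would normally replace it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Usage: an operation that fails twice, then succeeds on the third attempt.
int calls = 0;
await Retry.ExecuteAsync(ct =>
{
    calls++;
    if (calls < 3)
        throw new InvalidOperationException("transient failure");
    return Task.CompletedTask;
}, maxAttempts: 5, baseDelay: TimeSpan.FromMilliseconds(10));
Console.WriteLine($"Succeeded after {calls} attempts"); // Succeeded after 3 attempts

// Hypothetical helper for illustration only.
public static class Retry
{
    public static async Task ExecuteAsync(
        Func<CancellationToken, Task> operation,
        int maxAttempts = 3,
        TimeSpan? baseDelay = null,
        CancellationToken cancellationToken = default)
    {
        if (maxAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        var delay = baseDelay ?? TimeSpan.FromMilliseconds(200);

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await operation(cancellationToken);
                return;
            }
            // Retry only while attempts remain; cancellation is never retried.
            catch (Exception ex) when (attempt < maxAttempts && ex is not OperationCanceledException)
            {
                // Exponential backoff: baseDelay, then 2x, 4x, and so on.
                await Task.Delay(delay * Math.Pow(2, attempt - 1), cancellationToken);
            }
        }
    }
}
```

Once the last attempt fails, the exception propagates to the caller, so the subscriber can still log it or publish a failure event.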
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
await _resiliencePolicy.ExecuteAsync(async ct =>
{
await ProcessOrderAsync(order, ct);
});
});
Best Practices
1. Use Immutable Messages
// ✅ Good: Immutable record
public record OrderCreated
{
public int OrderId { get; init; }
public required string CustomerId { get; init; }
}
// ❌ Bad: Mutable class
public class OrderCreated
{
public int OrderId { get; set; }
public string CustomerId { get; set; }
}
2. Include Timestamp and Correlation
public record OrderCreated
{
public int OrderId { get; init; }
public DateTime OccurredAt { get; init; } = DateTime.UtcNow;
public string CorrelationId { get; init; } = Activity.Current?.Id;
}
3. Handle Idempotency
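The handler in this section relies on _processedEvents, which this page never defines. One possible shape for it is sketched here as an in-memory store; InMemoryProcessedEvents and its TryClaimAsync method are hypothetical names, and a multi-instance deployment would need a shared store such as a database or distributed cache instead. TryClaimAsync is included because a separate check followed by an add leaves a window in which two concurrent deliveries can both pass the check.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var store = new InMemoryProcessedEvents();
int processed = 0;

async Task HandleAsync(string eventId)
{
    // Claim the event atomically; a duplicate delivery fails the claim and exits.
    if (!await store.TryClaimAsync(eventId))
        return;
    processed++;
}

await HandleAsync("evt-1");
await HandleAsync("evt-1"); // duplicate delivery, ignored
await HandleAsync("evt-2");
Console.WriteLine(processed); // 2

// Hypothetical in-memory store; state is lost when the process restarts.
public sealed class InMemoryProcessedEvents
{
    private readonly ConcurrentDictionary<string, byte> _seen = new();

    public Task<bool> ContainsAsync(string eventId) =>
        Task.FromResult(_seen.ContainsKey(eventId));

    public Task AddAsync(string eventId)
    {
        _seen.TryAdd(eventId, 0);
        return Task.CompletedTask;
    }

    // Returns true only for the first caller that presents a given event ID.
    public Task<bool> TryClaimAsync(string eventId) =>
        Task.FromResult(_seen.TryAdd(eventId, 0));
}
```

Claiming before processing trades one risk for another: if processing fails after the claim, a redelivered message is skipped unless the claim is released, so the right order depends on whether duplicates or dropped work are more costly.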
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
// Check if already processed
if (await _processedEvents.ContainsAsync(order.EventId))
{
_logger.LogDebug("Already processed {EventId}", order.EventId);
return;
}
await ProcessOrderAsync(order);
await _processedEvents.AddAsync(order.EventId);
});
4. Use Specific Message Types
// ✅ Good: Specific, intentional messages
public record OrderCreated { ... }
public record OrderShipped { ... }
public record OrderCancelled { ... }
// ❌ Bad: Generic, multi-purpose messages
public record OrderEvent { public string Action { get; set; } }
5. Keep Messages Small
// ✅ Good: Just identifiers
public record OrderCreated
{
public int OrderId { get; init; }
}
// ❌ Bad: Full entity in message
public record OrderCreated
{
public Order FullOrderWithAllDetails { get; init; }
}
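With an identifier-only message, the subscriber looks up whatever detail it needs at handling time. The following is a minimal sketch of that flow; InMemoryOrderRepository, OrderCreatedHandler, and the Order record are hypothetical and stand in for your own data access code, not for any Foundatio type.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var repository = new InMemoryOrderRepository();
repository.Save(new Order(123, "customer-42", 3));

var handler = new OrderCreatedHandler(repository);
string summary = await handler.HandleAsync(new OrderCreated { OrderId = 123 });
Console.WriteLine(summary); // Order 123 for customer-42: 3 items

// The message carries only the identifier.
public record OrderCreated { public int OrderId { get; init; } }

// The full entity stays in the data store.
public record Order(int Id, string CustomerId, int ItemCount);

// Hypothetical repository; a real one would query a database or an API.
public sealed class InMemoryOrderRepository
{
    private readonly Dictionary<int, Order> _orders = new();

    public void Save(Order order) => _orders[order.Id] = order;

    // Throws KeyNotFoundException if the order does not exist.
    public Task<Order> GetAsync(int id) => Task.FromResult(_orders[id]);
}

public sealed class OrderCreatedHandler
{
    private readonly InMemoryOrderRepository _repository;

    public OrderCreatedHandler(InMemoryOrderRepository repository) => _repository = repository;

    public async Task<string> HandleAsync(OrderCreated message)
    {
        // Load the current state of the order rather than trusting a copy in the message.
        var order = await _repository.GetAsync(message.OrderId);
        return $"Order {order.Id} for {order.CustomerId}: {order.ItemCount} items";
    }
}
```

The cost is one extra lookup per handler, in exchange for small payloads and handlers that always see current data.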