Skip to content

Messaging

Messaging allows you to publish and subscribe to messages flowing through your application using pub/sub patterns. Foundatio provides multiple message bus implementations through the IMessageBus interface.

The IMessageBus Interface

csharp
public interface IMessageBus : IMessagePublisher, IMessageSubscriber, IDisposable
{
}

public interface IMessagePublisher
{
    Task PublishAsync(Type messageType, object message,
                      MessageOptions options = null,
                      CancellationToken cancellationToken = default);
}

public interface IMessageSubscriber
{
    Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler,
                           CancellationToken cancellationToken = default) where T : class;
}

Implementations

InMemoryMessageBus

An in-memory message bus for development and testing:

csharp
using Foundatio.Messaging;

var messageBus = new InMemoryMessageBus();

// Subscribe to messages
await messageBus.SubscribeAsync<OrderCreated>(async msg =>
{
    Console.WriteLine($"Order created: {msg.OrderId}");
});

// Publish a message
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 });

RedisMessageBus

Distributed messaging using Redis pub/sub (separate package):

csharp
// dotnet add package Foundatio.Redis

using Foundatio.Redis.Messaging;
using StackExchange.Redis;

var redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
var messageBus = new RedisMessageBus(o => o.Subscriber = redis.GetSubscriber());

RabbitMQMessageBus

Messaging using RabbitMQ (separate package):

csharp
// dotnet add package Foundatio.RabbitMQ

using Foundatio.RabbitMQ.Messaging;

var messageBus = new RabbitMQMessageBus(o => {
    o.ConnectionString = "amqp://guest:guest@localhost:5672";
});

KafkaMessageBus

Messaging using Apache Kafka (separate package):

csharp
// dotnet add package Foundatio.Kafka

using Foundatio.Kafka.Messaging;

var messageBus = new KafkaMessageBus(o => {
    o.BootstrapServers = "localhost:9092";
});

AzureServiceBusMessageBus

Messaging using Azure Service Bus (separate package):

csharp
// dotnet add package Foundatio.AzureServiceBus

using Foundatio.AzureServiceBus.Messaging;

var messageBus = new AzureServiceBusMessageBus(o => {
    o.ConnectionString = "...";
    o.Topic = "events";
});

Basic Usage

Publishing Messages

csharp
var messageBus = new InMemoryMessageBus();

// Simple publish
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 });

// With options
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 }, new MessageOptions
{
    CorrelationId = "request-abc",
    DeliveryDelay = TimeSpan.FromSeconds(30),
    Properties = new Dictionary<string, string>
    {
        ["source"] = "order-service"
    }
});

// Delayed publish (extension method)
await messageBus.PublishAsync(
    new OrderReminder { OrderId = 123 },
    TimeSpan.FromHours(1)
);

Subscribing to Messages

csharp
var messageBus = new InMemoryMessageBus();

// Simple subscription
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
    Console.WriteLine($"Processing order: {order.OrderId}");
});

// With cancellation token
await messageBus.SubscribeAsync<OrderCreated>(
    async (order, ct) =>
    {
        await ProcessOrderAsync(order, ct);
    },
    cancellationToken
);

// Synchronous handler
await messageBus.SubscribeAsync<OrderCreated>(order =>
{
    Console.WriteLine($"Order: {order.OrderId}");
});

Multiple Subscribers

Each subscriber receives every message:

csharp
var messageBus = new InMemoryMessageBus();

// Handler 1: Logging
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
    _logger.LogInformation("Order {OrderId} created", order.OrderId);
});

// Handler 2: Notification
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
    await _notificationService.SendAsync(order.CustomerId, "Order placed!");
});

// Handler 3: Analytics
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
    await _analytics.TrackAsync("order_created", order.OrderId);
});

// All three handlers receive this message
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 });

Message Types

Define Your Messages

csharp
// Simple message
public record OrderCreated
{
    public int OrderId { get; init; }
    public DateTime CreatedAt { get; init; }
    public string CustomerId { get; init; }
}

// Message with interface for grouping
public interface IOrderEvent { int OrderId { get; } }

public record OrderShipped : IOrderEvent
{
    public int OrderId { get; init; }
    public string TrackingNumber { get; init; }
}

public record OrderDelivered : IOrderEvent
{
    public int OrderId { get; init; }
    public DateTime DeliveredAt { get; init; }
}

Subscribe to Interface

Subscribe to all messages implementing an interface:

csharp
// Receives OrderShipped, OrderDelivered, and any other IOrderEvent
await messageBus.SubscribeAsync<IOrderEvent>(async orderEvent =>
{
    _logger.LogInformation("Order event: {Type} for {OrderId}",
        orderEvent.GetType().Name, orderEvent.OrderId);
});

IMessage Interface

Use the built-in IMessage interface for raw message access:

csharp
await messageBus.SubscribeAsync(async (IMessage message, CancellationToken ct) =>
{
    Console.WriteLine($"Type: {message.Type}");
    Console.WriteLine($"Correlation ID: {message.CorrelationId}");

    // Deserialize the data
    var order = message.GetBody<OrderCreated>();
});

Common Patterns

Event-Driven Architecture

Decouple services with events:

csharp
// Order Service
public class OrderService
{
    private readonly IMessageBus _messageBus;

    public async Task CreateOrderAsync(CreateOrderRequest request)
    {
        var order = await _repository.CreateAsync(request);

        // Publish event for other services
        await _messageBus.PublishAsync(new OrderCreated
        {
            OrderId = order.Id,
            CustomerId = request.CustomerId,
            CreatedAt = DateTime.UtcNow
        });
    }
}

// Inventory Service (separate process/service)
public class InventoryService
{
    public InventoryService(IMessageBus messageBus)
    {
        messageBus.SubscribeAsync<OrderCreated>(async order =>
        {
            await ReserveInventoryAsync(order.OrderId);
        });
    }
}

// Notification Service (separate process/service)
public class NotificationService
{
    public NotificationService(IMessageBus messageBus)
    {
        messageBus.SubscribeAsync<OrderCreated>(async order =>
        {
            await SendConfirmationEmailAsync(order.CustomerId);
        });
    }
}

Cache Invalidation

Coordinate cache across instances:

csharp
public class CacheInvalidationService
{
    private readonly IMessageBus _messageBus;
    private readonly ICacheClient _localCache;

    public CacheInvalidationService(IMessageBus messageBus, ICacheClient localCache)
    {
        _messageBus = messageBus;
        _localCache = localCache;

        // Listen for invalidation messages
        _messageBus.SubscribeAsync<CacheInvalidated>(async msg =>
        {
            await _localCache.RemoveAsync(msg.Key);
        });
    }

    public async Task InvalidateAsync(string key)
    {
        // Remove locally
        await _localCache.RemoveAsync(key);

        // Notify other instances
        await _messageBus.PublishAsync(new CacheInvalidated { Key = key });
    }
}

public record CacheInvalidated { public string Key { get; init; } }

Real-Time Updates

Push updates to clients:

csharp
// Server-side
public class NotificationHub
{
    private readonly IMessageBus _messageBus;

    public NotificationHub(IMessageBus messageBus)
    {
        _messageBus = messageBus;

        // Forward bus messages to SignalR/WebSocket
        _messageBus.SubscribeAsync<UserNotification>(async notification =>
        {
            await _hubContext.Clients
                .User(notification.UserId)
                .SendAsync("notification", notification);
        });
    }
}

// When something happens
await messageBus.PublishAsync(new UserNotification
{
    UserId = "user-123",
    Message = "Your order has shipped!"
});

Saga/Process Manager

Coordinate multi-step processes:

csharp
public class OrderSaga
{
    private readonly IMessageBus _messageBus;

    public OrderSaga(IMessageBus messageBus)
    {
        _messageBus = messageBus;

        // Step 1: Order created -> Reserve inventory
        _messageBus.SubscribeAsync<OrderCreated>(async order =>
        {
            await ReserveInventoryAsync(order.OrderId);
            await _messageBus.PublishAsync(new InventoryReserved { OrderId = order.OrderId });
        });

        // Step 2: Inventory reserved -> Process payment
        _messageBus.SubscribeAsync<InventoryReserved>(async evt =>
        {
            await ProcessPaymentAsync(evt.OrderId);
            await _messageBus.PublishAsync(new PaymentProcessed { OrderId = evt.OrderId });
        });

        // Step 3: Payment processed -> Ship order
        _messageBus.SubscribeAsync<PaymentProcessed>(async evt =>
        {
            await ShipOrderAsync(evt.OrderId);
        });
    }
}

Message Options

Configure message delivery:

csharp
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 }, new MessageOptions
{
    // Unique message identifier
    UniqueId = Guid.NewGuid().ToString(),

    // For tracing across services
    CorrelationId = Activity.Current?.Id,

    // Delayed delivery
    DeliveryDelay = TimeSpan.FromMinutes(5),

    // Custom properties
    Properties = new Dictionary<string, string>
    {
        ["source"] = "order-service",
        ["version"] = "1.0"
    }
});

Dependency Injection

Basic Registration

csharp
// In-memory (development)
services.AddSingleton<IMessageBus, InMemoryMessageBus>();

// Redis (production)
services.AddSingleton<IMessageBus>(sp =>
{
    var redis = sp.GetRequiredService<IConnectionMultiplexer>();
    return new RedisMessageBus(o => o.Subscriber = redis.GetSubscriber());
});

Subscribe at Startup

csharp
public class MessageSubscriber : IHostedService
{
    private readonly IMessageBus _messageBus;
    private readonly IServiceProvider _services;

    public MessageSubscriber(IMessageBus messageBus, IServiceProvider services)
    {
        _messageBus = messageBus;
        _services = services;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _messageBus.SubscribeAsync<OrderCreated>(async (msg, ct) =>
        {
            using var scope = _services.CreateScope();
            var handler = scope.ServiceProvider.GetRequiredService<IOrderHandler>();
            await handler.HandleAsync(msg, ct);
        }, cancellationToken);
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

// Register
services.AddHostedService<MessageSubscriber>();

Error Handling

In Subscribers

csharp
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
    try
    {
        await ProcessOrderAsync(order);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to process order {OrderId}", order.OrderId);

        // Optionally publish failure event
        await _messageBus.PublishAsync(new OrderProcessingFailed
        {
            OrderId = order.OrderId,
            Error = ex.Message
        });
    }
});

With Retry

csharp
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
    await _resiliencePolicy.ExecuteAsync(async ct =>
    {
        await ProcessOrderAsync(order, ct);
    });
});

Best Practices

1. Use Immutable Messages

csharp
// ✅ Good: Immutable record
public record OrderCreated
{
    public int OrderId { get; init; }
    public required string CustomerId { get; init; }
}

// ❌ Bad: Mutable class
public class OrderCreated
{
    public int OrderId { get; set; }
    public string CustomerId { get; set; }
}

2. Include Timestamp and Correlation

csharp
public record OrderCreated
{
    public int OrderId { get; init; }
    public DateTime OccurredAt { get; init; } = DateTime.UtcNow;
    public string CorrelationId { get; init; } = Activity.Current?.Id;
}

3. Handle Idempotency

csharp
await messageBus.SubscribeAsync<OrderCreated>(async order =>
{
    // Check if already processed
    if (await _processedEvents.ContainsAsync(order.EventId))
    {
        _logger.LogDebug("Already processed {EventId}", order.EventId);
        return;
    }

    await ProcessOrderAsync(order);
    await _processedEvents.AddAsync(order.EventId);
});

4. Use Specific Message Types

csharp
// ✅ Good: Specific, intentional messages
public record OrderCreated { ... }
public record OrderShipped { ... }
public record OrderCancelled { ... }

// ❌ Bad: Generic, multi-purpose messages
public record OrderEvent { public string Action { get; set; } }

5. Keep Messages Small

csharp
// ✅ Good: Just identifiers
public record OrderCreated
{
    public int OrderId { get; init; }
}

// ❌ Bad: Full entity in message
public record OrderCreated
{
    public Order FullOrderWithAllDetails { get; init; }
}

Next Steps

  • Queues - For guaranteed delivery with acknowledgment
  • Caching - Cache invalidation with messaging
  • Jobs - Background processing triggered by messages

Released under the Apache 2.0 License.