Building a Distributed Order Processing System with AMQP
10 minute read
This tutorial demonstrates how to build a distributed order processing system using AMQP (Advanced Message Queuing Protocol) with RabbitMQ and .NET. You’ll create a solution that includes:
- An order producer service that publishes order messages
- An order processing service that consumes these messages
- A notification service that receives events about processed orders
Prerequisites
- .NET 6.0 SDK or higher
- Docker (for running RabbitMQ)
- Visual Studio 2022, Visual Studio Code, or JetBrains Rider
Step 1: Set Up RabbitMQ
Start by running RabbitMQ in Docker:
docker run -d --hostname my-rabbit --name rabbit-mq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
This command:
- Starts RabbitMQ with the management plugin
- Exposes port 5672 (AMQP) and 15672 (Management UI)
- The Management UI will be available at http://localhost:15672 (credentials: guest/guest)
Step 2: Create the Solution Structure
Create a new solution with three projects:
# Create solution
dotnet new sln -n AmqpOrderSystem
# Create projects
dotnet new console -n OrderProducer -o OrderProducer
dotnet new console -n OrderProcessor -o OrderProcessor
dotnet new console -n NotificationService -o NotificationService
dotnet new classlib -n Shared -o Shared
# Add projects to solution
dotnet sln add OrderProducer/OrderProducer.csproj
dotnet sln add OrderProcessor/OrderProcessor.csproj
dotnet sln add NotificationService/NotificationService.csproj
dotnet sln add Shared/Shared.csproj
# Add RabbitMQ.Client package to all projects
dotnet add OrderProducer/OrderProducer.csproj package RabbitMQ.Client
dotnet add OrderProcessor/OrderProcessor.csproj package RabbitMQ.Client
dotnet add NotificationService/NotificationService.csproj package RabbitMQ.Client
dotnet add Shared/Shared.csproj package RabbitMQ.Client
Step 3: Create Shared Models
In the Shared project, create message models that will be used by all services.
Create a file named Models.cs
in the Shared project:
using System;
namespace Shared
{
// Base message for all messages
public abstract class Message
{
public Guid Id { get; set; } = Guid.NewGuid();
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
// Command message for creating an order
public class CreateOrderCommand : Message
{
public string CustomerId { get; set; }
public string CustomerName { get; set; }
public decimal TotalAmount { get; set; }
public OrderItem[] Items { get; set; }
}
// Event message for when an order is processed
public class OrderProcessedEvent : Message
{
public string OrderId { get; set; }
public string CustomerId { get; set; }
public string CustomerName { get; set; }
public decimal TotalAmount { get; set; }
public string Status { get; set; }
public DateTime ProcessedAt { get; set; }
}
// Data structure for order items
public class OrderItem
{
public string ProductId { get; set; }
public string ProductName { get; set; }
public int Quantity { get; set; }
public decimal UnitPrice { get; set; }
}
}
Create a file named AmqpSettings.cs
in the Shared project:
namespace Shared
{
public static class AmqpSettings
{
public const string HostName = "localhost";
public const string OrderExchange = "order_exchange";
public const string OrderQueue = "order_queue";
public const string NotificationExchange = "notification_exchange";
public const string NotificationQueue = "notification_queue";
public const string CreateOrderRoutingKey = "order.create";
public const string OrderProcessedRoutingKey = "order.processed";
}
}
Step 4: Implement the Order Producer
Create the following files in the OrderProducer project:
Create AmqpProducer.cs
:
using System;
using System.Text;
using System.Text.Json;
using RabbitMQ.Client;
using Shared;
namespace OrderProducer
{
public class AmqpProducer : IDisposable
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _exchangeName;
public AmqpProducer(string hostName, string exchangeName)
{
_exchangeName = exchangeName;
ConnectionFactory factory = new ConnectionFactory { HostName = hostName };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(
exchange: exchangeName,
type: ExchangeType.Topic,
durable: true,
autoDelete: false);
}
public void SendMessage<T>(T message, string routingKey) where T : Message
{
string json = JsonSerializer.Serialize(message);
byte[] body = Encoding.UTF8.GetBytes(json);
IBasicProperties properties = _channel.CreateBasicProperties();
properties.Persistent = true;
properties.ContentType = "application/json";
properties.MessageId = message.Id.ToString();
properties.Timestamp = new AmqpTimestamp(
new DateTimeOffset(message.Timestamp).ToUnixTimeSeconds());
properties.Type = typeof(T).Name;
_channel.BasicPublish(
exchange: _exchangeName,
routingKey: routingKey,
basicProperties: properties,
body: body);
Console.WriteLine($"Sent message {typeof(T).Name} with routing key {routingKey}");
}
public void Dispose()
{
_channel?.Dispose();
_connection?.Dispose();
}
}
}
Update Program.cs
:
using System;
using System.Threading;
using Shared;
namespace OrderProducer
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Order Producer Service");
Console.WriteLine("Press any key to start sending orders...");
Console.ReadKey();
using (AmqpProducer producer = new AmqpProducer(
AmqpSettings.HostName,
AmqpSettings.OrderExchange))
{
while (true)
{
try
{
// Create a random order
CreateOrderCommand order = CreateRandomOrder();
// Send the order
producer.SendMessage(order, AmqpSettings.CreateOrderRoutingKey);
Console.WriteLine($"Order created for {order.CustomerName} with total ${order.TotalAmount}");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
Console.WriteLine("Press Enter to send another order or Escape to exit");
ConsoleKeyInfo key = Console.ReadKey();
if (key.Key == ConsoleKey.Escape)
break;
}
}
}
static CreateOrderCommand CreateRandomOrder()
{
string[] customers = new[] { "Alice", "Bob", "Charlie", "Dave", "Eve" };
string[] products = new[] { "Laptop", "Monitor", "Keyboard", "Mouse", "Headphones" };
Random random = new Random();
string customerId = Guid.NewGuid().ToString("N").Substring(0, 8);
string customerName = customers[random.Next(customers.Length)];
int itemCount = random.Next(1, 4);
OrderItem[] items = new OrderItem[itemCount];
decimal totalAmount = 0;
for (int i = 0; i < itemCount; i++)
{
string productId = Guid.NewGuid().ToString("N").Substring(0, 8);
string productName = products[random.Next(products.Length)];
int quantity = random.Next(1, 5);
decimal unitPrice = Math.Round((decimal)(random.NextDouble() * 100 + 10), 2);
decimal itemTotal = quantity * unitPrice;
items[i] = new OrderItem
{
ProductId = productId,
ProductName = productName,
Quantity = quantity,
UnitPrice = unitPrice
};
totalAmount += itemTotal;
}
return new CreateOrderCommand
{
CustomerId = customerId,
CustomerName = customerName,
TotalAmount = totalAmount,
Items = items
};
}
}
}
Step 5: Implement the Order Processor
Create the following files in the OrderProcessor project:
Create AmqpConsumer.cs
:
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Shared;
namespace OrderProcessor
{
public class AmqpConsumer : IDisposable
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _queueName;
public AmqpConsumer(
string hostName,
string exchangeName,
string queueName,
string routingKey)
{
_queueName = queueName;
ConnectionFactory factory = new ConnectionFactory { HostName = hostName };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(
exchange: exchangeName,
type: ExchangeType.Topic,
durable: true,
autoDelete: false);
_channel.QueueDeclare(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false);
_channel.QueueBind(
queue: queueName,
exchange: exchangeName,
routingKey: routingKey);
// Set prefetch count to 1 to ensure fair dispatching
_channel.BasicQos(
prefetchSize: 0,
prefetchCount: 1,
global: false);
}
public void StartConsuming<T>(
Action<T> processMessage,
CancellationToken cancellationToken = default) where T : class
{
EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
try
{
// Extract message body
byte[] body = ea.Body.ToArray();
string message = Encoding.UTF8.GetString(body);
// Deserialize message
T messageObject = JsonSerializer.Deserialize<T>(message);
Console.WriteLine($"Received message: {ea.RoutingKey}");
// Process the message
processMessage(messageObject);
// Acknowledge message
_channel.BasicAck(
deliveryTag: ea.DeliveryTag,
multiple: false);
}
catch (Exception ex)
{
Console.WriteLine($"Error processing message: {ex.Message}");
// Negative acknowledgment - don't requeue if it's a serialization issue
bool requeue = !(ex is JsonException);
_channel.BasicNack(
deliveryTag: ea.DeliveryTag,
multiple: false,
requeue: requeue);
}
};
_channel.BasicConsume(
queue: _queueName,
autoAck: false,
consumer: consumer);
Console.WriteLine($"Started consuming from queue: {_queueName}");
// Keep the consumer running until cancellation is requested
cancellationToken.WaitHandle.WaitOne();
}
public void Dispose()
{
_channel?.Dispose();
_connection?.Dispose();
}
}
}
Create AmqpProducer.cs
(similar to the one in OrderProducer project):
using System;
using System.Text;
using System.Text.Json;
using RabbitMQ.Client;
using Shared;
namespace OrderProcessor
{
public class AmqpProducer : IDisposable
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _exchangeName;
public AmqpProducer(string hostName, string exchangeName)
{
_exchangeName = exchangeName;
ConnectionFactory factory = new ConnectionFactory { HostName = hostName };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(
exchange: exchangeName,
type: ExchangeType.Topic,
durable: true,
autoDelete: false);
}
public void SendMessage<T>(T message, string routingKey) where T : Message
{
string json = JsonSerializer.Serialize(message);
byte[] body = Encoding.UTF8.GetBytes(json);
IBasicProperties properties = _channel.CreateBasicProperties();
properties.Persistent = true;
properties.ContentType = "application/json";
properties.MessageId = message.Id.ToString();
properties.Timestamp = new AmqpTimestamp(
new DateTimeOffset(message.Timestamp).ToUnixTimeSeconds());
properties.Type = typeof(T).Name;
_channel.BasicPublish(
exchange: _exchangeName,
routingKey: routingKey,
basicProperties: properties,
body: body);
Console.WriteLine($"Sent message {typeof(T).Name} with routing key {routingKey}");
}
public void Dispose()
{
_channel?.Dispose();
_connection?.Dispose();
}
}
}
Update Program.cs
:
using System;
using System.Threading;
using Shared;
namespace OrderProcessor
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Order Processor Service");
Console.WriteLine("Listening for orders...");
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
// Create notification producer
AmqpProducer notificationProducer = new AmqpProducer(
AmqpSettings.HostName,
AmqpSettings.NotificationExchange);
// Create and start consumer
using (AmqpConsumer consumer = new AmqpConsumer(
AmqpSettings.HostName,
AmqpSettings.OrderExchange,
AmqpSettings.OrderQueue,
AmqpSettings.CreateOrderRoutingKey))
{
consumer.StartConsuming<CreateOrderCommand>(order =>
{
try
{
Console.WriteLine($"Processing order from {order.CustomerName} for ${order.TotalAmount}");
// Simulate order processing time
Thread.Sleep(2000);
// Generate order ID
string orderId = Guid.NewGuid().ToString("N").Substring(0, 8);
// Create order processed event
OrderProcessedEvent processedEvent = new OrderProcessedEvent
{
OrderId = orderId,
CustomerId = order.CustomerId,
CustomerName = order.CustomerName,
TotalAmount = order.TotalAmount,
Status = "Completed",
ProcessedAt = DateTime.UtcNow
};
// Publish order processed event
notificationProducer.SendMessage(
processedEvent,
AmqpSettings.OrderProcessedRoutingKey);
Console.WriteLine($"Order processed: {orderId}");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}, cancellationTokenSource.Token);
Console.WriteLine("Press Escape to stop processing");
while (true)
{
if (Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Escape)
{
cancellationTokenSource.Cancel();
break;
}
Thread.Sleep(100);
}
notificationProducer.Dispose();
}
}
}
}
Step 6: Implement the Notification Service
Create the following files in the NotificationService project:
Create AmqpConsumer.cs
(same as in OrderProcessor):
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Shared;
namespace NotificationService
{
public class AmqpConsumer : IDisposable
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _queueName;
public AmqpConsumer(
string hostName,
string exchangeName,
string queueName,
string routingKey)
{
_queueName = queueName;
ConnectionFactory factory = new ConnectionFactory { HostName = hostName };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(
exchange: exchangeName,
type: ExchangeType.Topic,
durable: true,
autoDelete: false);
_channel.QueueDeclare(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false);
_channel.QueueBind(
queue: queueName,
exchange: exchangeName,
routingKey: routingKey);
// Set prefetch count to 1 to ensure fair dispatching
_channel.BasicQos(
prefetchSize: 0,
prefetchCount: 1,
global: false);
}
public void StartConsuming<T>(
Action<T> processMessage,
CancellationToken cancellationToken = default) where T : class
{
EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
try
{
// Extract message body
byte[] body = ea.Body.ToArray();
string message = Encoding.UTF8.GetString(body);
// Deserialize message
T messageObject = JsonSerializer.Deserialize<T>(message);
Console.WriteLine($"Received message: {ea.RoutingKey}");
// Process the message
processMessage(messageObject);
// Acknowledge message
_channel.BasicAck(
deliveryTag: ea.DeliveryTag,
multiple: false);
}
catch (Exception ex)
{
Console.WriteLine($"Error processing message: {ex.Message}");
// Negative acknowledgment - don't requeue if it's a serialization issue
bool requeue = !(ex is JsonException);
_channel.BasicNack(
deliveryTag: ea.DeliveryTag,
multiple: false,
requeue: requeue);
}
};
_channel.BasicConsume(
queue: _queueName,
autoAck: false,
consumer: consumer);
Console.WriteLine($"Started consuming from queue: {_queueName}");
// Keep the consumer running until cancellation is requested
cancellationToken.WaitHandle.WaitOne();
}
public void Dispose()
{
_channel?.Dispose();
_connection?.Dispose();
}
}
}
Update Program.cs
:
using System;
using System.Threading;
using Shared;
namespace NotificationService
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Notification Service");
Console.WriteLine("Listening for order notifications...");
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
using (AmqpConsumer consumer = new AmqpConsumer(
AmqpSettings.HostName,
AmqpSettings.NotificationExchange,
AmqpSettings.NotificationQueue,
AmqpSettings.OrderProcessedRoutingKey))
{
consumer.StartConsuming<OrderProcessedEvent>(notification =>
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("╔═══════════════════════════════════════════╗");
Console.WriteLine("║ ORDER NOTIFICATION ║");
Console.WriteLine("╚═══════════════════════════════════════════╝");
Console.WriteLine($"Order ID: {notification.OrderId}");
Console.WriteLine($"Customer: {notification.CustomerName}");
Console.WriteLine($"Total Amount: ${notification.TotalAmount}");
Console.WriteLine($"Status: {notification.Status}");
Console.WriteLine($"Processed At: {notification.ProcessedAt}");
Console.WriteLine("═══════════════════════════════════════════════");
Console.ResetColor();
}, cancellationTokenSource.Token);
Console.WriteLine("Press Escape to stop listening");
while (true)
{
if (Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Escape)
{
cancellationTokenSource.Cancel();
break;
}
Thread.Sleep(100);
}
}
}
}
}
Step 7: Run the Solution
You need to run all three projects to see the system in action.
First, run the OrderProcessor:
cd OrderProcessor dotnet run
Then run the NotificationService:
cd NotificationService dotnet run
Finally, run the OrderProducer:
cd OrderProducer dotnet run
In the OrderProducer window, follow the prompts to create orders.
Watch as the OrderProcessor processes the orders.
Observe notifications appearing in the NotificationService window.
Understanding the Flow
- OrderProducer creates order commands and publishes them to the order exchange with routing key “order.create”
- OrderProcessor consumes these commands from the order queue, processes them, and publishes order processed events to the notification exchange
- NotificationService consumes the order processed events and displays notifications
This demonstrates the asynchronous, loosely-coupled nature of AMQP communication. Each service operates independently, communicating only through message queues.
Extending the System
You could extend this system with:
- Error Handling: Add dead letter queues for failed message processing
- Persistence: Save orders and notifications to a database
- Retry Logic: Implement retry policies for transient failures
- Monitoring: Add health checks and metrics collection
- Web Interface: Create a web UI to display orders and notifications
Conclusion
This tutorial demonstrated how to build a distributed system using AMQP with RabbitMQ in .NET. The message-oriented approach provides several advantages:
- Loose Coupling: Services communicate through messages, not direct calls
- Reliability: Messages are persisted and acknowledged
- Scalability: Services can be scaled independently
- Flexibility: New consumers can be added without modifying producers
AMQP provides a robust alternative to REST APIs for service-to-service communication, especially when asynchronous processing and reliability are important requirements.