Building a Distributed Order Processing System with AMQP

Hands-on tutorial for implementing AMQP messaging in .NET applications with RabbitMQ

This tutorial demonstrates how to build a distributed order processing system using AMQP (Advanced Message Queuing Protocol) with RabbitMQ and .NET. You’ll create a solution that includes:

  1. An order producer service that publishes order messages
  2. An order processing service that consumes these messages
  3. A notification service that receives events about processed orders

Prerequisites

  • .NET 6.0 SDK or higher
  • Docker (for running RabbitMQ)
  • Visual Studio 2022, Visual Studio Code, or JetBrains Rider

Step 1: Set Up RabbitMQ

Start by running RabbitMQ in Docker:

docker run -d --hostname my-rabbit --name rabbit-mq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

This command:

  • Starts RabbitMQ with the management plugin
  • Exposes port 5672 (AMQP) and 15672 (Management UI)
  • The Management UI will be available at http://localhost:15672 (credentials: guest/guest)

Step 2: Create the Solution Structure

Create a new solution with three projects:

# Create solution
dotnet new sln -n AmqpOrderSystem

# Create projects
dotnet new console -n OrderProducer -o OrderProducer
dotnet new console -n OrderProcessor -o OrderProcessor
dotnet new console -n NotificationService -o NotificationService
dotnet new classlib -n Shared -o Shared

# Add projects to solution
dotnet sln add OrderProducer/OrderProducer.csproj
dotnet sln add OrderProcessor/OrderProcessor.csproj
dotnet sln add NotificationService/NotificationService.csproj
dotnet sln add Shared/Shared.csproj

# Add RabbitMQ.Client package to all projects
dotnet add OrderProducer/OrderProducer.csproj package RabbitMQ.Client
dotnet add OrderProcessor/OrderProcessor.csproj package RabbitMQ.Client
dotnet add NotificationService/NotificationService.csproj package RabbitMQ.Client
dotnet add Shared/Shared.csproj package RabbitMQ.Client

Step 3: Create Shared Models

In the Shared project, create message models that will be used by all services.

Create a file named Models.cs in the Shared project:

using System;

namespace Shared
{
    // Base message for all messages
    public abstract class Message
    {
        public Guid Id { get; set; } = Guid.NewGuid();
        public DateTime Timestamp { get; set; } = DateTime.UtcNow;
    }

    // Command message for creating an order
    public class CreateOrderCommand : Message
    {
        public string CustomerId { get; set; }
        public string CustomerName { get; set; }
        public decimal TotalAmount { get; set; }
        public OrderItem[] Items { get; set; }
    }

    // Event message for when an order is processed
    public class OrderProcessedEvent : Message
    {
        public string OrderId { get; set; }
        public string CustomerId { get; set; }
        public string CustomerName { get; set; }
        public decimal TotalAmount { get; set; }
        public string Status { get; set; }
        public DateTime ProcessedAt { get; set; }
    }

    // Data structure for order items
    public class OrderItem
    {
        public string ProductId { get; set; }
        public string ProductName { get; set; }
        public int Quantity { get; set; }
        public decimal UnitPrice { get; set; }
    }
}

Create a file named AmqpSettings.cs in the Shared project:

namespace Shared
{
    public static class AmqpSettings
    {
        public const string HostName = "localhost";
        public const string OrderExchange = "order_exchange";
        public const string OrderQueue = "order_queue";
        public const string NotificationExchange = "notification_exchange";
        public const string NotificationQueue = "notification_queue";
        public const string CreateOrderRoutingKey = "order.create";
        public const string OrderProcessedRoutingKey = "order.processed";
    }
}

Step 4: Implement the Order Producer

Create the following files in the OrderProducer project:

Create AmqpProducer.cs:

using System;
using System.Text;
using System.Text.Json;
using RabbitMQ.Client;
using Shared;

namespace OrderProducer
{
    public class AmqpProducer : IDisposable
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly string _exchangeName;

        public AmqpProducer(string hostName, string exchangeName)
        {
            _exchangeName = exchangeName;
            
            ConnectionFactory factory = new ConnectionFactory { HostName = hostName };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            
            _channel.ExchangeDeclare(
                exchange: exchangeName,
                type: ExchangeType.Topic,
                durable: true,
                autoDelete: false);
        }

        public void SendMessage<T>(T message, string routingKey) where T : Message
        {
            string json = JsonSerializer.Serialize(message);
            byte[] body = Encoding.UTF8.GetBytes(json);

            IBasicProperties properties = _channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.ContentType = "application/json";
            properties.MessageId = message.Id.ToString();
            properties.Timestamp = new AmqpTimestamp(
                new DateTimeOffset(message.Timestamp).ToUnixTimeSeconds());
            properties.Type = typeof(T).Name;

            _channel.BasicPublish(
                exchange: _exchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);

            Console.WriteLine($"Sent message {typeof(T).Name} with routing key {routingKey}");
        }

        public void Dispose()
        {
            _channel?.Dispose();
            _connection?.Dispose();
        }
    }
}

Update Program.cs:

using System;
using System.Threading;
using Shared;

namespace OrderProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Order Producer Service");
            Console.WriteLine("Press any key to start sending orders...");
            Console.ReadKey();

            using (AmqpProducer producer = new AmqpProducer(
                AmqpSettings.HostName, 
                AmqpSettings.OrderExchange))
            {
                while (true)
                {
                    try
                    {
                        // Create a random order
                        CreateOrderCommand order = CreateRandomOrder();
                        
                        // Send the order
                        producer.SendMessage(order, AmqpSettings.CreateOrderRoutingKey);
                        
                        Console.WriteLine($"Order created for {order.CustomerName} with total ${order.TotalAmount}");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"Error: {ex.Message}");
                    }

                    Console.WriteLine("Press Enter to send another order or Escape to exit");
                    ConsoleKeyInfo key = Console.ReadKey();
                    if (key.Key == ConsoleKey.Escape)
                        break;
                }
            }
        }

        static CreateOrderCommand CreateRandomOrder()
        {
            string[] customers = new[] { "Alice", "Bob", "Charlie", "Dave", "Eve" };
            string[] products = new[] { "Laptop", "Monitor", "Keyboard", "Mouse", "Headphones" };
            
            Random random = new Random();
            
            string customerId = Guid.NewGuid().ToString("N").Substring(0, 8);
            string customerName = customers[random.Next(customers.Length)];
            
            int itemCount = random.Next(1, 4);
            OrderItem[] items = new OrderItem[itemCount];
            decimal totalAmount = 0;
            
            for (int i = 0; i < itemCount; i++)
            {
                string productId = Guid.NewGuid().ToString("N").Substring(0, 8);
                string productName = products[random.Next(products.Length)];
                int quantity = random.Next(1, 5);
                decimal unitPrice = Math.Round((decimal)(random.NextDouble() * 100 + 10), 2);
                decimal itemTotal = quantity * unitPrice;
                
                items[i] = new OrderItem
                {
                    ProductId = productId,
                    ProductName = productName,
                    Quantity = quantity,
                    UnitPrice = unitPrice
                };
                
                totalAmount += itemTotal;
            }
            
            return new CreateOrderCommand
            {
                CustomerId = customerId,
                CustomerName = customerName,
                TotalAmount = totalAmount,
                Items = items
            };
        }
    }
}

Step 5: Implement the Order Processor

Create the following files in the OrderProcessor project:

Create AmqpConsumer.cs:

using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Shared;

namespace OrderProcessor
{
    public class AmqpConsumer : IDisposable
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly string _queueName;

        public AmqpConsumer(
            string hostName,
            string exchangeName,
            string queueName,
            string routingKey)
        {
            _queueName = queueName;
            
            ConnectionFactory factory = new ConnectionFactory { HostName = hostName };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            
            _channel.ExchangeDeclare(
                exchange: exchangeName,
                type: ExchangeType.Topic,
                durable: true,
                autoDelete: false);
                
            _channel.QueueDeclare(
                queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false);
                
            _channel.QueueBind(
                queue: queueName,
                exchange: exchangeName,
                routingKey: routingKey);
                
            // Set prefetch count to 1 to ensure fair dispatching
            _channel.BasicQos(
                prefetchSize: 0,
                prefetchCount: 1,
                global: false);
        }

        public void StartConsuming<T>(
            Action<T> processMessage,
            CancellationToken cancellationToken = default) where T : class
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
            
            consumer.Received += (model, ea) =>
            {
                try
                {
                    // Extract message body
                    byte[] body = ea.Body.ToArray();
                    string message = Encoding.UTF8.GetString(body);
                    
                    // Deserialize message
                    T messageObject = JsonSerializer.Deserialize<T>(message);
                    
                    Console.WriteLine($"Received message: {ea.RoutingKey}");
                    
                    // Process the message
                    processMessage(messageObject);
                    
                    // Acknowledge message
                    _channel.BasicAck(
                        deliveryTag: ea.DeliveryTag,
                        multiple: false);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error processing message: {ex.Message}");
                    
                    // Negative acknowledgment - don't requeue if it's a serialization issue
                    bool requeue = !(ex is JsonException);
                    _channel.BasicNack(
                        deliveryTag: ea.DeliveryTag,
                        multiple: false,
                        requeue: requeue);
                }
            };
            
            _channel.BasicConsume(
                queue: _queueName,
                autoAck: false,
                consumer: consumer);
                
            Console.WriteLine($"Started consuming from queue: {_queueName}");
            
            // Keep the consumer running until cancellation is requested
            cancellationToken.WaitHandle.WaitOne();
        }

        public void Dispose()
        {
            _channel?.Dispose();
            _connection?.Dispose();
        }
    }
}

Create AmqpProducer.cs (similar to the one in OrderProducer project):

using System;
using System.Text;
using System.Text.Json;
using RabbitMQ.Client;
using Shared;

namespace OrderProcessor
{
    public class AmqpProducer : IDisposable
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly string _exchangeName;

        public AmqpProducer(string hostName, string exchangeName)
        {
            _exchangeName = exchangeName;
            
            ConnectionFactory factory = new ConnectionFactory { HostName = hostName };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            
            _channel.ExchangeDeclare(
                exchange: exchangeName,
                type: ExchangeType.Topic,
                durable: true,
                autoDelete: false);
        }

        public void SendMessage<T>(T message, string routingKey) where T : Message
        {
            string json = JsonSerializer.Serialize(message);
            byte[] body = Encoding.UTF8.GetBytes(json);

            IBasicProperties properties = _channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.ContentType = "application/json";
            properties.MessageId = message.Id.ToString();
            properties.Timestamp = new AmqpTimestamp(
                new DateTimeOffset(message.Timestamp).ToUnixTimeSeconds());
            properties.Type = typeof(T).Name;

            _channel.BasicPublish(
                exchange: _exchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);

            Console.WriteLine($"Sent message {typeof(T).Name} with routing key {routingKey}");
        }

        public void Dispose()
        {
            _channel?.Dispose();
            _connection?.Dispose();
        }
    }
}

Update Program.cs:

using System;
using System.Threading;
using Shared;

namespace OrderProcessor
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Order Processor Service");
            Console.WriteLine("Listening for orders...");
            
            CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
            
            // Create notification producer
            AmqpProducer notificationProducer = new AmqpProducer(
                AmqpSettings.HostName,
                AmqpSettings.NotificationExchange);
            
            // Create and start consumer
            using (AmqpConsumer consumer = new AmqpConsumer(
                AmqpSettings.HostName,
                AmqpSettings.OrderExchange,
                AmqpSettings.OrderQueue,
                AmqpSettings.CreateOrderRoutingKey))
            {
                consumer.StartConsuming<CreateOrderCommand>(order =>
                {
                    try
                    {
                        Console.WriteLine($"Processing order from {order.CustomerName} for ${order.TotalAmount}");
                        
                        // Simulate order processing time
                        Thread.Sleep(2000);
                        
                        // Generate order ID
                        string orderId = Guid.NewGuid().ToString("N").Substring(0, 8);
                        
                        // Create order processed event
                        OrderProcessedEvent processedEvent = new OrderProcessedEvent
                        {
                            OrderId = orderId,
                            CustomerId = order.CustomerId,
                            CustomerName = order.CustomerName,
                            TotalAmount = order.TotalAmount,
                            Status = "Completed",
                            ProcessedAt = DateTime.UtcNow
                        };
                        
                        // Publish order processed event
                        notificationProducer.SendMessage(
                            processedEvent,
                            AmqpSettings.OrderProcessedRoutingKey);
                            
                        Console.WriteLine($"Order processed: {orderId}");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"Error: {ex.Message}");
                    }
                }, cancellationTokenSource.Token);
                
                Console.WriteLine("Press Escape to stop processing");
                
                while (true)
                {
                    if (Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Escape)
                    {
                        cancellationTokenSource.Cancel();
                        break;
                    }
                    
                    Thread.Sleep(100);
                }
                
                notificationProducer.Dispose();
            }
        }
    }
}

Step 6: Implement the Notification Service

Create the following files in the NotificationService project:

Create AmqpConsumer.cs (same as in OrderProcessor):

using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Shared;

namespace NotificationService
{
    public class AmqpConsumer : IDisposable
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly string _queueName;

        public AmqpConsumer(
            string hostName,
            string exchangeName,
            string queueName,
            string routingKey)
        {
            _queueName = queueName;
            
            ConnectionFactory factory = new ConnectionFactory { HostName = hostName };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            
            _channel.ExchangeDeclare(
                exchange: exchangeName,
                type: ExchangeType.Topic,
                durable: true,
                autoDelete: false);
                
            _channel.QueueDeclare(
                queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false);
                
            _channel.QueueBind(
                queue: queueName,
                exchange: exchangeName,
                routingKey: routingKey);
                
            // Set prefetch count to 1 to ensure fair dispatching
            _channel.BasicQos(
                prefetchSize: 0,
                prefetchCount: 1,
                global: false);
        }

        public void StartConsuming<T>(
            Action<T> processMessage,
            CancellationToken cancellationToken = default) where T : class
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
            
            consumer.Received += (model, ea) =>
            {
                try
                {
                    // Extract message body
                    byte[] body = ea.Body.ToArray();
                    string message = Encoding.UTF8.GetString(body);
                    
                    // Deserialize message
                    T messageObject = JsonSerializer.Deserialize<T>(message);
                    
                    Console.WriteLine($"Received message: {ea.RoutingKey}");
                    
                    // Process the message
                    processMessage(messageObject);
                    
                    // Acknowledge message
                    _channel.BasicAck(
                        deliveryTag: ea.DeliveryTag,
                        multiple: false);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error processing message: {ex.Message}");
                    
                    // Negative acknowledgment - don't requeue if it's a serialization issue
                    bool requeue = !(ex is JsonException);
                    _channel.BasicNack(
                        deliveryTag: ea.DeliveryTag,
                        multiple: false,
                        requeue: requeue);
                }
            };
            
            _channel.BasicConsume(
                queue: _queueName,
                autoAck: false,
                consumer: consumer);
                
            Console.WriteLine($"Started consuming from queue: {_queueName}");
            
            // Keep the consumer running until cancellation is requested
            cancellationToken.WaitHandle.WaitOne();
        }

        public void Dispose()
        {
            _channel?.Dispose();
            _connection?.Dispose();
        }
    }
}

Update Program.cs:

using System;
using System.Threading;
using Shared;

namespace NotificationService
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Notification Service");
            Console.WriteLine("Listening for order notifications...");
            
            CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
            
            using (AmqpConsumer consumer = new AmqpConsumer(
                AmqpSettings.HostName,
                AmqpSettings.NotificationExchange,
                AmqpSettings.NotificationQueue,
                AmqpSettings.OrderProcessedRoutingKey))
            {
                consumer.StartConsuming<OrderProcessedEvent>(notification =>
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    Console.WriteLine("╔═══════════════════════════════════════════╗");
                    Console.WriteLine("║            ORDER NOTIFICATION             ║");
                    Console.WriteLine("╚═══════════════════════════════════════════╝");
                    Console.WriteLine($"Order ID:      {notification.OrderId}");
                    Console.WriteLine($"Customer:      {notification.CustomerName}");
                    Console.WriteLine($"Total Amount:  ${notification.TotalAmount}");
                    Console.WriteLine($"Status:        {notification.Status}");
                    Console.WriteLine($"Processed At:  {notification.ProcessedAt}");
                    Console.WriteLine("═══════════════════════════════════════════════");
                    Console.ResetColor();
                }, cancellationTokenSource.Token);
                
                Console.WriteLine("Press Escape to stop listening");
                
                while (true)
                {
                    if (Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Escape)
                    {
                        cancellationTokenSource.Cancel();
                        break;
                    }
                    
                    Thread.Sleep(100);
                }
            }
        }
    }
}

Step 7: Run the Solution

You need to run all three projects to see the system in action.

  1. First, run the OrderProcessor:

    cd OrderProcessor
    dotnet run
    
  2. Then run the NotificationService:

    cd NotificationService
    dotnet run
    
  3. Finally, run the OrderProducer:

    cd OrderProducer
    dotnet run
    
  4. In the OrderProducer window, follow the prompts to create orders.

  5. Watch as the OrderProcessor processes the orders.

  6. Observe notifications appearing in the NotificationService window.

Understanding the Flow

  1. OrderProducer creates order commands and publishes them to the order exchange with routing key “order.create”
  2. OrderProcessor consumes these commands from the order queue, processes them, and publishes order processed events to the notification exchange
  3. NotificationService consumes the order processed events and displays notifications

This demonstrates the asynchronous, loosely-coupled nature of AMQP communication. Each service operates independently, communicating only through message queues.

Extending the System

You could extend this system with:

  1. Error Handling: Add dead letter queues for failed message processing
  2. Persistence: Save orders and notifications to a database
  3. Retry Logic: Implement retry policies for transient failures
  4. Monitoring: Add health checks and metrics collection
  5. Web Interface: Create a web UI to display orders and notifications

Conclusion

This tutorial demonstrated how to build a distributed system using AMQP with RabbitMQ in .NET. The message-oriented approach provides several advantages:

  1. Loose Coupling: Services communicate through messages, not direct calls
  2. Reliability: Messages are persisted and acknowledged
  3. Scalability: Services can be scaled independently
  4. Flexibility: New consumers can be added without modifying producers

AMQP provides a robust alternative to REST APIs for service-to-service communication, especially when asynchronous processing and reliability are important requirements.