From 4bffeb54b7f32fd3877674f0d6ec335c95aecdc2 Mon Sep 17 00:00:00 2001 From: Pascal Senn Date: Sat, 7 Mar 2026 13:18:42 +0000 Subject: [PATCH 1/3] Adds `ConfigureDefaults` to rabbitmq --- .../Configurations/RabbitMQBusDefaults.cs | 20 + .../RabbitMQDefaultExchangeOptions.cs | 50 ++ .../RabbitMQDefaultQueueOptions.cs | 65 +++ .../RabbitMQTransportConfiguration.cs | 5 + .../IRabbitMQMessagingTransportDescriptor.cs | 7 + .../RabbitMQMessagingTransportDescriptor.cs | 8 + .../RabbitMQMessagingTransport.cs | 3 +- .../Topology/RabbitMQMessagingTopology.cs | 14 +- .../Behaviors/BusDefaultsIntegrationTests.cs | 272 ++++++++++ .../Helpers/RabbitMQFixture.cs | 34 +- .../Topology/RabbitMQBusDefaultsTests.cs | 494 ++++++++++++++++++ .../src/docs/mocha/v1/transports/rabbitmq.md | 72 +++ 12 files changed, 1031 insertions(+), 13 deletions(-) create mode 100644 src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQBusDefaults.cs create mode 100644 src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultExchangeOptions.cs create mode 100644 src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultQueueOptions.cs create mode 100644 src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/BusDefaultsIntegrationTests.cs create mode 100644 src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQBusDefaultsTests.cs diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQBusDefaults.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQBusDefaults.cs new file mode 100644 index 00000000000..7517c8d848d --- /dev/null +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQBusDefaults.cs @@ -0,0 +1,20 @@ +namespace Mocha.Transport.RabbitMQ; + +/// +/// Defines bus-level defaults that are applied to all auto-provisioned queues and exchanges +/// when they are created by topology conventions. +/// +public sealed class RabbitMQBusDefaults +{ + /// + /// Gets or sets the default queue configuration that is applied to all auto-provisioned queues. + /// Individual queue settings will override these defaults. + /// + public RabbitMQDefaultQueueOptions Queue { get; set; } = new(); + + /// + /// Gets or sets the default exchange configuration that is applied to all auto-provisioned exchanges. + /// Individual exchange settings will override these defaults. + /// + public RabbitMQDefaultExchangeOptions Exchange { get; set; } = new(); +} diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultExchangeOptions.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultExchangeOptions.cs new file mode 100644 index 00000000000..f8ca037f2f1 --- /dev/null +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultExchangeOptions.cs @@ -0,0 +1,50 @@ +namespace Mocha.Transport.RabbitMQ; + +/// +/// Default options for exchanges created by topology conventions. +/// +public sealed class RabbitMQDefaultExchangeOptions +{ + /// + /// Gets or sets the default exchange type. + /// When set, all auto-provisioned exchanges will use this type unless explicitly overridden. + /// + public string? Type { get; set; } + + /// + /// Gets or sets whether exchanges are durable by default. + /// Default is null (uses the RabbitMQ default of true). + /// + public bool? Durable { get; set; } + + /// + /// Gets or sets whether exchanges are auto-deleted by default. + /// Default is null (uses the RabbitMQ default of false). + /// + public bool? AutoDelete { get; set; } + + /// + /// Gets or sets additional default arguments applied to all auto-provisioned exchanges. + /// + public Dictionary Arguments { get; set; } = new(); + + /// + /// Applies these defaults to an exchange configuration, without overriding explicitly set values. + /// + internal void ApplyTo(RabbitMQExchangeConfiguration configuration) + { + configuration.Type ??= Type; + configuration.Durable ??= Durable; + configuration.AutoDelete ??= AutoDelete; + + if (Arguments.Count > 0) + { + configuration.Arguments ??= new Dictionary(); + + foreach (var (key, value) in Arguments) + { + configuration.Arguments.TryAdd(key, value); + } + } + } +} diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultQueueOptions.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultQueueOptions.cs new file mode 100644 index 00000000000..5f3d9bdbbdb --- /dev/null +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultQueueOptions.cs @@ -0,0 +1,65 @@ +namespace Mocha.Transport.RabbitMQ; + +/// +/// Default options for queues created by topology conventions. +/// +public sealed class RabbitMQDefaultQueueOptions +{ + /// + /// Gets or sets the default queue type (Classic, Quorum, or Stream). + /// When set, all auto-provisioned queues will use this queue type unless explicitly overridden. + /// + public string? QueueType { get; set; } + + /// + /// Gets or sets whether queues are durable by default. + /// Default is null (uses the RabbitMQ default of true). + /// + public bool? Durable { get; set; } + + /// + /// Gets or sets whether queues are auto-deleted by default. + /// Default is null (uses the RabbitMQ default of false). + /// + public bool? AutoDelete { get; set; } + + /// + /// Gets or sets additional default arguments applied to all auto-provisioned queues. + /// + public Dictionary Arguments { get; set; } = new(); + + /// + /// Applies these defaults to a queue configuration, without overriding explicitly set values. + /// Quorum and stream queue types are not applied to queues that have auto-delete or exclusive + /// set, since those properties are incompatible with these queue types. Default arguments are + /// also skipped for incompatible queues because they may contain queue-type-specific settings + /// (e.g. x-delivery-limit is only valid for quorum queues). + /// + internal void ApplyTo(RabbitMQQueueConfiguration configuration) + { + configuration.Durable ??= Durable; + configuration.AutoDelete ??= AutoDelete; + + // Quorum and stream queues do not support auto-delete or exclusive properties. + // Skip applying queue type and default arguments for incompatible configurations + // (e.g. reply queues) since default arguments are often queue-type-specific. + var isIncompatibleWithQueueType = + configuration.AutoDelete is true || configuration.Exclusive is true; + + if (isIncompatibleWithQueueType) + { + return; + } + + if (Arguments.Count > 0 || QueueType is not null) + { + configuration.Arguments ??= new Dictionary(); + configuration.Arguments.TryAdd("x-queue-type", QueueType); + + foreach (var (key, value) in Arguments) + { + configuration.Arguments.TryAdd(key, value); + } + } + } +} diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQTransportConfiguration.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQTransportConfiguration.cs index 5c05216f633..c648f7794f6 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQTransportConfiguration.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQTransportConfiguration.cs @@ -50,6 +50,11 @@ public RabbitMQTransportConfiguration() /// Gets or sets the explicitly declared bindings for this transport. /// public List Bindings { get; set; } = []; + + /// + /// Gets or sets the bus-level defaults applied to all auto-provisioned queues and exchanges. + /// + public RabbitMQBusDefaults Defaults { get; set; } = new(); } /// diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/IRabbitMQMessagingTransportDescriptor.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/IRabbitMQMessagingTransportDescriptor.cs index 576d770c119..eb70bd7c0f5 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/IRabbitMQMessagingTransportDescriptor.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/IRabbitMQMessagingTransportDescriptor.cs @@ -27,6 +27,13 @@ public interface IRabbitMQMessagingTransportDescriptor IRabbitMQMessagingTransportDescriptor ConnectionProvider( Func connectionFactory); + /// + /// Configures bus-level defaults that are applied to all auto-provisioned queues and exchanges. + /// + /// A delegate that configures the bus defaults. + /// The descriptor for method chaining. + IRabbitMQMessagingTransportDescriptor ConfigureDefaults(Action configure); + /// /// Gets or creates a receive endpoint descriptor with the specified name. /// diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/RabbitMQMessagingTransportDescriptor.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/RabbitMQMessagingTransportDescriptor.cs index 7bffc3ff899..6fc435bbad4 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/RabbitMQMessagingTransportDescriptor.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/RabbitMQMessagingTransportDescriptor.cs @@ -145,6 +145,14 @@ public IRabbitMQMessagingTransportDescriptor ConnectionProvider( return this; } + /// + public IRabbitMQMessagingTransportDescriptor ConfigureDefaults(Action configure) + { + configure(Configuration.Defaults); + + return this; + } + /// public IRabbitMQReceiveEndpointDescriptor Endpoint(string name) { diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessagingTransport.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessagingTransport.cs index cbedb1703c6..6e2e80967f7 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessagingTransport.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessagingTransport.cs @@ -72,7 +72,8 @@ protected override void OnAfterInitialized(IMessagingSetupContext context) Port = Connection.Port, Path = Connection.VirtualHost }; - _topology = new RabbitMQMessagingTopology(this, builder.Uri); + + _topology = new RabbitMQMessagingTopology(this, builder.Uri, configuration.Defaults); foreach (var exchange in configuration.Exchanges) { diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs index deeb8fd279d..307573572b0 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs @@ -4,7 +4,10 @@ namespace Mocha.Transport.RabbitMQ; /// Manages the RabbitMQ topology model (exchanges, queues, and bindings) for a transport instance, /// providing thread-safe mutation and lookup of topology resources. /// -public sealed class RabbitMQMessagingTopology(RabbitMQMessagingTransport transport, Uri baseAddress) +public sealed class RabbitMQMessagingTopology( + RabbitMQMessagingTransport transport, + Uri baseAddress, + RabbitMQBusDefaults defaults) : MessagingTopology(transport, baseAddress) { private readonly object _lock = new(); @@ -27,6 +30,11 @@ public sealed class RabbitMQMessagingTopology(RabbitMQMessagingTransport transpo /// public IReadOnlyList Bindings => _bindings; + /// + /// Gets or sets the bus-level defaults applied to all auto-provisioned queues and exchanges. + /// + public RabbitMQBusDefaults Defaults => defaults; + /// /// Adds a new exchange to the topology, initializing it from the given configuration. /// @@ -46,6 +54,7 @@ public RabbitMQExchange AddExchange(RabbitMQExchangeConfiguration configuration) exchange = new RabbitMQExchange(); configuration.Topology = this; + defaults.Exchange.ApplyTo(configuration); exchange.Initialize(configuration); _exchanges.Add(exchange); @@ -75,6 +84,9 @@ public RabbitMQQueue AddQueue(RabbitMQQueueConfiguration configuration) } configuration.Topology = this; + + defaults.Queue.ApplyTo(configuration); + queue = new RabbitMQQueue(); queue.Initialize(configuration); diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/BusDefaultsIntegrationTests.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/BusDefaultsIntegrationTests.cs new file mode 100644 index 00000000000..7ebb992c4d9 --- /dev/null +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/BusDefaultsIntegrationTests.cs @@ -0,0 +1,272 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; +using Mocha.Transport.RabbitMQ.Tests.Helpers; +using Xunit.Abstractions; + +namespace Mocha.Transport.RabbitMQ.Tests.Behaviors; + +[Collection("RabbitMQ")] +public class BusDefaultsIntegrationTests +{ + private static readonly TimeSpan s_timeout = TimeSpan.FromSeconds(30); + private readonly RabbitMQFixture _fixture; + private readonly ITestOutputHelper _output; + + public BusDefaultsIntegrationTests(RabbitMQFixture fixture, ITestOutputHelper output) + { + _fixture = fixture; + _output = output; + } + + [Fact] + public async Task ConfigureDefaults_Should_ProvisionQuorumQueues_When_BusStarts() + { + // arrange + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(recorder) + .AddMessageBus() + .AddEventHandler() + .AddRabbitMQ(t => t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum)) + .BuildTestBusAsync(); + + // act — verify the queue was created as quorum + var queues = await ListQueuesAsync(vhost.VhostName); + + // assert — application queues should be quorum type (reply queues are + // auto-delete and correctly remain classic since quorum doesn't support that) + var appQueues = queues.Where(q => !q.Name.StartsWith("response-")).ToList(); + Assert.NotEmpty(appQueues); + foreach (var (name, type) in appQueues) + { + Assert.Equal("quorum", type); + } + } + + [Fact] + public async Task ConfigureDefaults_Should_DeliverMessages_When_QuorumQueuesUsed() + { + // arrange + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(recorder) + .AddMessageBus() + .AddEventHandler() + .AddRabbitMQ(t => t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum)) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync(new OrderCreated { OrderId = "ORD-QUORUM" }, CancellationToken.None); + + // assert + Assert.True(await recorder.WaitAsync(s_timeout), "Handler did not receive the event within timeout"); + + var message = Assert.Single(recorder.Messages); + var order = Assert.IsType(message); + Assert.Equal("ORD-QUORUM", order.OrderId); + } + + [Fact] + public async Task ConfigureDefaults_Should_ProvisionWithCustomArguments_When_BusStarts() + { + // arrange + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(recorder) + .AddMessageBus() + .AddEventHandler() + .AddRabbitMQ(t => + { + t.ConfigureDefaults(d => + { + d.Queue.QueueType = RabbitMQQueueType.Quorum; + d.Queue.Arguments["x-delivery-limit"] = 5; + }); + }) + .BuildTestBusAsync(); + + // act — verify queues are quorum + var queues = await ListQueuesAsync(vhost.VhostName); + + // assert — application queues should be quorum type (reply queues are + // auto-delete and correctly remain classic since quorum doesn't support that) + var appQueues = queues.Where(q => !q.Name.StartsWith("response-")).ToList(); + Assert.NotEmpty(appQueues); + foreach (var (name, type) in appQueues) + { + Assert.Equal("quorum", type); + } + } + + [Fact] + public async Task ConfigureDefaults_Should_DeliverMessages_When_CustomDefaultsApplied() + { + // arrange + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(recorder) + .AddMessageBus() + .AddEventHandler() + .AddRabbitMQ(t => + { + t.ConfigureDefaults(d => + { + d.Queue.QueueType = RabbitMQQueueType.Quorum; + d.Queue.Arguments["x-delivery-limit"] = 5; + }); + }) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync(new OrderCreated { OrderId = "ORD-DEFAULTS" }, CancellationToken.None); + + // assert + Assert.True(await recorder.WaitAsync(s_timeout), "Handler did not receive the event within timeout"); + + var message = Assert.Single(recorder.Messages); + var order = Assert.IsType(message); + Assert.Equal("ORD-DEFAULTS", order.OrderId); + } + + [Fact] + public async Task ConfigureDefaults_Should_NotOverrideExplicitQueue_When_QueueDeclaredWithType() + { + // arrange + var capture = new OrderCapture(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(capture) + .AddMessageBus() + .AddConsumer() + .AddRabbitMQ(t => + { + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum); + t.BindHandlersExplicitly(); + + // Explicitly declare a classic queue — should override the quorum default + t.DeclareExchange("order-ex"); + t.DeclareQueue("classic-q").QueueType(RabbitMQQueueType.Classic); + t.DeclareBinding("order-ex", "classic-q"); + + t.Endpoint("classic-ep").Consumer().Queue("classic-q"); + t.DispatchEndpoint("order-dispatch").ToExchange("order-ex").Publish(); + }) + .BuildTestBusAsync(); + + // act — verify queue types + var queues = await ListQueuesAsync(vhost.VhostName); + + // assert — the classic-q should be classic, not quorum + var classicQueue = queues.First(q => q.Name == "classic-q"); + Assert.Equal("classic", classicQueue.Type); + } + + [Fact] + public async Task ConfigureDefaults_Should_NotOverrideExplicitQueue_When_MessagesStillDelivered() + { + // arrange + var capture = new OrderCapture(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(capture) + .AddMessageBus() + .AddConsumer() + .AddRabbitMQ(t => + { + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum); + t.BindHandlersExplicitly(); + + t.DeclareExchange("order-ex"); + t.DeclareQueue("override-q").QueueType(RabbitMQQueueType.Classic); + t.DeclareBinding("order-ex", "override-q"); + + t.Endpoint("override-ep").Consumer().Queue("override-q"); + t.DispatchEndpoint("order-dispatch").ToExchange("order-ex").Publish(); + }) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync(new OrderCreated { OrderId = "ORD-OVERRIDE" }, CancellationToken.None); + + // assert + Assert.True(await capture.WaitAsync(s_timeout), "Consumer did not receive the message within timeout"); + + var message = Assert.Single(capture.Messages); + Assert.Equal("ORD-OVERRIDE", message.OrderId); + } + + private async Task> ListQueuesAsync(string vhostName) + { + var output = await _fixture.InvokeCommandAsync( + ["rabbitmqctl", "list_queues", "name", "type", "-p", vhostName, "--no-table-headers"]); + + Assert.NotNull(output); + _output.WriteLine($"rabbitmqctl output:\n{output}"); + + var result = new List<(string Name, string Type)>(); + var lines = output.Split('\n', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + + foreach (var line in lines) + { + var parts = line.Split('\t'); + if (parts.Length >= 2 && parts[1] is not "type") + { + result.Add((parts[0], parts[1])); + } + } + + return result; + } + + public sealed class OrderCapture + { + private readonly SemaphoreSlim _semaphore = new(0); + public ConcurrentBag Messages { get; } = []; + + public void Record(IConsumeContext context) + { + Messages.Add(context.Message); + _semaphore.Release(); + } + + public async Task WaitAsync(TimeSpan timeout, int expectedCount = 1) + { + for (var i = 0; i < expectedCount; i++) + { + if (!await _semaphore.WaitAsync(timeout)) + { + return false; + } + } + return true; + } + } + + public sealed class OrderSpyConsumer(OrderCapture capture) : IConsumer + { + public ValueTask ConsumeAsync(IConsumeContext context) + { + capture.Record(context); + return default; + } + } +} diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Helpers/RabbitMQFixture.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Helpers/RabbitMQFixture.cs index 3adb5fffed8..3a5ec2acafe 100644 --- a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Helpers/RabbitMQFixture.cs +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Helpers/RabbitMQFixture.cs @@ -11,12 +11,7 @@ public class MochaRabbitMQResource : RabbitMQResource { public Task InvokeCommandAsync(string[] command) => Manager.InvokeCommandAsync( - new ContainerExecCreateParameters - { - Cmd = command, - AttachStdout = true, - AttachStderr = true - }); + new ContainerExecCreateParameters { Cmd = command, AttachStdout = true, AttachStderr = true }); } public sealed class RabbitMQFixture : IAsyncLifetime @@ -41,10 +36,17 @@ public async Task CreateVhostAsync( { var vhostName = GenerateVhostName(testName, filePath); await _resource.InvokeCommandAsync(["rabbitmqctl", "add_vhost", vhostName]); - await _resource.InvokeCommandAsync(["rabbitmqctl", "set_permissions", "-p", vhostName, "guest", ".*", ".*", ".*"]); + await _resource.InvokeCommandAsync([ + "rabbitmqctl", "set_permissions", "-p", vhostName, "guest", ".*", ".*", ".*" + ]); return new VhostContext(this, vhostName); } + internal async Task InvokeCommandAsync(string[] command) + { + return await _resource.InvokeCommandAsync(command); + } + internal async Task CloseAllConnectionsAsync(string reason = "test") { await _resource.InvokeCommandAsync(["rabbitmqctl", "close_all_connections", reason]); @@ -62,12 +64,22 @@ private static string GenerateVhostName(string testName, string filePath) } } -public sealed class VhostContext(RabbitMQFixture fixture, string vhostName) : IAsyncDisposable +public sealed class VhostContext : IAsyncDisposable { - public IConnectionFactory ConnectionFactory { get; } = - new ConnectionFactory { Uri = new Uri(fixture.ConnectionString), VirtualHost = vhostName }; + private readonly RabbitMQFixture _fixture; + + public VhostContext(RabbitMQFixture fixture, string vhostName) + { + _fixture = fixture; + VhostName = vhostName; + ConnectionFactory = new ConnectionFactory { Uri = new Uri(fixture.ConnectionString), VirtualHost = vhostName }; + } + + public string VhostName { get; } + + public IConnectionFactory ConnectionFactory { get; } - public async ValueTask DisposeAsync() => await fixture.DeleteVhostAsync(vhostName); + public async ValueTask DisposeAsync() => await _fixture.DeleteVhostAsync(VhostName); } [CollectionDefinition("RabbitMQ")] diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQBusDefaultsTests.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQBusDefaultsTests.cs new file mode 100644 index 00000000000..3802782a3bc --- /dev/null +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQBusDefaultsTests.cs @@ -0,0 +1,494 @@ +using Microsoft.Extensions.DependencyInjection; +using Mocha.Transport.RabbitMQ.Tests.Helpers; + +namespace Mocha.Transport.RabbitMQ.Tests.Topology; + +public class RabbitMQBusDefaultsTests +{ + [Fact] + public void AddQueue_Should_ApplyDefaultQueueType_When_DefaultsConfigured() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum)); + + // act + var queue = topology.AddQueue(new RabbitMQQueueConfiguration { Name = "test-queue" }); + + // assert + Assert.Equal("quorum", queue.Arguments["x-queue-type"]); + } + + [Fact] + public void AddQueue_Should_NotOverrideQueueType_When_ExplicitlySet() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum)); + + // act + var queue = topology.AddQueue(new RabbitMQQueueConfiguration + { + Name = "test-queue", + Arguments = new Dictionary { ["x-queue-type"] = "stream" } + }); + + // assert + Assert.Equal("stream", queue.Arguments["x-queue-type"]); + } + + [Fact] + public void AddQueue_Should_ApplyDefaultAutoDelete_When_DefaultsConfigured() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.AutoDelete = true)); + + // act + var queue = topology.AddQueue(new RabbitMQQueueConfiguration + { + Name = "test-queue", + AutoDelete = null + }); + + // assert + Assert.True(queue.AutoDelete); + } + + [Fact] + public void AddQueue_Should_NotOverrideAutoDelete_When_ExplicitlySet() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.AutoDelete = true)); + + // act + var queue = topology.AddQueue(new RabbitMQQueueConfiguration + { + Name = "test-queue", + AutoDelete = false + }); + + // assert + Assert.False(queue.AutoDelete); + } + + [Fact] + public void AddQueue_Should_MergeDefaultArguments_When_DefaultsConfigured() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.Arguments["x-delivery-limit"] = 5)); + + // act + var queue = topology.AddQueue(new RabbitMQQueueConfiguration { Name = "test-queue" }); + + // assert + Assert.Equal(5, queue.Arguments["x-delivery-limit"]); + } + + [Fact] + public void AddQueue_Should_NotOverrideArguments_When_ExplicitlySet() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.Arguments["x-delivery-limit"] = 5)); + + // act + var queue = topology.AddQueue(new RabbitMQQueueConfiguration + { + Name = "test-queue", + Arguments = new Dictionary { ["x-delivery-limit"] = 10 } + }); + + // assert + Assert.Equal(10, queue.Arguments["x-delivery-limit"]); + } + + [Fact] + public void AddQueue_Should_MergeMultipleArguments_When_DefaultsAndExplicitCombined() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => + { + d.Queue.Arguments["x-delivery-limit"] = 5; + d.Queue.Arguments["x-max-priority"] = 10; + })); + + // act + var queue = topology.AddQueue(new RabbitMQQueueConfiguration + { + Name = "test-queue", + Arguments = new Dictionary { ["x-delivery-limit"] = 3 } + }); + + // assert — explicit argument wins, default argument is added + Assert.Equal(3, queue.Arguments["x-delivery-limit"]); + Assert.Equal(10, queue.Arguments["x-max-priority"]); + } + + [Fact] + public void AddQueue_Should_ApplyNoDefaults_When_NoDefaultsConfigured() + { + // arrange + var (_, _, topology) = CreateTopology(_ => { }); + + // act + var queue = topology.AddQueue(new RabbitMQQueueConfiguration { Name = "test-queue" }); + + // assert — only standard defaults, no x-queue-type argument + Assert.True(queue.Durable); + Assert.False(queue.AutoDelete); + Assert.DoesNotContain("x-queue-type", queue.Arguments.Keys); + } + + [Fact] + public void AddExchange_Should_ApplyDefaultType_When_DefaultsConfigured() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Exchange.Type = RabbitMQExchangeType.Topic)); + + // act + var exchange = topology.AddExchange(new RabbitMQExchangeConfiguration { Name = "test-exchange" }); + + // assert + Assert.Equal("topic", exchange.Type); + } + + [Fact] + public void AddExchange_Should_NotOverrideType_When_ExplicitlySet() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Exchange.Type = RabbitMQExchangeType.Topic)); + + // act + var exchange = topology.AddExchange(new RabbitMQExchangeConfiguration + { + Name = "test-exchange", + Type = RabbitMQExchangeType.Direct + }); + + // assert + Assert.Equal("direct", exchange.Type); + } + + [Fact] + public void AddExchange_Should_ApplyDefaultDurable_When_DefaultsConfigured() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Exchange.Durable = false)); + + // act + var exchange = topology.AddExchange(new RabbitMQExchangeConfiguration { Name = "test-exchange" }); + + // assert + Assert.False(exchange.Durable); + } + + [Fact] + public void AddExchange_Should_NotOverrideDurable_When_ExplicitlySet() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Exchange.Durable = false)); + + // act + var exchange = topology.AddExchange(new RabbitMQExchangeConfiguration + { + Name = "test-exchange", + Durable = true + }); + + // assert + Assert.True(exchange.Durable); + } + + [Fact] + public void AddExchange_Should_ApplyDefaultAutoDelete_When_DefaultsConfigured() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Exchange.AutoDelete = true)); + + // act + var exchange = topology.AddExchange(new RabbitMQExchangeConfiguration { Name = "test-exchange" }); + + // assert + Assert.True(exchange.AutoDelete); + } + + [Fact] + public void AddExchange_Should_NotOverrideAutoDelete_When_ExplicitlySet() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Exchange.AutoDelete = true)); + + // act + var exchange = topology.AddExchange(new RabbitMQExchangeConfiguration + { + Name = "test-exchange", + AutoDelete = false + }); + + // assert + Assert.False(exchange.AutoDelete); + } + + [Fact] + public void AddExchange_Should_MergeDefaultArguments_When_DefaultsConfigured() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Exchange.Arguments["alternate-exchange"] = "alt-exchange")); + + // act + var exchange = topology.AddExchange(new RabbitMQExchangeConfiguration { Name = "test-exchange" }); + + // assert + Assert.Equal("alt-exchange", exchange.Arguments["alternate-exchange"]); + } + + [Fact] + public void AddExchange_Should_NotOverrideArguments_When_ExplicitlySet() + { + // arrange + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Exchange.Arguments["alternate-exchange"] = "default-alt")); + + // act + var exchange = topology.AddExchange(new RabbitMQExchangeConfiguration + { + Name = "test-exchange", + Arguments = new Dictionary { ["alternate-exchange"] = "explicit-alt" } + }); + + // assert + Assert.Equal("explicit-alt", exchange.Arguments["alternate-exchange"]); + } + + [Fact] + public void ConfigureDefaults_Should_SkipQueueType_When_AutoDeleteIsTrue() + { + // arrange — auto-delete queues (e.g. reply queues) are incompatible with quorum + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum)); + var queue = topology.AddQueue(new RabbitMQQueueConfiguration + { + Name = "reply-queue", + AutoDelete = true + }); + + // assert + Assert.DoesNotContain("x-queue-type", queue.Arguments.Keys); + } + + [Fact] + public void ConfigureDefaults_Should_SkipQueueType_When_ExclusiveIsTrue() + { + // arrange — exclusive queues are incompatible with quorum + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum)); + var queue = topology.AddQueue(new RabbitMQQueueConfiguration + { + Name = "exclusive-queue", + Exclusive = true + }); + + // assert + Assert.DoesNotContain("x-queue-type", queue.Arguments.Keys); + } + + [Fact] + public void ConfigureDefaults_Should_SkipStreamQueueType_When_AutoDeleteIsTrue() + { + // arrange — auto-delete queues are incompatible with stream + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Stream)); + var queue = topology.AddQueue(new RabbitMQQueueConfiguration + { + Name = "reply-queue", + AutoDelete = true + }); + + // assert + Assert.DoesNotContain("x-queue-type", queue.Arguments.Keys); + } + + [Fact] + public void ConfigureDefaults_Should_SkipAllDefaults_When_QueueIsIncompatible() + { + // arrange — both queue type and arguments are skipped for incompatible queues + // since default arguments may be queue-type-specific (e.g. x-delivery-limit) + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => + { + d.Queue.QueueType = RabbitMQQueueType.Quorum; + d.Queue.Arguments["x-delivery-limit"] = 5; + })); + + var queue = topology.AddQueue(new RabbitMQQueueConfiguration + { + Name = "reply-queue", + AutoDelete = true + }); + + // assert — both queue type and arguments are skipped + Assert.DoesNotContain("x-queue-type", queue.Arguments.Keys); + Assert.DoesNotContain("x-delivery-limit", queue.Arguments.Keys); + } + + [Fact] + public void ConfigureDefaults_Should_ApplyToExplicitlyDeclaredQueues() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum); + t.DeclareQueue("my-queue"); + }); + + // assert + var queue = topology.Queues.Single(q => q.Name == "my-queue"); + Assert.Equal("quorum", queue.Arguments["x-queue-type"]); + } + + [Fact] + public void ConfigureDefaults_Should_ApplyToExplicitlyDeclaredExchanges() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.ConfigureDefaults(d => d.Exchange.Type = RabbitMQExchangeType.Topic); + t.DeclareExchange("my-exchange"); + }); + + // assert + var exchange = topology.Exchanges.Single(e => e.Name == "my-exchange"); + Assert.Equal("topic", exchange.Type); + } + + [Fact] + public void ConfigureDefaults_Should_NotOverrideDeclaredQueueSettings() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum); + t.DeclareQueue("my-queue").QueueType(RabbitMQQueueType.Stream); + }); + + // assert + var queue = topology.Queues.Single(q => q.Name == "my-queue"); + Assert.Equal("stream", queue.Arguments["x-queue-type"]); + } + + [Fact] + public void ConfigureDefaults_Should_NotOverrideDeclaredExchangeType() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.ConfigureDefaults(d => d.Exchange.Type = RabbitMQExchangeType.Topic); + t.DeclareExchange("my-exchange").Type(RabbitMQExchangeType.Direct); + }); + + // assert + var exchange = topology.Exchanges.Single(e => e.Name == "my-exchange"); + Assert.Equal("direct", exchange.Type); + } + + [Fact] + public void ConfigureDefaults_Should_AllowMultipleConfigureCalls() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum); + t.ConfigureDefaults(d => d.Queue.Arguments["x-delivery-limit"] = 5); + }); + + var queue = topology.AddQueue(new RabbitMQQueueConfiguration { Name = "test-queue" }); + + // assert — both calls are applied + Assert.Equal("quorum", queue.Arguments["x-queue-type"]); + Assert.Equal(5, queue.Arguments["x-delivery-limit"]); + } + + [Fact] + public void ConfigureDefaults_Should_ApplyLastValue_When_SamePropertySetMultipleTimes() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum); + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Stream); + }); + + var queue = topology.AddQueue(new RabbitMQQueueConfiguration { Name = "test-queue" }); + + // assert — last write wins + Assert.Equal("stream", queue.Arguments["x-queue-type"]); + } + + // ────────────────────────────────────────────── + // Defaults apply to multiple resources + // ────────────────────────────────────────────── + + [Fact] + public void ConfigureDefaults_Should_ApplyToAllQueues() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Queue.QueueType = RabbitMQQueueType.Quorum)); + + var queue1 = topology.AddQueue(new RabbitMQQueueConfiguration { Name = "queue-1" }); + var queue2 = topology.AddQueue(new RabbitMQQueueConfiguration { Name = "queue-2" }); + + // assert + Assert.Equal("quorum", queue1.Arguments["x-queue-type"]); + Assert.Equal("quorum", queue2.Arguments["x-queue-type"]); + } + + [Fact] + public void ConfigureDefaults_Should_ApplyToAllExchanges() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + t.ConfigureDefaults(d => d.Exchange.Type = RabbitMQExchangeType.Topic)); + + var exchange1 = topology.AddExchange(new RabbitMQExchangeConfiguration { Name = "exchange-1" }); + var exchange2 = topology.AddExchange(new RabbitMQExchangeConfiguration { Name = "exchange-2" }); + + // assert + Assert.Equal("topic", exchange1.Type); + Assert.Equal("topic", exchange2.Type); + } + + // ────────────────────────────────────────────── + // Helpers + // ────────────────────────────────────────────── + + private static ( + MessagingRuntime Runtime, + RabbitMQMessagingTransport Transport, + RabbitMQMessagingTopology Topology) CreateTopology(Action configure) + { + var services = new ServiceCollection(); + var builder = services.AddMessageBus(); + var runtime = builder + .AddRabbitMQ(t => + { + t.ConnectionProvider(_ => new StubConnectionProvider()); + configure(t); + }) + .BuildRuntime(); + var transport = runtime.Transports.OfType().Single(); + var topology = (RabbitMQMessagingTopology)transport.Topology; + return (runtime, transport, topology); + } +} diff --git a/website/src/docs/mocha/v1/transports/rabbitmq.md b/website/src/docs/mocha/v1/transports/rabbitmq.md index bcace862da4..700ea316318 100644 --- a/website/src/docs/mocha/v1/transports/rabbitmq.md +++ b/website/src/docs/mocha/v1/transports/rabbitmq.md @@ -177,6 +177,78 @@ When you register a request handler with `AddRequestHandler()` for send (fire Send messages go to a dedicated queue. Only one handler processes each message - this is the point-to-point guarantee. +# Configure transport-level defaults + +You can set defaults that apply to all auto-provisioned queues and exchanges. This is useful when you want consistent settings across all resources without configuring each one individually. + +Use `ConfigureDefaults` to set queue and exchange defaults: + +```csharp +builder.Services + .AddMessageBus() + .AddRabbitMQ(transport => + { + transport.ConfigureDefaults(defaults => + { + // All queues will be quorum with a delivery limit of 5 + defaults.Queue.QueueType = RabbitMQQueueType.Quorum; + defaults.Queue.Arguments["x-delivery-limit"] = 5; + + // All exchanges will use topic routing + defaults.Exchange.Type = RabbitMQExchangeType.Topic; + }); + }); +``` + +For example, to enable [quorum queues](https://www.rabbitmq.com/docs/quorum-queues) with a specific initial group size: + +```csharp +builder.Services + .AddMessageBus() + .AddRabbitMQ(transport => + { + transport.ConfigureDefaults(defaults => + { + defaults.Queue.QueueType = RabbitMQQueueType.Quorum; + defaults.Queue.Arguments["x-quorum-initial-group-size"] = 3; + }); + }); +``` + +Or to use [stream queues](https://www.rabbitmq.com/docs/streams) for append-only log semantics: + +```csharp +builder.Services + .AddMessageBus() + .AddRabbitMQ(transport => + { + transport.ConfigureDefaults(defaults => + { + defaults.Queue.QueueType = RabbitMQQueueType.Stream; + }); + }); +``` + +Available queue defaults: + +| Property | Type | Description | +| ------------ | ---------------------------- | ----------------------------------------------------------------- | +| `QueueType` | `string` | Queue type: `RabbitMQQueueType.Classic`, `.Quorum`, or `.Stream` | +| `Durable` | `bool?` | Whether queues survive broker restarts (default: `true`) | +| `AutoDelete` | `bool?` | Whether queues are auto-deleted when unused (default: `false`) | +| `Arguments` | `Dictionary` | Additional arguments (e.g., `x-delivery-limit`, `x-max-priority`) | + +Available exchange defaults: + +| Property | Type | Description | +| ------------ | ---------------------------- | -------------------------------------------------------------------------------- | +| `Type` | `string` | Exchange type: `RabbitMQExchangeType.Fanout`, `.Direct`, `.Topic`, or `.Headers` | +| `Durable` | `bool?` | Whether exchanges survive broker restarts (default: `true`) | +| `AutoDelete` | `bool?` | Whether exchanges are auto-deleted when unused (default: `false`) | +| `Arguments` | `Dictionary` | Additional arguments (e.g., `alternate-exchange`) | + +Defaults never override explicitly configured values. If you declare a queue with a specific queue type, that setting takes precedence over the transport default. You can call `ConfigureDefaults` multiple times — each call accumulates settings on the same defaults object. + # Declare custom topology Mocha auto-provisions topology by default. To declare additional exchanges, queues, or bindings: From f21c87c6dd3ef30ff4c8583cd0e1643e34268b8f Mon Sep 17 00:00:00 2001 From: Pascal Senn Date: Sat, 7 Mar 2026 13:36:02 +0000 Subject: [PATCH 2/3] cleanup --- .../RabbitMQDefaultQueueOptions.cs | 6 ++- .../RabbitMQMessageEnvelopeParser.cs | 19 ++++++- .../RabbitMQMessageEnvelopeParserTests.cs | 54 +++++++++++++++++++ 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultQueueOptions.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultQueueOptions.cs index 5f3d9bdbbdb..cb044fb4e6f 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultQueueOptions.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQDefaultQueueOptions.cs @@ -54,7 +54,11 @@ internal void ApplyTo(RabbitMQQueueConfiguration configuration) if (Arguments.Count > 0 || QueueType is not null) { configuration.Arguments ??= new Dictionary(); - configuration.Arguments.TryAdd("x-queue-type", QueueType); + + if (QueueType is not null) + { + configuration.Arguments.TryAdd("x-queue-type", QueueType); + } foreach (var (key, value) in Arguments) { diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageEnvelopeParser.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageEnvelopeParser.cs index 76c1a7c55cf..abc8a7eac44 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageEnvelopeParser.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageEnvelopeParser.cs @@ -38,8 +38,7 @@ public MessageEnvelope Parse(BasicDeliverEventArgs eventArgs) MessageType = props.Type ?? props.Headers?.GetString(RabbitMQMessageHeaders.MessageType), SentAt = sentAt, DeliverBy = ParseExpiration(props.Expiration, sentAt), - // TODO quorum queues can use x-delivery-count instead of redelivered! - DeliveryCount = eventArgs.Redelivered ? 1 : 0, + DeliveryCount = GetDeliveryCount(props.Headers, eventArgs.Redelivered), Headers = BuildHeaders(props.Headers), EnclosedMessageTypes = props.Headers?.GetStringArray(RabbitMQMessageHeaders.EnclosedMessageTypes) ?? [], Body = eventArgs.Body @@ -48,6 +47,22 @@ public MessageEnvelope Parse(BasicDeliverEventArgs eventArgs) return envelope; } + /// + /// Returns the delivery count from the quorum queue x-delivery-count header when + /// available; otherwise falls back to the classic queue Redelivered flag. + /// + private static int GetDeliveryCount(IDictionary? headers, bool redelivered) + { + if (headers is not null + && headers.TryGetValue("x-delivery-count", out var value) + && value is long count) + { + return (int)count; + } + + return redelivered ? 1 : 0; + } + private static DateTimeOffset? ParseExpiration(string? expiration, DateTimeOffset? sentAt) { if (string.IsNullOrEmpty(expiration) || !long.TryParse(expiration, out var ms)) diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/RabbitMQMessageEnvelopeParserTests.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/RabbitMQMessageEnvelopeParserTests.cs index 0f16f78b122..596de4fbe49 100644 --- a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/RabbitMQMessageEnvelopeParserTests.cs +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/RabbitMQMessageEnvelopeParserTests.cs @@ -199,6 +199,60 @@ public void Parse_Should_SetDeliveryCountZero_When_NotRedelivered() Assert.Equal(0, envelope.DeliveryCount); } + [Fact] + public void Parse_Should_UseXDeliveryCount_When_QuorumQueueHeaderPresent() + { + // arrange — quorum queues set x-delivery-count as a long + var args = CreateDeliverEventArgs( + props => props.Headers = new Dictionary + { + ["x-delivery-count"] = 3L + }, + redelivered: true); + + // act + var envelope = _parser.Parse(args); + + // assert — exact count from header, not the boolean fallback + Assert.Equal(3, envelope.DeliveryCount); + } + + [Fact] + public void Parse_Should_UseXDeliveryCount_When_ValueIsZero() + { + // arrange — first delivery on a quorum queue + var args = CreateDeliverEventArgs( + props => props.Headers = new Dictionary + { + ["x-delivery-count"] = 0L + }, + redelivered: false); + + // act + var envelope = _parser.Parse(args); + + // assert + Assert.Equal(0, envelope.DeliveryCount); + } + + [Fact] + public void Parse_Should_FallbackToRedelivered_When_XDeliveryCountAbsent() + { + // arrange — classic queue, no x-delivery-count header + var args = CreateDeliverEventArgs( + props => props.Headers = new Dictionary + { + ["x-some-other-header"] = "value"u8.ToArray() + }, + redelivered: true); + + // act + var envelope = _parser.Parse(args); + + // assert — falls back to redelivered flag + Assert.Equal(1, envelope.DeliveryCount); + } + [Fact] public void Parse_Should_ExtractEnclosedMessageTypes_When_HeaderPresent() { From 1563ceb34301a0e439d66fdd884415210457ce6e Mon Sep 17 00:00:00 2001 From: Pascal Senn Date: Sun, 8 Mar 2026 15:12:56 +0100 Subject: [PATCH 3/3] cleanup --- .../Mocha.Transport.RabbitMQ/RabbitMQMessageEnvelopeParser.cs | 2 +- .../Topology/Configurations/RabbitMQQueueConfiguration.cs | 2 +- .../Topology/RabbitMQMessagingTopology.cs | 2 +- website/src/docs/mocha/v1/transports/rabbitmq.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageEnvelopeParser.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageEnvelopeParser.cs index abc8a7eac44..9429d3b1b7b 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageEnvelopeParser.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageEnvelopeParser.cs @@ -57,7 +57,7 @@ private static int GetDeliveryCount(IDictionary? headers, bool && headers.TryGetValue("x-delivery-count", out var value) && value is long count) { - return (int)count; + return count > int.MaxValue ? int.MaxValue : (int)count; } return redelivered ? 1 : 0; diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/Configurations/RabbitMQQueueConfiguration.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/Configurations/RabbitMQQueueConfiguration.cs index a63d2159507..b18b37d6d2f 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/Configurations/RabbitMQQueueConfiguration.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/Configurations/RabbitMQQueueConfiguration.cs @@ -15,7 +15,7 @@ public sealed class RabbitMQQueueConfiguration : TopologyConfiguration - public bool? Durable { get; set; } = true; + public bool? Durable { get; set; } /// /// Gets or sets a value indicating whether the queue can only be accessed by the connection that created it. diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs index 307573572b0..685b3d968bf 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs @@ -31,7 +31,7 @@ public sealed class RabbitMQMessagingTopology( public IReadOnlyList Bindings => _bindings; /// - /// Gets or sets the bus-level defaults applied to all auto-provisioned queues and exchanges. + /// Gets the bus-level defaults applied to all auto-provisioned queues and exchanges. /// public RabbitMQBusDefaults Defaults => defaults; diff --git a/website/src/docs/mocha/v1/transports/rabbitmq.md b/website/src/docs/mocha/v1/transports/rabbitmq.md index 700ea316318..189364cc2b7 100644 --- a/website/src/docs/mocha/v1/transports/rabbitmq.md +++ b/website/src/docs/mocha/v1/transports/rabbitmq.md @@ -247,7 +247,7 @@ Available exchange defaults: | `AutoDelete` | `bool?` | Whether exchanges are auto-deleted when unused (default: `false`) | | `Arguments` | `Dictionary` | Additional arguments (e.g., `alternate-exchange`) | -Defaults never override explicitly configured values. If you declare a queue with a specific queue type, that setting takes precedence over the transport default. You can call `ConfigureDefaults` multiple times — each call accumulates settings on the same defaults object. +Defaults never override explicitly configured values. If you declare a queue with a specific queue type, that setting takes precedence over the transport default. You can call `ConfigureDefaults` multiple times - each call accumulates settings on the same defaults object. # Declare custom topology