Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public RabbitMQTransportConfiguration()
/// </summary>
public List<RabbitMQBindingConfiguration> Bindings { get; set; } = [];

/// <summary>
/// Gets or sets a value indicating whether topology resources (queues, exchanges, bindings)
/// should be automatically provisioned on the broker. When <c>null</c>, defaults to <c>true</c>.
/// Individual resources can override this setting.
/// </summary>
public bool? AutoProvision { get; set; }

/// <summary>
/// Gets or sets the bus-level defaults applied to all auto-provisioned queues and exchanges.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public void DiscoverTopology(
new RabbitMQQueueConfiguration
{
Name = configuration.QueueName,
AutoDelete = endpoint.Kind == ReceiveEndpointKind.Reply
AutoDelete = endpoint.Kind == ReceiveEndpointKind.Reply,
AutoProvision = configuration.AutoProvision
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ IRabbitMQMessagingTransportDescriptor ConnectionProvider(
/// <returns>A binding descriptor for further configuration.</returns>
IRabbitMQBindingDescriptor DeclareBinding(string exchange, string queue);

/// <summary>
/// Sets whether topology resources should be automatically provisioned on the broker.
/// When disabled, queues, exchanges, and bindings must exist before the transport starts.
/// Individual resources can override this setting via their own <c>AutoProvision</c> method.
/// </summary>
/// <param name="autoProvision">
/// <c>true</c> to enable auto-provisioning (default); <c>false</c> to disable it globally.
/// </param>
/// <returns>The descriptor for method chaining.</returns>
IRabbitMQMessagingTransportDescriptor AutoProvision(bool autoProvision = true);

/// <inheritdoc cref="IMessagingTransportDescriptor.Name" />
new IRabbitMQMessagingTransportDescriptor Name(string name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ public RabbitMQMessagingTransportDescriptor(IMessagingSetupContext discoveryCont
return this;
}

/// <inheritdoc />
public IRabbitMQMessagingTransportDescriptor AutoProvision(bool autoProvision = true)
{
Configuration.AutoProvision = autoProvision;
return this;
}

/// <inheritdoc />
public IRabbitMQMessagingTransportDescriptor ConnectionProvider(
Func<IServiceProvider, IRabbitMQConnectionProvider> connectionProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,14 @@ private async ValueTask EnsureProvisionedAsync(IChannel channel, CancellationTok
return;
}

if (Queue is not null)
var autoProvision = ((RabbitMQMessagingTopology)transport.Topology).AutoProvision;

if (Queue is not null && (Queue.AutoProvision ?? autoProvision))
{
await Queue.ProvisionAsync(channel, cancellationToken);
}

if (Exchange is not null)
if (Exchange is not null && (Exchange.AutoProvision ?? autoProvision))
{
await Exchange.ProvisionAsync(channel, cancellationToken);
}
Comment thread
PascalSenn marked this conversation as resolved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ protected override void OnAfterInitialized(IMessagingSetupContext context)
Port = Connection.Port,
Path = Connection.VirtualHost
};

_topology = new RabbitMQMessagingTopology(this, builder.Uri, configuration.Defaults);
_topology = new RabbitMQMessagingTopology(
this,
builder.Uri,
configuration.Defaults,
configuration.AutoProvision ?? true);

foreach (var exchange in configuration.Exchanges)
{
Expand Down Expand Up @@ -110,20 +113,30 @@ private RabbitMQDispatcher CreateDispatcher(IMessagingSetupContext context)
async Task ProvisionTopologyAsync(IConnection connection, CancellationToken ct)
{
await using var channel = await connection.CreateChannelAsync(cancellationToken: ct);
var autoProvision = _topology.AutoProvision;

foreach (var queue in _topology.Queues)
foreach (var exchange in _topology.Exchanges)
{
await queue.ProvisionAsync(channel, ct);
if (exchange.AutoProvision ?? autoProvision)
{
await exchange.ProvisionAsync(channel, ct);
}
}

foreach (var exchange in _topology.Exchanges)
foreach (var queue in _topology.Queues)
{
await exchange.ProvisionAsync(channel, ct);
if (queue.AutoProvision ?? autoProvision)
{
await queue.ProvisionAsync(channel, ct);
}
}

foreach (var binding in _topology.Bindings)
{
await binding.ProvisionAsync(channel, ct);
if (binding.AutoProvision ?? autoProvision)
{
await binding.ProvisionAsync(channel, ct);
}
}
}
}
Expand All @@ -137,6 +150,7 @@ public override TransportDescription Describe()

var entities = new List<TopologyEntityDescription>();
var links = new List<TopologyLinkDescription>();
var autoProvision = _topology.AutoProvision;

foreach (var exchange in _topology.Exchanges)
{
Expand All @@ -151,7 +165,7 @@ public override TransportDescription Describe()
["type"] = exchange.Type,
["durable"] = exchange.Durable,
["autoDelete"] = exchange.AutoDelete,
["autoProvision"] = exchange.AutoProvision
["autoProvision"] = exchange.AutoProvision ?? autoProvision
}));
}

Expand All @@ -168,7 +182,7 @@ public override TransportDescription Describe()
["durable"] = queue.Durable,
["exclusive"] = queue.Exclusive,
["autoDelete"] = queue.AutoDelete,
["autoProvision"] = queue.AutoProvision
["autoProvision"] = queue.AutoProvision ?? autoProvision
}));
}

Expand All @@ -189,7 +203,7 @@ public override TransportDescription Describe()
new Dictionary<string, object?>
{
["routingKey"] = string.IsNullOrEmpty(binding.RoutingKey) ? null : binding.RoutingKey,
["autoProvision"] = binding.AutoProvision
["autoProvision"] = binding.AutoProvision ?? autoProvision
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ public abstract class RabbitMQBinding : TopologyResource<RabbitMQBindingConfigur

/// <summary>
/// Gets a value indicating whether this binding is automatically provisioned during topology setup.
/// When <c>null</c>, the transport-level default is used.
/// </summary>
public bool AutoProvision { get; protected set; }
public bool? AutoProvision { get; protected set; }

/// <summary>
/// Gets the routing key pattern used to filter messages passing through this binding.
Expand Down Expand Up @@ -55,7 +56,7 @@ protected override void OnInitialize(RabbitMQBindingConfiguration configuration)
{
RoutingKey = configuration.RoutingKey ?? string.Empty;
Arguments = configuration.Arguments?.ToImmutableDictionary(kv => kv.Key, kv => (object?)kv.Value) ?? ImmutableDictionary<string, object?>.Empty;
AutoProvision = configuration.AutoProvision ?? true;
AutoProvision = configuration.AutoProvision;
}

protected override void OnComplete(RabbitMQBindingConfiguration configuration)
Expand Down Expand Up @@ -96,7 +97,7 @@ protected override void OnInitialize(RabbitMQBindingConfiguration configuration)
{
RoutingKey = configuration.RoutingKey ?? string.Empty;
Arguments = configuration.Arguments?.ToImmutableDictionary(kv => kv.Key, kv => (object?)kv.Value) ?? ImmutableDictionary<string, object?>.Empty;
AutoProvision = configuration.AutoProvision ?? true;
AutoProvision = configuration.AutoProvision;
}

protected override void OnComplete(RabbitMQBindingConfiguration configuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ public sealed class RabbitMQExchange : TopologyResource<RabbitMQExchangeConfigur

/// <summary>
/// Gets a value indicating whether this exchange is automatically provisioned during topology setup.
/// When <c>null</c>, the transport-level default is used.
/// </summary>
public bool AutoProvision { get; private set; }
public bool? AutoProvision { get; private set; }

/// <summary>
/// Gets the exchange type (e.g., "direct", "fanout", "topic", "headers").
Expand Down Expand Up @@ -58,7 +59,7 @@ protected override void OnInitialize(RabbitMQExchangeConfiguration configuration
Type = configuration.Type ?? "fanout";
AutoDelete = configuration.AutoDelete ?? false;
Arguments = configuration.Arguments?.ToImmutableDictionary(kv => kv.Key, kv => (object?)kv.Value) ?? ImmutableDictionary<string, object?>.Empty;
AutoProvision = configuration.AutoProvision ?? true;
AutoProvision = configuration.AutoProvision;
}

protected override void OnComplete(RabbitMQExchangeConfiguration configuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@ namespace Mocha.Transport.RabbitMQ;
public sealed class RabbitMQMessagingTopology(
RabbitMQMessagingTransport transport,
Uri baseAddress,
RabbitMQBusDefaults defaults)
RabbitMQBusDefaults defaults,
bool autoProvision)
: MessagingTopology<RabbitMQMessagingTransport>(transport, baseAddress)
{
private readonly object _lock = new();
private readonly List<RabbitMQExchange> _exchanges = [];
private readonly List<RabbitMQQueue> _queues = [];
private readonly List<RabbitMQBinding> _bindings = [];

/// <summary>
/// Gets a value indicating whether topology resources should be auto-provisioned by default.
/// Individual resources may override this setting via their own <c>AutoProvision</c> property.
/// </summary>
public bool AutoProvision => autoProvision;

/// <summary>
/// Gets the list of exchanges registered in this topology.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ public sealed class RabbitMQQueue : TopologyResource<RabbitMQQueueConfiguration>

/// <summary>
/// Gets a value indicating whether this queue is automatically provisioned during topology setup.
/// When <c>null</c>, the transport-level default is used.
/// </summary>
public bool AutoProvision { get; private set; }
public bool? AutoProvision { get; private set; }

/// <summary>
/// Gets a value indicating whether this queue survives broker restarts.
Expand Down Expand Up @@ -58,7 +59,7 @@ protected override void OnInitialize(RabbitMQQueueConfiguration configuration)
Exclusive = configuration.Exclusive ?? false;
AutoDelete = configuration.AutoDelete ?? false;
Arguments = configuration.Arguments?.ToImmutableDictionary(kv => kv.Key, kv => (object?)kv.Value) ?? ImmutableDictionary<string, object?>.Empty;
AutoProvision = configuration.AutoProvision ?? true;
AutoProvision = configuration.AutoProvision;
}

protected override void OnComplete(RabbitMQQueueConfiguration configuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class ReceiveEndpointConfiguration : MessagingConfiguration
/// <summary>
/// Gets or sets whether the transport should automatically provision infrastructure for this endpoint.
/// </summary>
public bool? AutoProvision { get; set; } = false;
public bool? AutoProvision { get; set; }

/// <summary>
/// Gets or sets the maximum number of messages that can be processed concurrently on this endpoint.
Expand Down
Loading
Loading