From 8b207ecfc50fd5597acc4c1a5a3c9791d07c0180 Mon Sep 17 00:00:00 2001 From: Will Sugarman Date: Wed, 17 Apr 2024 22:56:31 -0700 Subject: [PATCH 1/3] WIP --- .../AzureStorageOrchestrationService.cs | 8 ++++---- .../Messaging/ControlQueue.cs | 7 ++++--- .../Messaging/TaskHubQueue.cs | 13 ++++++++----- .../OrchestrationSessionManager.cs | 2 +- src/DurableTask.AzureStorage/Storage/Queue.cs | 5 +++-- 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 951160a70..8763d5175 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1349,7 +1349,7 @@ public async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkI // Reset the visibility of the message to ensure it doesn't get picked up by anyone else. await session.CurrentMessageBatch.ParallelForEachAsync( this.settings.MaxStorageOperationConcurrency, - message => controlQueue.RenewMessageAsync(message, session)); + async message => message.OriginalQueueMessage = await controlQueue.RenewMessageAsync(message, session)); workItem.LockedUntilUtc = DateTime.UtcNow.Add(this.settings.ControlQueueVisibilityTimeout); } @@ -1380,7 +1380,7 @@ async Task AbandonMessagesAsync(OrchestrationSession session, IList { await messages.ParallelForEachAsync( this.settings.MaxStorageOperationConcurrency, - message => session.ControlQueue.AbandonMessageAsync(message, session)); + message => _ = session.ControlQueue.AbandonMessageAsync(message, session)); // Remove the messages from the current batch. The remaining messages // may still be able to be processed @@ -1531,7 +1531,7 @@ public async Task RenewTaskActivityWorkItemLockAsync(TaskA session.StartNewLogicalTraceScope(); // Reset the visibility of the message to ensure it doesn't get picked up by anyone else. - await this.workItemQueue.RenewMessageAsync(session.MessageData, session); + session.MessageData.OriginalQueueMessage = await this.workItemQueue.RenewMessageAsync(session.MessageData, session); workItem.LockedUntilUtc = DateTime.UtcNow.Add(this.settings.WorkItemQueueVisibilityTimeout); return workItem; @@ -1559,7 +1559,7 @@ public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem session.StartNewLogicalTraceScope(); - await this.workItemQueue.AbandonMessageAsync(session.MessageData, session); + _ = await this.workItemQueue.AbandonMessageAsync(session.MessageData, session); if (this.activeActivitySessions.TryRemove(workItem.Id, out _)) { diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 75f289b3b..466ade527 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -126,7 +126,8 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) e.ToString()); // Abandon the message so we can try it again later. - await this.AbandonMessageAsync(queueMessage); + // Note: We will fetch the message again before retrying, so no need to record the result + _ = await this.AbandonMessageAsync(queueMessage); return; } @@ -191,7 +192,7 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) } // This overload is intended for cases where we aren't able to deserialize an instance of MessageData. - public Task AbandonMessageAsync(QueueMessage queueMessage) + public Task AbandonMessageAsync(QueueMessage queueMessage) { this.stats.PendingOrchestratorMessages.TryRemove(queueMessage.MessageId, out _); return base.AbandonMessageAsync( @@ -202,7 +203,7 @@ public Task AbandonMessageAsync(QueueMessage queueMessage) sequenceNumber: -1); } - public override Task AbandonMessageAsync(MessageData message, SessionBase? session = null) + public override Task AbandonMessageAsync(MessageData message, SessionBase? session = null) { this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _); return base.AbandonMessageAsync(message, session); diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 3188dcec3..65191ad31 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -212,7 +212,7 @@ await this.storageQueue.AddMessageAsync( return initialVisibilityDelay; } - public virtual Task AbandonMessageAsync(MessageData message, SessionBase? session = null) + public virtual Task AbandonMessageAsync(MessageData message, SessionBase? session = null) { QueueMessage queueMessage = message.OriginalQueueMessage; TaskMessage taskMessage = message.TaskMessage; @@ -227,7 +227,7 @@ public virtual Task AbandonMessageAsync(MessageData message, SessionBase? sessio sequenceNumber); } - protected async Task AbandonMessageAsync( + protected async Task AbandonMessageAsync( QueueMessage queueMessage, TaskMessage? taskMessage, OrchestrationInstance? instance, @@ -276,7 +276,7 @@ protected async Task AbandonMessageAsync( { // We "abandon" the message by settings its visibility timeout using an exponential backoff algorithm. // This allows it to be reprocessed on this node or another node at a later time, hopefully successfully. - await this.storageQueue.UpdateMessageAsync( + return await this.storageQueue.UpdateMessageAsync( queueMessage, TimeSpan.FromSeconds(numSecondsToWait), traceActivityId); @@ -293,10 +293,12 @@ await this.storageQueue.UpdateMessageAsync( taskEventId, details: $"Caller: {nameof(AbandonMessageAsync)}", queueMessage.PopReceipt); + + return queueMessage; } } - public async Task RenewMessageAsync(MessageData message, SessionBase session) + public async Task RenewMessageAsync(MessageData message, SessionBase session) { QueueMessage queueMessage = message.OriginalQueueMessage; TaskMessage taskMessage = message.TaskMessage; @@ -316,7 +318,7 @@ public async Task RenewMessageAsync(MessageData message, SessionBase session) try { - await this.storageQueue.UpdateMessageAsync( + return await this.storageQueue.UpdateMessageAsync( queueMessage, this.MessageVisibilityTimeout, session?.TraceActivityId); @@ -325,6 +327,7 @@ await this.storageQueue.UpdateMessageAsync( { // Message may have been processed and deleted already. this.HandleMessagingExceptions(e, message, $"Caller: {nameof(RenewMessageAsync)}"); + return queueMessage; } } diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index acd73078a..1516ee79d 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -314,7 +314,7 @@ async Task> DedupeExecutionStartedMessagesAsync( filteredMessages = filteredMessages.Except(messagesToDefer); // Defer messages on a background thread to avoid blocking the dequeue loop - _ = Task.Run(() => messagesToDefer.ParallelForEachAsync(msg => controlQueue.AbandonMessageAsync(msg, session: null))); + _ = Task.Run(() => messagesToDefer.ParallelForEachAsync(msg => _ = controlQueue.AbandonMessageAsync(msg, session: null))); } if (messagesToDiscard?.Count > 0) diff --git a/src/DurableTask.AzureStorage/Storage/Queue.cs b/src/DurableTask.AzureStorage/Storage/Queue.cs index 46b4115d8..7a7f8a4c9 100644 --- a/src/DurableTask.AzureStorage/Storage/Queue.cs +++ b/src/DurableTask.AzureStorage/Storage/Queue.cs @@ -59,10 +59,10 @@ await this.queueClient this.stats.MessagesSent.Increment(); } - public async Task UpdateMessageAsync(QueueMessage queueMessage, TimeSpan visibilityTimeout, Guid? clientRequestId = null, CancellationToken cancellationToken = default) + public async Task UpdateMessageAsync(QueueMessage queueMessage, TimeSpan visibilityTimeout, Guid? clientRequestId = null, CancellationToken cancellationToken = default) { using IDisposable scope = OperationContext.CreateClientRequestScope(clientRequestId); - await this.queueClient + UpdateReceipt receipt = await this.queueClient .UpdateMessageAsync( queueMessage.MessageId, queueMessage.PopReceipt, @@ -71,6 +71,7 @@ await this.queueClient .DecorateFailure(); this.stats.MessagesUpdated.Increment(); + return queueMessage.Update(receipt); } public async Task DeleteMessageAsync(QueueMessage queueMessage, Guid? clientRequestId = null, CancellationToken cancellationToken = default) From 499d5d31a0afea6eb664d4b8785437bdffb82bc4 Mon Sep 17 00:00:00 2001 From: Will Sugarman Date: Wed, 15 May 2024 22:01:33 -0700 Subject: [PATCH 2/3] Refactor --- .../AzureStorageOrchestrationService.cs | 6 ++--- src/DurableTask.AzureStorage/MessageData.cs | 5 ++++ .../Messaging/ControlQueue.cs | 6 ++--- .../Messaging/TaskHubQueue.cs | 23 +++++++++++++------ .../OrchestrationSessionManager.cs | 2 +- src/DurableTask.AzureStorage/Storage/Queue.cs | 4 ++-- 6 files changed, 30 insertions(+), 16 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 8763d5175..5065f151a 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1349,7 +1349,7 @@ public async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkI // Reset the visibility of the message to ensure it doesn't get picked up by anyone else. await session.CurrentMessageBatch.ParallelForEachAsync( this.settings.MaxStorageOperationConcurrency, - async message => message.OriginalQueueMessage = await controlQueue.RenewMessageAsync(message, session)); + message => controlQueue.RenewMessageAsync(message, session)); workItem.LockedUntilUtc = DateTime.UtcNow.Add(this.settings.ControlQueueVisibilityTimeout); } @@ -1531,7 +1531,7 @@ public async Task RenewTaskActivityWorkItemLockAsync(TaskA session.StartNewLogicalTraceScope(); // Reset the visibility of the message to ensure it doesn't get picked up by anyone else. - session.MessageData.OriginalQueueMessage = await this.workItemQueue.RenewMessageAsync(session.MessageData, session); + await this.workItemQueue.RenewMessageAsync(session.MessageData, session); workItem.LockedUntilUtc = DateTime.UtcNow.Add(this.settings.WorkItemQueueVisibilityTimeout); return workItem; @@ -1559,7 +1559,7 @@ public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem session.StartNewLogicalTraceScope(); - _ = await this.workItemQueue.AbandonMessageAsync(session.MessageData, session); + await this.workItemQueue.AbandonMessageAsync(session.MessageData, session); if (this.activeActivitySessions.TryRemove(workItem.Id, out _)) { diff --git a/src/DurableTask.AzureStorage/MessageData.cs b/src/DurableTask.AzureStorage/MessageData.cs index bf39fbc72..d89e7c686 100644 --- a/src/DurableTask.AzureStorage/MessageData.cs +++ b/src/DurableTask.AzureStorage/MessageData.cs @@ -106,6 +106,11 @@ public MessageData() internal long TotalMessageSizeBytes { get; set; } internal MessageFormatFlags MessageFormat { get; set; } + + internal void Update(UpdateReceipt receipt) + { + this.OriginalQueueMessage = this.OriginalQueueMessage.Update(receipt); + } } /// diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 466ade527..9f1c0d2ba 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -126,7 +126,7 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) e.ToString()); // Abandon the message so we can try it again later. - // Note: We will fetch the message again before retrying, so no need to record the result + // Note: We will fetch the message again from the queue before retrying, so no need to read the receipt _ = await this.AbandonMessageAsync(queueMessage); return; } @@ -192,7 +192,7 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) } // This overload is intended for cases where we aren't able to deserialize an instance of MessageData. - public Task AbandonMessageAsync(QueueMessage queueMessage) + public Task AbandonMessageAsync(QueueMessage queueMessage) { this.stats.PendingOrchestratorMessages.TryRemove(queueMessage.MessageId, out _); return base.AbandonMessageAsync( @@ -203,7 +203,7 @@ public Task AbandonMessageAsync(QueueMessage queueMessage) sequenceNumber: -1); } - public override Task AbandonMessageAsync(MessageData message, SessionBase? session = null) + public override Task AbandonMessageAsync(MessageData message, SessionBase? session = null) { this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _); return base.AbandonMessageAsync(message, session); diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 65191ad31..d204268bf 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -212,22 +212,29 @@ await this.storageQueue.AddMessageAsync( return initialVisibilityDelay; } - public virtual Task AbandonMessageAsync(MessageData message, SessionBase? session = null) + public virtual async Task AbandonMessageAsync(MessageData message, SessionBase? session = null) { QueueMessage queueMessage = message.OriginalQueueMessage; TaskMessage taskMessage = message.TaskMessage; OrchestrationInstance instance = taskMessage.OrchestrationInstance; long sequenceNumber = message.SequenceNumber; - return this.AbandonMessageAsync( + UpdateReceipt? receipt = await this.AbandonMessageAsync( queueMessage, taskMessage, instance, session?.TraceActivityId, sequenceNumber); + + // If we've successfully abandoned the message, update the pop receipt + // (even though we'll likely no longer interact with this message) + if (receipt is not null) + { + message.Update(receipt); + } } - protected async Task AbandonMessageAsync( + protected async Task AbandonMessageAsync( QueueMessage queueMessage, TaskMessage? taskMessage, OrchestrationInstance? instance, @@ -294,11 +301,11 @@ protected async Task AbandonMessageAsync( details: $"Caller: {nameof(AbandonMessageAsync)}", queueMessage.PopReceipt); - return queueMessage; + return null; } } - public async Task RenewMessageAsync(MessageData message, SessionBase session) + public async Task RenewMessageAsync(MessageData message, SessionBase session) { QueueMessage queueMessage = message.OriginalQueueMessage; TaskMessage taskMessage = message.TaskMessage; @@ -318,16 +325,18 @@ public async Task RenewMessageAsync(MessageData message, SessionBa try { - return await this.storageQueue.UpdateMessageAsync( + UpdateReceipt receipt = await this.storageQueue.UpdateMessageAsync( queueMessage, this.MessageVisibilityTimeout, session?.TraceActivityId); + + // Update the pop receipt + message.Update(receipt); } catch (Exception e) { // Message may have been processed and deleted already. this.HandleMessagingExceptions(e, message, $"Caller: {nameof(RenewMessageAsync)}"); - return queueMessage; } } diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index 1516ee79d..acd73078a 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -314,7 +314,7 @@ async Task> DedupeExecutionStartedMessagesAsync( filteredMessages = filteredMessages.Except(messagesToDefer); // Defer messages on a background thread to avoid blocking the dequeue loop - _ = Task.Run(() => messagesToDefer.ParallelForEachAsync(msg => _ = controlQueue.AbandonMessageAsync(msg, session: null))); + _ = Task.Run(() => messagesToDefer.ParallelForEachAsync(msg => controlQueue.AbandonMessageAsync(msg, session: null))); } if (messagesToDiscard?.Count > 0) diff --git a/src/DurableTask.AzureStorage/Storage/Queue.cs b/src/DurableTask.AzureStorage/Storage/Queue.cs index 7a7f8a4c9..eca1f1c28 100644 --- a/src/DurableTask.AzureStorage/Storage/Queue.cs +++ b/src/DurableTask.AzureStorage/Storage/Queue.cs @@ -59,7 +59,7 @@ await this.queueClient this.stats.MessagesSent.Increment(); } - public async Task UpdateMessageAsync(QueueMessage queueMessage, TimeSpan visibilityTimeout, Guid? clientRequestId = null, CancellationToken cancellationToken = default) + public async Task UpdateMessageAsync(QueueMessage queueMessage, TimeSpan visibilityTimeout, Guid? clientRequestId = null, CancellationToken cancellationToken = default) { using IDisposable scope = OperationContext.CreateClientRequestScope(clientRequestId); UpdateReceipt receipt = await this.queueClient @@ -71,7 +71,7 @@ public async Task UpdateMessageAsync(QueueMessage queueMessage, Ti .DecorateFailure(); this.stats.MessagesUpdated.Increment(); - return queueMessage.Update(receipt); + return receipt; } public async Task DeleteMessageAsync(QueueMessage queueMessage, Guid? clientRequestId = null, CancellationToken cancellationToken = default) From 3170960f0789fadab507cbf586683c014611738c Mon Sep 17 00:00:00 2001 From: Will Sugarman Date: Wed, 15 May 2024 22:05:08 -0700 Subject: [PATCH 3/3] nit --- .../AzureStorageOrchestrationService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 5065f151a..951160a70 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1380,7 +1380,7 @@ async Task AbandonMessagesAsync(OrchestrationSession session, IList { await messages.ParallelForEachAsync( this.settings.MaxStorageOperationConcurrency, - message => _ = session.ControlQueue.AbandonMessageAsync(message, session)); + message => session.ControlQueue.AbandonMessageAsync(message, session)); // Remove the messages from the current batch. The remaining messages // may still be able to be processed