Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 66 additions & 53 deletions src/HotChocolate/Core/src/Types/Fetching/AsyncAutoResetEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,90 @@ namespace HotChocolate.Fetching;

internal sealed class AsyncAutoResetEvent : INotifyCompletion
{
#if NET9_0_OR_GREATER
private readonly Lock _sync = new();
#else
private readonly object _sync = new();
#endif
private const int Idle = 0;
private const int Signaled = 1;
private const int Waiting = 2;

private int _state;
private Action? _continuation;
private bool _isSignaled;

public bool IsSignaled => Volatile.Read(ref _isSignaled);
public bool IsSignaled => Volatile.Read(ref _state) == Signaled;

public bool IsCompleted => false;
public bool IsCompleted => Volatile.Read(ref _state) == Signaled;

public void GetResult() { }
public void GetResult()
{
// Consume the signal when completing synchronously (via IsCompleted == true).
// CAS failure is benign (e.g. TryResetToIdle cleared it first).
Interlocked.CompareExchange(ref _state, Idle, Signaled);
}

public void OnCompleted(Action continuation)
{
bool wasSignaled;
Debug.Assert(_continuation is null, "There should only be one awaiter.");
_continuation = continuation;

lock (_sync)
while (true)
{
wasSignaled = _isSignaled;

if (wasSignaled)
{
// consume the signal
_isSignaled = false;
}
else
switch (Volatile.Read(ref _state))
{
Debug.Assert(_continuation is null, "There should only be one awaiter.");
_continuation = continuation;
}
}
case Idle:
// Register waiter: IDLE -> WAITING
if (Interlocked.CompareExchange(ref _state, Waiting, Idle) == Idle)
{
return;
}
break; // CAS failed, retry

if (wasSignaled)
{
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
case Signaled:
// Consume signal immediately: SIGNALED -> IDLE
if (Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled)
{
_continuation = null;
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
return;
}
break; // CAS failed, retry

default:
Debug.Fail("OnCompleted called while already waiting.");
return;
}
}
}

public void Set()
{
Action? continuation = null;

lock (_sync)
while (true)
{
if (_continuation is not null)
switch (Volatile.Read(ref _state))
{
// someone is waiting - release them immediately
// we don't set _isSignaled since we're consuming it immediately
continuation = _continuation;
_continuation = null;
}
else
{
// since no one waiting we are storing the signal for the next awaiter
_isSignaled = true;
}
}
case Idle:
// Store signal: IDLE -> SIGNALED
if (Interlocked.CompareExchange(ref _state, Signaled, Idle) == Idle)
{
return;
}
break; // CAS failed, retry

if (continuation is not null)
{
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
case Waiting:
// Wake waiter: WAITING -> IDLE
if (Interlocked.CompareExchange(ref _state, Idle, Waiting) == Waiting)
{
var c = _continuation!;
_continuation = null;
ThreadPool.QueueUserWorkItem(static c => c(), c, preferLocal: true);
return;
}
break; // CAS failed, retry

case Signaled:
// Already signaled, nothing to do
return;

default:
return;
}
}
}

Expand All @@ -77,15 +98,7 @@ public void Set()
/// </summary>
public bool TryResetToIdle()
{
lock (_sync)
{
if (_continuation is null && _isSignaled)
{
_isSignaled = false;
return true;
}
return false;
}
return Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled;
}

public AsyncAutoResetEvent GetAwaiter() => this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,90 @@ namespace HotChocolate.Fusion.Execution;

internal sealed class AsyncAutoResetEvent : INotifyCompletion
{
#if NET9_0_OR_GREATER
private readonly Lock _sync = new();
#else
private readonly object _sync = new();
#endif
private const int Idle = 0;
private const int Signaled = 1;
private const int Waiting = 2;

private int _state;
private Action? _continuation;
private bool _isSignaled;

public bool IsSignaled => Volatile.Read(ref _isSignaled);
public bool IsSignaled => Volatile.Read(ref _state) == Signaled;

public bool IsCompleted => false;
public bool IsCompleted => Volatile.Read(ref _state) == Signaled;

public void GetResult() { }
public void GetResult()
{
// Consume the signal when completing synchronously (via IsCompleted == true).
// CAS failure is benign (e.g. TryResetToIdle cleared it first).
Interlocked.CompareExchange(ref _state, Idle, Signaled);
}

public void OnCompleted(Action continuation)
{
bool wasSignaled;
Debug.Assert(_continuation is null, "There should only be one awaiter.");
_continuation = continuation;

lock (_sync)
while (true)
{
wasSignaled = _isSignaled;

if (wasSignaled)
{
// consume the signal
_isSignaled = false;
}
else
switch (Volatile.Read(ref _state))
{
Debug.Assert(_continuation is null, "There should only be one awaiter.");
_continuation = continuation;
}
}
case Idle:
// Register waiter: IDLE -> WAITING
if (Interlocked.CompareExchange(ref _state, Waiting, Idle) == Idle)
{
return;
}
break; // CAS failed, retry

if (wasSignaled)
{
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
case Signaled:
// Consume signal immediately: SIGNALED -> IDLE
if (Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled)
{
_continuation = null;
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
return;
}
break; // CAS failed, retry

default:
Debug.Fail("OnCompleted called while already waiting.");
return;
}
}
}

public void Set()
{
Action? continuation = null;

lock (_sync)
while (true)
{
if (_continuation is not null)
switch (Volatile.Read(ref _state))
{
// someone is waiting - release them immediately
// we don't set _isSignaled since we're consuming it immediately
continuation = _continuation;
_continuation = null;
}
else
{
// since no one waiting we are storing the signal for the next awaiter
_isSignaled = true;
}
}
case Idle:
// Store signal: IDLE -> SIGNALED
if (Interlocked.CompareExchange(ref _state, Signaled, Idle) == Idle)
{
return;
}
break; // CAS failed, retry

if (continuation is not null)
{
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
case Waiting:
// Wake waiter: WAITING -> IDLE
if (Interlocked.CompareExchange(ref _state, Idle, Waiting) == Waiting)
{
var c = _continuation!;
_continuation = null;
ThreadPool.QueueUserWorkItem(static c => c(), c, preferLocal: true);
return;
}
break; // CAS failed, retry

case Signaled:
// Already signaled, nothing to do
return;

default:
return;
}
}
}

Expand All @@ -77,15 +98,7 @@ public void Set()
/// </summary>
public bool TryResetToIdle()
{
lock (_sync)
{
if (_continuation is null && _isSignaled)
{
_isSignaled = false;
return true;
}
return false;
}
return Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled;
}

public AsyncAutoResetEvent GetAwaiter() => this;
Expand Down
Loading