From 842db999dde6a3bff95a8348ac17212c921a7ead Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Fri, 27 Mar 2026 01:04:45 +0000 Subject: [PATCH 1/7] Add HotChocolate server in-memory provider to Fusion --- .../Fusion/HotChocolate.Fusion.slnx | 1 + ...hocolate.Fusion.Connectors.InMemory.csproj | 20 ++ .../InMemorySourceSchemaClient.cs | 225 ++++++++++++++++++ ...InMemorySourceSchemaClientConfiguration.cs | 29 +++ .../HotChocolate.Fusion.Execution.csproj | 1 + .../Utilities.Buffers/ChunkedArrayWriter.cs | 75 +++++- .../HotChocolate.Utilities.Buffers.csproj | 1 + 7 files changed, 347 insertions(+), 5 deletions(-) create mode 100644 src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/HotChocolate.Fusion.Connectors.InMemory.csproj create mode 100644 src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs create mode 100644 src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClientConfiguration.cs diff --git a/src/HotChocolate/Fusion/HotChocolate.Fusion.slnx b/src/HotChocolate/Fusion/HotChocolate.Fusion.slnx index 077a6e20e93..8c435408a16 100644 --- a/src/HotChocolate/Fusion/HotChocolate.Fusion.slnx +++ b/src/HotChocolate/Fusion/HotChocolate.Fusion.slnx @@ -12,6 +12,7 @@ + diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/HotChocolate.Fusion.Connectors.InMemory.csproj b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/HotChocolate.Fusion.Connectors.InMemory.csproj new file mode 100644 index 00000000000..491791d9b3a --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/HotChocolate.Fusion.Connectors.InMemory.csproj @@ -0,0 +1,20 @@ + + + + HotChocolate.Fusion.Connectors.InMemory + HotChocolate.Fusion + preview + $(DefineConstants);FUSION + + + + + + + + + + + + + diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs new file mode 100644 index 00000000000..8800eca54a3 --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs @@ -0,0 +1,225 @@ +using System.Collections.Immutable; +using System.Runtime.CompilerServices; +using System.Text.Json; +using HotChocolate.Buffers; +using HotChocolate.Execution; +using HotChocolate.Fusion.Text.Json; +using HotChocolate.Language; +using HotChocolate.Transport.Formatters; + +namespace HotChocolate.Fusion.Execution.Clients; + +/// +/// An implementation that executes GraphQL operations +/// directly in-process via , bypassing any network transport. +/// +public sealed class InMemorySourceSchemaClient : ISourceSchemaClient +{ + private static readonly Uri s_uri = new("inmemory://localhost"); + + private readonly RequestExecutorProxy _executor; + private readonly JsonResultFormatter _formatter; + private bool _disposed; + + /// + /// Initializes a new instance of . + /// + /// + /// The request executor proxy for the source schema. + /// + /// + /// The JSON result formatter used to serialize execution results. + /// + public InMemorySourceSchemaClient( + RequestExecutorProxy executor, + JsonResultFormatter formatter) + { + ArgumentNullException.ThrowIfNull(executor); + ArgumentNullException.ThrowIfNull(formatter); + + _executor = executor; + _formatter = formatter; + } + + /// + public SourceSchemaClientCapabilities Capabilities + => default; + + /// + public async ValueTask ExecuteAsync( + OperationPlanContext context, + SourceSchemaClientRequest request, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(context); + ObjectDisposedException.ThrowIf(_disposed, this); + + if (request.Variables.Length > 1) + { + throw new InvalidOperationException( + "The in-memory source schema client does not support variable batching."); + } + + var operationRequest = BuildOperationRequest(request); + + try + { + var result = await _executor + .ExecuteAsync(operationRequest, cancellationToken) + .ConfigureAwait(false); + + return new Response(result, request, _formatter); + } + catch + { + operationRequest.Dispose(); + throw; + } + } + + /// + public async IAsyncEnumerable ExecuteBatchStreamAsync( + OperationPlanContext context, + ImmutableArray requests, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(context); + ObjectDisposedException.ThrowIf(_disposed, this); + + for (var i = 0; i < requests.Length; i++) + { + var response = await ExecuteAsync(context, requests[i], cancellationToken) + .ConfigureAwait(false); + + try + { + await foreach (var sourceResult in response + .ReadAsResultStreamAsync(cancellationToken) + .ConfigureAwait(false)) + { + yield return new BatchStreamResult(i, sourceResult); + } + } + finally + { + response.Dispose(); + } + } + } + + /// + public ValueTask DisposeAsync() + { + if (!_disposed) + { + _executor.Dispose(); + _disposed = true; + } + + return ValueTask.CompletedTask; + } + + private static OperationRequest BuildOperationRequest(SourceSchemaClientRequest request) + { + JsonDocument? variables = null; + + if (request.Variables.Length == 1 && !request.Variables[0].IsEmpty) + { + var sequence = request.Variables[0].Values.AsSequence(); + variables = JsonDocument.Parse(sequence); + } + + return OperationRequest.FromSourceText( + request.OperationSourceText, + variableValues: variables); + } + + private static SourceResultDocument SerializeToDocument( + OperationResult operationResult, + JsonResultFormatter formatter) + { + var writer = new ChunkedArrayWriter(JsonMemoryKind.Json); + + try + { + formatter.Format(operationResult, writer); + var (chunks, usedChunks, lastLength) = writer.DrainChunks(); + return SourceResultDocument.Parse(chunks, lastLength, usedChunks, pooledMemory: true); + } + catch + { + writer.Dispose(); + throw; + } + } + + private sealed class Response : SourceSchemaClientResponse + { + private readonly IExecutionResult _result; + private readonly SourceSchemaClientRequest _request; + private readonly JsonResultFormatter _formatter; + + public Response( + IExecutionResult result, + SourceSchemaClientRequest request, + JsonResultFormatter formatter) + { + _result = result; + _request = request; + _formatter = formatter; + } + + public override Uri Uri => s_uri; + + public override string ContentType => "application/json"; + + public override bool IsSuccessful => true; + + public override async IAsyncEnumerable ReadAsResultStreamAsync( + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + switch (_result) + { + case OperationResult operationResult: + { + var path = _request.Variables.Length == 1 + ? _request.Variables[0].Path + : CompactPath.Root; + + var additionalPaths = _request.Variables.Length == 1 + ? _request.Variables[0].AdditionalPaths + : default; + + var document = SerializeToDocument(operationResult, _formatter); + + yield return additionalPaths.IsDefaultOrEmpty + ? new SourceSchemaResult(path, document) + : new SourceSchemaResult(path, document, additionalPaths: additionalPaths); + break; + } + + case IResponseStream responseStream: + { + await foreach (var operationResult in responseStream + .ReadResultsAsync() + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + var document = SerializeToDocument(operationResult, _formatter); + yield return new SourceSchemaResult(CompactPath.Root, document); + } + break; + } + + default: + throw new InvalidOperationException( + $"Unexpected execution result type: {_result.GetType().Name}."); + } + } + + public override void Dispose() + { + _result.DisposeAsync().AsTask().GetAwaiter().GetResult(); + } + } +} diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClientConfiguration.cs b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClientConfiguration.cs new file mode 100644 index 00000000000..05f1d1bab6e --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClientConfiguration.cs @@ -0,0 +1,29 @@ +namespace HotChocolate.Fusion.Execution.Clients; + +/// +/// Configuration for an in-memory source schema client that executes +/// GraphQL operations directly in-process. +/// +public sealed class InMemorySourceSchemaClientConfiguration : ISourceSchemaClientConfiguration +{ + /// + /// Initializes a new instance of . + /// + /// The name of the source schema. + /// The supported operation types. + public InMemorySourceSchemaClientConfiguration( + string name, + SupportedOperationType supportedOperations = SupportedOperationType.All) + { + ArgumentException.ThrowIfNullOrEmpty(name); + + Name = name; + SupportedOperations = supportedOperations; + } + + /// + public string Name { get; } + + /// + public SupportedOperationType SupportedOperations { get; } +} diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/HotChocolate.Fusion.Execution.csproj b/src/HotChocolate/Fusion/src/Fusion.Execution/HotChocolate.Fusion.Execution.csproj index a2e5eb6dace..2ea0b7f7cee 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/HotChocolate.Fusion.Execution.csproj +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/HotChocolate.Fusion.Execution.csproj @@ -13,6 +13,7 @@ + diff --git a/src/HotChocolate/Utilities/src/Utilities.Buffers/ChunkedArrayWriter.cs b/src/HotChocolate/Utilities/src/Utilities.Buffers/ChunkedArrayWriter.cs index 18a2f80e68a..4f6800dc624 100644 --- a/src/HotChocolate/Utilities/src/Utilities.Buffers/ChunkedArrayWriter.cs +++ b/src/HotChocolate/Utilities/src/Utilities.Buffers/ChunkedArrayWriter.cs @@ -21,6 +21,7 @@ internal sealed class ChunkedArrayWriter : IBufferWriter, IDisposable private const int DefaultScratchSize = 128; private const int SimdThreshold = 32; + private readonly JsonMemoryKind _memoryKind; private byte[][] _chunks; private int _chunkCount; private int _currentChunk; @@ -29,10 +30,11 @@ internal sealed class ChunkedArrayWriter : IBufferWriter, IDisposable private bool _advanceFromScratch; private bool _disposed; - public ChunkedArrayWriter() + public ChunkedArrayWriter(JsonMemoryKind memoryKind = JsonMemoryKind.Variables) { + _memoryKind = memoryKind; _chunks = ArrayPool.Shared.Rent(4); - _chunks[0] = JsonMemory.Rent(JsonMemoryKind.Variables); + _chunks[0] = JsonMemory.Rent(memoryKind); _chunkCount = 1; _scratch = new byte[DefaultScratchSize]; } @@ -577,6 +579,69 @@ public void Reset() _advanceFromScratch = false; } + /// + /// Transfers ownership of all data chunks to the caller and resets the writer. + /// Returns the chunk array, the number of used chunks, and the byte count + /// in the last chunk — ready to pass directly to + /// SourceResultDocument.Parse(dataChunks, lastLength, usedChunks, pooledMemory: true). + /// After this call the writer is disposed and must not be used again. + /// + public (byte[][] DataChunks, int UsedChunks, int LastLength) DrainChunks() + { +#if NETSTANDARD2_0 + if (_disposed) + { + throw new ObjectDisposedException(typeof(ChunkedArrayWriter).FullName!); + } +#else + ObjectDisposedException.ThrowIf(_disposed, this); +#endif + + int usedChunks; + int lastLength; + + if (_currentChunkOffset > 0) + { + usedChunks = _currentChunk + 1; + lastLength = _currentChunkOffset; + } + else if (_currentChunk > 0) + { + // Moved to a new chunk but nothing written to it — return unused chunk. + usedChunks = _currentChunk; + lastLength = BufferSize; + JsonMemory.Return(_memoryKind, _chunks[_currentChunk]); + _chunks[_currentChunk] = null!; + } + else + { + // Nothing written at all. + usedChunks = 0; + lastLength = 0; + } + + // Return any pre-allocated chunks beyond the used range. + for (var i = usedChunks; i < _chunkCount; i++) + { + if (_chunks[i] is not null) + { + JsonMemory.Return(_memoryKind, _chunks[i]); + _chunks[i] = null!; + } + } + + var chunks = _chunks; + + // Reset state — writer is now effectively disposed. + _chunks = []; + _chunkCount = 0; + _currentChunk = 0; + _currentChunkOffset = 0; + _disposed = true; + + return (chunks, usedChunks, lastLength); + } + /// /// Returns excess chunks beyond the first one. /// Call this when the owning store is returned to a pool. @@ -585,7 +650,7 @@ public void Clean() { for (var i = 1; i < _chunkCount; i++) { - JsonMemory.Return(JsonMemoryKind.Variables, _chunks[i]); + JsonMemory.Return(_memoryKind, _chunks[i]); _chunks[i] = null!; } @@ -604,7 +669,7 @@ public void Dispose() { for (var i = 0; i < _chunkCount; i++) { - JsonMemory.Return(JsonMemoryKind.Variables, _chunks[i]); + JsonMemory.Return(_memoryKind, _chunks[i]); _chunks[i] = null!; } @@ -631,7 +696,7 @@ private void MoveToNextChunk() if (_currentChunk >= _chunkCount) { - _chunks[_currentChunk] = JsonMemory.Rent(JsonMemoryKind.Variables); + _chunks[_currentChunk] = JsonMemory.Rent(_memoryKind); _chunkCount = _currentChunk + 1; } } diff --git a/src/HotChocolate/Utilities/src/Utilities.Buffers/HotChocolate.Utilities.Buffers.csproj b/src/HotChocolate/Utilities/src/Utilities.Buffers/HotChocolate.Utilities.Buffers.csproj index a3eda07d0ba..66d41f10584 100644 --- a/src/HotChocolate/Utilities/src/Utilities.Buffers/HotChocolate.Utilities.Buffers.csproj +++ b/src/HotChocolate/Utilities/src/Utilities.Buffers/HotChocolate.Utilities.Buffers.csproj @@ -25,6 +25,7 @@ + From 59bf9d18fd36de6ae02812857c3fdd0b47516545 Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Fri, 27 Mar 2026 14:27:23 +0000 Subject: [PATCH 2/7] foundational pieces --- .../InMemorySourceSchemaClientFactory.cs | 44 +++++++++++++ ...colateFusionServiceCollectionExtensions.cs | 6 +- .../Clients/DefaultSourceSchemaClientScope.cs | 63 +++++++++++++------ .../DefaultSourceSchemaClientScopeFactory.cs | 10 +-- .../Clients/HttpSourceSchemaClientFactory.cs | 26 ++++++++ .../Clients/ISourceSchemaClientFactory.cs | 23 +++++++ .../Clients/SourceSchemaClientFactory.cs | 29 +++++++++ 7 files changed, 175 insertions(+), 26 deletions(-) create mode 100644 src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClientFactory.cs create mode 100644 src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/HttpSourceSchemaClientFactory.cs create mode 100644 src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/ISourceSchemaClientFactory.cs create mode 100644 src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/SourceSchemaClientFactory.cs diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClientFactory.cs b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClientFactory.cs new file mode 100644 index 00000000000..0bb094e015f --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClientFactory.cs @@ -0,0 +1,44 @@ +using HotChocolate.Execution; +using HotChocolate.Transport.Formatters; + +namespace HotChocolate.Fusion.Execution.Clients; + +/// +/// A factory that creates instances +/// for source schemas configured with . +/// +public sealed class InMemorySourceSchemaClientFactory + : SourceSchemaClientFactory +{ + private readonly IRequestExecutorProvider _executorProvider; + private readonly IRequestExecutorEvents _executorEvents; + private readonly JsonResultFormatter _formatter; + + /// + /// Initializes a new instance of . + /// + /// The provider for resolving request executors. + /// The event source for executor lifecycle events. + /// The JSON result formatter. + public InMemorySourceSchemaClientFactory( + IRequestExecutorProvider executorProvider, + IRequestExecutorEvents executorEvents, + JsonResultFormatter formatter) + { + ArgumentNullException.ThrowIfNull(executorProvider); + ArgumentNullException.ThrowIfNull(executorEvents); + ArgumentNullException.ThrowIfNull(formatter); + + _executorProvider = executorProvider; + _executorEvents = executorEvents; + _formatter = formatter; + } + + /// + protected override ISourceSchemaClient CreateClient( + InMemorySourceSchemaClientConfiguration configuration) + { + var proxy = new RequestExecutorProxy(_executorProvider, _executorEvents, configuration.Name); + return new InMemorySourceSchemaClient(proxy, _formatter); + } +} diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/HotChocolateFusionServiceCollectionExtensions.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/HotChocolateFusionServiceCollectionExtensions.cs index ccc265c1c8f..5fbb1f02bd5 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/HotChocolateFusionServiceCollectionExtensions.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/HotChocolateFusionServiceCollectionExtensions.cs @@ -60,9 +60,13 @@ private static void AddRequestExecutorManager( private static void AddSourceSchemaScope( IServiceCollection services) { + services.AddSingleton( + static sp => new HttpSourceSchemaClientFactory( + sp.GetRequiredService())); + services.TryAddSingleton( static sp => new DefaultSourceSchemaClientScopeFactory( - sp.GetRequiredService())); + sp.GetServices().ToArray())); } private static DefaultFusionGatewayBuilder CreateBuilder( diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/DefaultSourceSchemaClientScope.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/DefaultSourceSchemaClientScope.cs index d2957df795f..c8f1824636c 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/DefaultSourceSchemaClientScope.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/DefaultSourceSchemaClientScope.cs @@ -1,6 +1,5 @@ using System.Collections.Concurrent; using HotChocolate.Features; -using HotChocolate.Fusion.Transport.Http; using HotChocolate.Fusion.Types; using HotChocolate.Language; @@ -14,18 +13,18 @@ public sealed class DefaultSourceSchemaClientScope : ISourceSchemaClientScope private readonly object _sync = new(); #endif private readonly ConcurrentDictionary<(string Name, OperationType Type), ISourceSchemaClient> _clients = []; - private readonly IHttpClientFactory _httpClientFactory; + private readonly ISourceSchemaClientFactory[] _clientFactories; private readonly SourceSchemaClientConfigurations _configurations; private bool _disposed; public DefaultSourceSchemaClientScope( FusionSchemaDefinition schemaDefinition, - IHttpClientFactory httpClientFactory) + ISourceSchemaClientFactory[] clientFactories) { ArgumentNullException.ThrowIfNull(schemaDefinition); - ArgumentNullException.ThrowIfNull(httpClientFactory); + ArgumentNullException.ThrowIfNull(clientFactories); - _httpClientFactory = httpClientFactory; + _clientFactories = clientFactories; _configurations = schemaDefinition.Features.GetRequired(); } @@ -47,28 +46,52 @@ public ISourceSchemaClient GetClient(string name, OperationType operationType) $"No client configuration found for schema '{name}' and operation type {operationType}."); } - switch (config) - { - case SourceSchemaHttpClientConfiguration httpClientConfig: - var httpClient = _httpClientFactory.CreateClient(httpClientConfig.HttpClientName); - httpClient.BaseAddress = httpClientConfig.BaseAddress; + sourceSchemaClient = CreateClient(config); + _clients.TryAdd(key, sourceSchemaClient); + } + } + } + + return sourceSchemaClient; + } - sourceSchemaClient = new SourceSchemaHttpClient( - GraphQLHttpClient.Create(httpClient, disposeHttpClient: true), - httpClientConfig); + private ISourceSchemaClient CreateClient(ISourceSchemaClientConfiguration configuration) + { + var factories = _clientFactories; - _clients.TryAdd(key, sourceSchemaClient); - break; + if (factories.Length > 0 && factories[0].CanHandle(configuration)) + { + return factories[0].CreateClient(configuration); + } - default: - throw new NotSupportedException( - $"Unsupported client configuration type: {config.GetType().Name}."); - } + if (factories.Length > 1 && factories[1].CanHandle(configuration)) + { + return factories[1].CreateClient(configuration); + } + + if (factories.Length > 2 && factories[2].CanHandle(configuration)) + { + return factories[2].CreateClient(configuration); + } + + if (factories.Length > 3 && factories[3].CanHandle(configuration)) + { + return factories[3].CreateClient(configuration); + } + + if (factories.Length > 4) + { + for (var i = 4; i < factories.Length; i++) + { + if (factories[i].CanHandle(configuration)) + { + return factories[i].CreateClient(configuration); } } } - return sourceSchemaClient; + throw new NotSupportedException( + $"No client factory found for configuration type: {configuration.GetType().Name}."); } public async ValueTask DisposeAsync() diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/DefaultSourceSchemaClientScopeFactory.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/DefaultSourceSchemaClientScopeFactory.cs index 5077e878248..addea0ed61a 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/DefaultSourceSchemaClientScopeFactory.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/DefaultSourceSchemaClientScopeFactory.cs @@ -4,13 +4,13 @@ namespace HotChocolate.Fusion.Execution.Clients; internal sealed class DefaultSourceSchemaClientScopeFactory : ISourceSchemaClientScopeFactory { - private readonly IHttpClientFactory _httpClientFactory; + private readonly ISourceSchemaClientFactory[] _clientFactories; - public DefaultSourceSchemaClientScopeFactory(IHttpClientFactory httpClientFactory) + public DefaultSourceSchemaClientScopeFactory(ISourceSchemaClientFactory[] clientFactories) { - ArgumentNullException.ThrowIfNull(httpClientFactory); + ArgumentNullException.ThrowIfNull(clientFactories); - _httpClientFactory = httpClientFactory; + _clientFactories = clientFactories; } public ISourceSchemaClientScope CreateScope(ISchemaDefinition schemaDefinition) @@ -24,6 +24,6 @@ public ISourceSchemaClientScope CreateScope(ISchemaDefinition schemaDefinition) nameof(schemaDefinition)); } - return new DefaultSourceSchemaClientScope(schema, _httpClientFactory); + return new DefaultSourceSchemaClientScope(schema, _clientFactories); } } diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/HttpSourceSchemaClientFactory.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/HttpSourceSchemaClientFactory.cs new file mode 100644 index 00000000000..44e3bc3ed9e --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/HttpSourceSchemaClientFactory.cs @@ -0,0 +1,26 @@ +using HotChocolate.Fusion.Transport.Http; + +namespace HotChocolate.Fusion.Execution.Clients; + +internal sealed class HttpSourceSchemaClientFactory + : SourceSchemaClientFactory +{ + private readonly IHttpClientFactory _httpClientFactory; + + public HttpSourceSchemaClientFactory(IHttpClientFactory httpClientFactory) + { + ArgumentNullException.ThrowIfNull(httpClientFactory); + _httpClientFactory = httpClientFactory; + } + + protected override ISourceSchemaClient CreateClient( + SourceSchemaHttpClientConfiguration configuration) + { + var httpClient = _httpClientFactory.CreateClient(configuration.HttpClientName); + httpClient.BaseAddress = configuration.BaseAddress; + + return new SourceSchemaHttpClient( + GraphQLHttpClient.Create(httpClient, disposeHttpClient: true), + configuration); + } +} diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/ISourceSchemaClientFactory.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/ISourceSchemaClientFactory.cs new file mode 100644 index 00000000000..aefd8080cbd --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/ISourceSchemaClientFactory.cs @@ -0,0 +1,23 @@ +namespace HotChocolate.Fusion.Execution.Clients; + +/// +/// A factory that creates instances +/// from a matching . +/// Each connector type (HTTP, in-memory, etc.) provides its own factory implementation. +/// +public interface ISourceSchemaClientFactory +{ + /// + /// Determines whether this factory can create a client for the given configuration. + /// + /// The client configuration to check. + /// true if this factory handles the given configuration type. + bool CanHandle(ISourceSchemaClientConfiguration configuration); + + /// + /// Creates a new for the given configuration. + /// + /// The client configuration. + /// A new source schema client instance. + ISourceSchemaClient CreateClient(ISourceSchemaClientConfiguration configuration); +} diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/SourceSchemaClientFactory.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/SourceSchemaClientFactory.cs new file mode 100644 index 00000000000..7552d217ea9 --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/SourceSchemaClientFactory.cs @@ -0,0 +1,29 @@ +namespace HotChocolate.Fusion.Execution.Clients; + +/// +/// Base class for implementations that are +/// bound to a specific type. +/// Handles the type check in and the cast in +/// automatically. +/// +/// +/// The configuration type this factory handles. +/// +public abstract class SourceSchemaClientFactory : ISourceSchemaClientFactory + where TConfiguration : ISourceSchemaClientConfiguration +{ + /// + public bool CanHandle(ISourceSchemaClientConfiguration configuration) + => configuration is TConfiguration; + + /// + ISourceSchemaClient ISourceSchemaClientFactory.CreateClient(ISourceSchemaClientConfiguration configuration) + => CreateClient((TConfiguration)configuration); + + /// + /// Creates a new for the given typed configuration. + /// + /// The typed client configuration. + /// A new source schema client instance. + protected abstract ISourceSchemaClient CreateClient(TConfiguration configuration); +} From 07028e715b4a9bb2090bfa50e53313f90b231ce5 Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Fri, 27 Mar 2026 22:16:28 +0000 Subject: [PATCH 3/7] Added some tests and fixed up several integration pieces --- src/All.slnx | 2 + .../Fusion/HotChocolate.Fusion.slnx | 1 + .../InMemoryFusionGatewayBuilderExtensions.cs | 93 +++++ ...hocolate.Fusion.Connectors.InMemory.csproj | 5 +- .../InMemoryConfigurationProvider.cs | 256 +++++++++++++ .../InMemorySourceSchemaClient.cs | 343 ++++++++++++++---- .../Configuration/FusionGatewaySetup.cs | 2 +- .../Configuration/FusionSetupUtilities.cs | 2 +- ...colateFusionServiceCollectionExtensions.cs | 16 + .../HotChocolate.Fusion.Execution.csproj | 1 + ...te.Fusion.Connectors.InMemory.Tests.csproj | 21 ++ .../InMemoryConnectorTests.cs | 218 +++++++++++ ...urnComposedData_When_TwoInMemorySchemas.md | 12 + ...ld_ReturnData_When_SingleInMemorySchema.md | 12 + ...roductAndReviews_When_CrossSchemaLookup.md | 38 ++ 15 files changed, 957 insertions(+), 65 deletions(-) create mode 100644 src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/DependencyInjection/InMemoryFusionGatewayBuilderExtensions.cs create mode 100644 src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemoryConfigurationProvider.cs create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/HotChocolate.Fusion.Connectors.InMemory.Tests.csproj create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/InMemoryConnectorTests.cs create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnComposedData_When_TwoInMemorySchemas.md create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnData_When_SingleInMemorySchema.md create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnProductAndReviews_When_CrossSchemaLookup.md diff --git a/src/All.slnx b/src/All.slnx index bbef4345d13..c856c1f63a2 100644 --- a/src/All.slnx +++ b/src/All.slnx @@ -215,6 +215,7 @@ + @@ -224,6 +225,7 @@ + diff --git a/src/HotChocolate/Fusion/HotChocolate.Fusion.slnx b/src/HotChocolate/Fusion/HotChocolate.Fusion.slnx index 8c435408a16..69c04986eaf 100644 --- a/src/HotChocolate/Fusion/HotChocolate.Fusion.slnx +++ b/src/HotChocolate/Fusion/HotChocolate.Fusion.slnx @@ -24,6 +24,7 @@ + diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/DependencyInjection/InMemoryFusionGatewayBuilderExtensions.cs b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/DependencyInjection/InMemoryFusionGatewayBuilderExtensions.cs new file mode 100644 index 00000000000..09924d0b4cc --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/DependencyInjection/InMemoryFusionGatewayBuilderExtensions.cs @@ -0,0 +1,93 @@ +using HotChocolate.Execution; +using HotChocolate.Execution.Configuration; +using HotChocolate.Fusion.Configuration; +using HotChocolate.Fusion.Connectors.InMemory; +using HotChocolate.Fusion.Execution.Clients; +using HotChocolate.Transport.Formatters; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Microsoft.Extensions.DependencyInjection; + +/// +/// Extension methods for registering the in-memory connector on . +/// +public static class InMemoryFusionGatewayBuilderExtensions +{ + /// + /// Adds an in-memory schema connector that executes operations directly in-process + /// against the schema registered by the given . + /// + /// The fusion gateway builder. + /// + /// The request executor builder whose + /// identifies the source schema. + /// + /// The fusion gateway builder for chaining. + public static IFusionGatewayBuilder AddInMemorySchema( + this IFusionGatewayBuilder builder, + IRequestExecutorBuilder schemaBuilder) + { + ArgumentNullException.ThrowIfNull(schemaBuilder); + + return builder.AddInMemorySchema(schemaBuilder.Name); + } + + /// + /// Adds an in-memory schema connector that executes operations directly in-process + /// against the schema identified by . + /// + /// The fusion gateway builder. + /// The name of the source schema. + /// The fusion gateway builder for chaining. + public static IFusionGatewayBuilder AddInMemorySchema( + this IFusionGatewayBuilder builder, + string schemaName) + { + ArgumentNullException.ThrowIfNull(builder); + ArgumentException.ThrowIfNullOrEmpty(schemaName); + + if (!builder.Services.Any(d => d.ServiceType == typeof(InMemorySourceSchemaClientFactory))) + { + // Remove default HTTP client factory — in-memory mode doesn't need it. + for (var i = builder.Services.Count - 1; i >= 0; i--) + { + if (builder.Services[i].ServiceType == typeof(ISourceSchemaClientFactory)) + { + builder.Services.RemoveAt(i); + } + } + + builder.Services.AddSingleton( + static sp => new InMemorySourceSchemaClientFactory( + sp.GetRequiredService(), + sp.GetRequiredService(), + JsonResultFormatter.Default)); + + builder.Services.AddSingleton( + static sp => sp.GetRequiredService()); + } + + builder.Services.AddSingleton(new InMemorySchemaRegistration(schemaName)); + + FusionSetupUtilities.Configure( + builder, + setup => setup.DocumentProvider = sp => + { + var names = sp.GetServices() + .Select(r => r.SchemaName).ToArray(); + return new InMemoryConfigurationProvider( + names, + sp.GetRequiredService(), + sp.GetRequiredService()); + }); + + FusionSetupUtilities.Configure( + builder, + setup => setup.ClientConfigurationModifiers.Add( + _ => new InMemorySourceSchemaClientConfiguration(schemaName))); + + return builder; + } +} + +internal sealed record InMemorySchemaRegistration(string SchemaName); diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/HotChocolate.Fusion.Connectors.InMemory.csproj b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/HotChocolate.Fusion.Connectors.InMemory.csproj index 491791d9b3a..675aa1aea0a 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/HotChocolate.Fusion.Connectors.InMemory.csproj +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/HotChocolate.Fusion.Connectors.InMemory.csproj @@ -12,9 +12,10 @@ - - + + + diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemoryConfigurationProvider.cs b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemoryConfigurationProvider.cs new file mode 100644 index 00000000000..a9ed20ee179 --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemoryConfigurationProvider.cs @@ -0,0 +1,256 @@ +using System.Buffers; +using System.Collections.Immutable; +using System.Text.Json; +using System.Threading.Channels; +using HotChocolate.Buffers; +using HotChocolate.Execution; +using HotChocolate.Fusion.Configuration; +using HotChocolate.Fusion.Logging; +using HotChocolate.Fusion.Options; +using HotChocolate.Utilities; + +namespace HotChocolate.Fusion.Connectors.InMemory; + +/// +/// A fusion configuration provider that composes schemas from in-memory +/// request executors and recomposes whenever any of the source schemas change. +/// +public sealed class InMemoryConfigurationProvider : IFusionConfigurationProvider +{ +#if NET9_0_OR_GREATER + private readonly Lock _syncRoot = new(); +#else + private readonly object _syncRoot = new(); +#endif + private readonly string[] _schemaNames; + private readonly RequestExecutorProxy[] _proxies; + private readonly IDisposable? _eventSubscription; + private readonly CancellationTokenSource _cts = new(); + + private readonly Channel _channel = + Channel.CreateBounded( + new BoundedChannelOptions(1) + { + FullMode = BoundedChannelFullMode.DropNewest, + SingleReader = true, + SingleWriter = false + }); + + private ImmutableArray _sessions = []; + private bool _disposed; + + /// + /// Initializes a new instance of . + /// + /// The names of the source schemas to compose. + /// The request executor provider. + /// The request executor events. + public InMemoryConfigurationProvider( + string[] schemaNames, + IRequestExecutorProvider executorProvider, + IRequestExecutorEvents executorEvents) + { + ArgumentNullException.ThrowIfNull(schemaNames); + ArgumentNullException.ThrowIfNull(executorProvider); + ArgumentNullException.ThrowIfNull(executorEvents); + + _schemaNames = schemaNames; + + _proxies = new RequestExecutorProxy[schemaNames.Length]; + for (var i = 0; i < schemaNames.Length; i++) + { + _proxies[i] = new RequestExecutorProxy(executorProvider, executorEvents, schemaNames[i]); + } + + var observer = new RequestExecutorEventObserver(OnExecutorEvent); + _eventSubscription = executorEvents.Subscribe(observer); + + _channel.Writer.TryWrite(true); + ComposeLoopAsync(_cts.Token).FireAndForget(); + } + + /// + public FusionConfiguration? Configuration { get; private set; } + + /// + public IDisposable Subscribe(IObserver observer) + { + ArgumentNullException.ThrowIfNull(observer); + ObjectDisposedException.ThrowIf(_disposed, this); + + var session = new ObserverSession(this, observer); + + lock (_syncRoot) + { + _sessions = _sessions.Add(session); + } + + var configuration = Configuration; + + if (configuration is not null) + { + observer.OnNext(configuration); + } + + return session; + } + + private void Unsubscribe(ObserverSession session) + { + lock (_syncRoot) + { + _sessions = _sessions.Remove(session); + } + } + + private void OnExecutorEvent(RequestExecutorEvent eventArgs) + { + if (_disposed || eventArgs.Type is not RequestExecutorEventType.Created) + { + return; + } + + for (var i = 0; i < _schemaNames.Length; i++) + { + if (eventArgs.Name.Equals(_schemaNames[i], StringComparison.Ordinal)) + { + _channel.Writer.TryWrite(true); + return; + } + } + } + + private async Task ComposeLoopAsync(CancellationToken ct) + { + var defaultSettings = JsonDocument.Parse("{ }"); + + await foreach (var _ in _channel.Reader.ReadAllAsync(ct)) + { + try + { + var sourceSchemas = new SourceSchemaText[_schemaNames.Length]; + + for (var i = 0; i < _proxies.Length; i++) + { + var executor = await _proxies[i].GetExecutorAsync(ct).ConfigureAwait(false); + var sdl = SchemaPrinter.Print((Schema)executor.Schema); + sourceSchemas[i] = new SourceSchemaText(_schemaNames[i], sdl); + } + + var result = new SchemaComposer( + sourceSchemas, + new SchemaComposerOptions(), + new CompositionLog()).Compose(); + + if (result.IsFailure) + { + continue; + } + + var documentNode = result.Value.ToSyntaxNode(); + var settings = new JsonDocumentOwner(defaultSettings, EmptyMemoryOwner.Instance); + NotifyObservers(new FusionConfiguration(documentNode, settings)); + } + catch + { + // ignore and wait for next update + } + } + } + + private void NotifyObservers(FusionConfiguration configuration) + { + ImmutableArray sessions; + + lock (_syncRoot) + { + sessions = _sessions; + Configuration = configuration; + } + + if (sessions.IsEmpty) + { + return; + } + + foreach (var session in sessions) + { + session.Notify(configuration); + } + } + + /// + public ValueTask DisposeAsync() + { + if (_disposed) + { + return ValueTask.CompletedTask; + } + + _disposed = true; + + _cts.Cancel(); + _cts.Dispose(); + _eventSubscription?.Dispose(); + + foreach (var proxy in _proxies) + { + proxy.Dispose(); + } + + foreach (var session in _sessions) + { + session.Complete(); + } + + // drain events + while (_channel.Reader.TryRead(out _)) + { + } + + return ValueTask.CompletedTask; + } + + private sealed class ObserverSession( + InMemoryConfigurationProvider provider, + IObserver observer) + : IDisposable + { + public void Notify(FusionConfiguration schemaDocument) + { + try + { + observer.OnNext(schemaDocument); + } + catch (Exception ex) + { + observer.OnError(ex); + } + } + + public void Complete() + { + try + { + observer.OnCompleted(); + } + catch + { + // We do not want to throw an exception if the observer + // throws an exception on completion. + } + } + + public void Dispose() + => provider.Unsubscribe(this); + } + + private sealed class EmptyMemoryOwner : IMemoryOwner + { + public static readonly EmptyMemoryOwner Instance = new(); + + public Memory Memory => default; + + public void Dispose() { } + } +} diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs index 8800eca54a3..26f52b7ee11 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs @@ -1,10 +1,12 @@ using System.Collections.Immutable; using System.Runtime.CompilerServices; +using System.Text.Encodings.Web; using System.Text.Json; using HotChocolate.Buffers; using HotChocolate.Execution; -using HotChocolate.Fusion.Text.Json; using HotChocolate.Language; +using HotChocolate.Fusion.Text.Json; +using HotChocolate.Text.Json; using HotChocolate.Transport.Formatters; namespace HotChocolate.Fusion.Execution.Clients; @@ -43,7 +45,8 @@ public InMemorySourceSchemaClient( /// public SourceSchemaClientCapabilities Capabilities - => default; + => SourceSchemaClientCapabilities.VariableBatching + | SourceSchemaClientCapabilities.RequestBatching; /// public async ValueTask ExecuteAsync( @@ -54,13 +57,8 @@ public async ValueTask ExecuteAsync( ArgumentNullException.ThrowIfNull(context); ObjectDisposedException.ThrowIf(_disposed, this); - if (request.Variables.Length > 1) - { - throw new InvalidOperationException( - "The in-memory source schema client does not support variable batching."); - } - - var operationRequest = BuildOperationRequest(request); + ChunkedArrayWriter? buffer = null; + var operationRequest = BuildOperationRequest(request, ref buffer); try { @@ -68,11 +66,12 @@ public async ValueTask ExecuteAsync( .ExecuteAsync(operationRequest, cancellationToken) .ConfigureAwait(false); - return new Response(result, request, _formatter); + return new Response(result, request, _formatter, buffer); } catch { operationRequest.Dispose(); + buffer?.Dispose(); throw; } } @@ -86,24 +85,114 @@ public async IAsyncEnumerable ExecuteBatchStreamAsync( ArgumentNullException.ThrowIfNull(context); ObjectDisposedException.ThrowIf(_disposed, this); - for (var i = 0; i < requests.Length; i++) + // Build one IOperationRequest per source request. + var operationRequests = new IOperationRequest[requests.Length]; + ChunkedArrayWriter? buffer = null; + + try + { + for (var i = 0; i < requests.Length; i++) + { + operationRequests[i] = BuildOperationRequest(requests[i], ref buffer); + } + } + catch { - var response = await ExecuteAsync(context, requests[i], cancellationToken) + for (var i = 0; i < operationRequests.Length; i++) + { + operationRequests[i]?.Dispose(); + } + + buffer?.Dispose(); + throw; + } + + var batch = new OperationRequestBatch(operationRequests); + IResponseStream responseStream; + + try + { + responseStream = await _executor + .ExecuteBatchAsync(batch, cancellationToken) .ConfigureAwait(false); + } + catch + { + buffer?.Dispose(); + throw; + } - try + try + { + await foreach (var operationResult in responseStream + .ReadResultsAsync() + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) { - await foreach (var sourceResult in response - .ReadAsResultStreamAsync(cancellationToken) - .ConfigureAwait(false)) + var requestIndex = ResolveRequestIndex(requests, operationResult); + + if (requestIndex == -1) + { + // No request index — fan out to all requests. + var document = SerializeToDocument(operationResult, _formatter); + + for (var i = 0; i < requests.Length; i++) + { + if (TryGetResultPath(requests[i], variableIndex: 0, out var p, out var ap)) + { + var ssr = ap.IsDefaultOrEmpty + ? new SourceSchemaResult(p, document) + : new SourceSchemaResult(p, document, additionalPaths: ap); + + yield return new BatchStreamResult(i, ssr); + } + } + + continue; + } + + var request = requests[requestIndex]; + var variableIndex = ResolveVariableIndex(request, operationResult); + + if (variableIndex == -1) { - yield return new BatchStreamResult(i, sourceResult); + // No variable index — fan out to all variable sets of this request. + var document = SerializeToDocument(operationResult, _formatter); + + for (var vi = 0; vi < request.Variables.Length; vi++) + { + if (TryGetResultPath(request, vi, out var vp, out var vap)) + { + var vssr = vap.IsDefaultOrEmpty + ? new SourceSchemaResult(vp, document) + : new SourceSchemaResult(vp, document, additionalPaths: vap); + + yield return new BatchStreamResult(requestIndex, vssr); + } + } + + continue; } + + if (!TryGetResultPath(request, variableIndex, out var path, out var additionalPaths)) + { + throw new InvalidOperationException( + $"Invalid variable index {variableIndex} for request {requestIndex}."); + } + + var resultDocument = SerializeToDocument(operationResult, _formatter); + + var sourceSchemaResult = additionalPaths.IsDefaultOrEmpty + ? new SourceSchemaResult(path, resultDocument) + : new SourceSchemaResult(path, resultDocument, additionalPaths: additionalPaths); + + yield return new BatchStreamResult(requestIndex, sourceSchemaResult); } - finally - { - response.Dispose(); - } + } + finally + { + await responseStream.DisposeAsync().ConfigureAwait(false); + buffer?.Dispose(); } } @@ -119,38 +208,112 @@ public ValueTask DisposeAsync() return ValueTask.CompletedTask; } - private static OperationRequest BuildOperationRequest(SourceSchemaClientRequest request) + private static IOperationRequest BuildOperationRequest( + SourceSchemaClientRequest request, + ref ChunkedArrayWriter? buffer) { - JsonDocument? variables = null; + if (request.Variables.Length == 0) + { + return OperationRequest.FromSourceText(request.OperationSourceText); + } - if (request.Variables.Length == 1 && !request.Variables[0].IsEmpty) + if (request.Variables.Length == 1) { + if (request.Variables[0].IsEmpty) + { + return OperationRequest.FromSourceText(request.OperationSourceText); + } + var sequence = request.Variables[0].Values.AsSequence(); - variables = JsonDocument.Parse(sequence); + return OperationRequest.FromSourceText( + request.OperationSourceText, + variableValues: JsonDocument.Parse(sequence)); + } + + buffer ??= new ChunkedArrayWriter(); + var writer = new JsonWriter( + buffer, + new JsonWriterOptions + { + Indented = true, + Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping + }); + + writer.WriteStartArray(); + + for (var i = 0; i < request.Variables.Length; i++) + { + request.Variables[i].Values.WriteTo(writer); } - return OperationRequest.FromSourceText( - request.OperationSourceText, - variableValues: variables); + writer.WriteEndArray(); + + var variables = JsonSegment.Create(buffer, 0, buffer.Length); + + return OperationRequestBuilder.New() + .SetDocument(request.OperationSourceText) + .SetVariableValues(JsonDocument.Parse(variables.AsSequence())) + .Build(); } - private static SourceResultDocument SerializeToDocument( - OperationResult operationResult, - JsonResultFormatter formatter) + private static int ResolveRequestIndex( + ImmutableArray requests, + OperationResult result) { - var writer = new ChunkedArrayWriter(JsonMemoryKind.Json); + if (requests.Length == 1) + { + return 0; + } - try + return result.RequestIndex ?? -1; + } + + private static int ResolveVariableIndex( + SourceSchemaClientRequest request, + OperationResult result) + { + if (request.Variables.Length <= 1) { - formatter.Format(operationResult, writer); - var (chunks, usedChunks, lastLength) = writer.DrainChunks(); - return SourceResultDocument.Parse(chunks, lastLength, usedChunks, pooledMemory: true); + return 0; } - catch + + return result.VariableIndex ?? -1; + } + + private static bool TryGetResultPath( + SourceSchemaClientRequest request, + int variableIndex, + out CompactPath path, + out ImmutableArray additionalPaths) + { + if (request.Variables.Length == 0) { - writer.Dispose(); - throw; + path = CompactPath.Root; + additionalPaths = []; + return true; } + + if ((uint)variableIndex >= (uint)request.Variables.Length) + { + path = CompactPath.Root; + additionalPaths = []; + return false; + } + + var variable = request.Variables[variableIndex]; + path = variable.Path; + additionalPaths = variable.AdditionalPaths; + return true; + } + + private static SourceResultDocument SerializeToDocument( + OperationResult operationResult, + JsonResultFormatter formatter) + { + using var writer = new ChunkedArrayWriter(JsonMemoryKind.Json); + formatter.Format(operationResult, writer); + var (chunks, usedChunks, lastLength) = writer.DrainChunks(); + return SourceResultDocument.Parse(chunks, lastLength, usedChunks, pooledMemory: true); } private sealed class Response : SourceSchemaClientResponse @@ -158,15 +321,18 @@ private sealed class Response : SourceSchemaClientResponse private readonly IExecutionResult _result; private readonly SourceSchemaClientRequest _request; private readonly JsonResultFormatter _formatter; + private readonly ChunkedArrayWriter? _buffer; public Response( IExecutionResult result, SourceSchemaClientRequest request, - JsonResultFormatter formatter) + JsonResultFormatter formatter, + ChunkedArrayWriter? buffer) { _result = result; _request = request; _formatter = formatter; + _buffer = buffer; } public override Uri Uri => s_uri; @@ -178,29 +344,20 @@ public Response( public override async IAsyncEnumerable ReadAsResultStreamAsync( [EnumeratorCancellation] CancellationToken cancellationToken = default) { - switch (_result) + var variables = _request.Variables; + + if (_request.OperationType == OperationType.Subscription) { - case OperationResult operationResult: + if (_result is OperationResult errorResult) { - var path = _request.Variables.Length == 1 - ? _request.Variables[0].Path - : CompactPath.Root; - - var additionalPaths = _request.Variables.Length == 1 - ? _request.Variables[0].AdditionalPaths - : default; - - var document = SerializeToDocument(operationResult, _formatter); - - yield return additionalPaths.IsDefaultOrEmpty - ? new SourceSchemaResult(path, document) - : new SourceSchemaResult(path, document, additionalPaths: additionalPaths); - break; + var document = SerializeToDocument(errorResult, _formatter); + yield return new SourceSchemaResult(CompactPath.Root, document); } - - case IResponseStream responseStream: + else { - await foreach (var operationResult in responseStream + var stream = _result.ExpectResponseStream(); + + await foreach (var operationResult in stream .ReadResultsAsync() .WithCancellation(cancellationToken) .ConfigureAwait(false)) @@ -208,18 +365,82 @@ public override async IAsyncEnumerable ReadAsResultStreamAsy var document = SerializeToDocument(operationResult, _formatter); yield return new SourceSchemaResult(CompactPath.Root, document); } + } + + yield break; + } + + switch (variables.Length) + { + case 0: + { + var document = SerializeToDocument(_result.ExpectOperationResult(), _formatter); + yield return new SourceSchemaResult(CompactPath.Root, document); + break; + } + + case 1: + { + var document = SerializeToDocument(_result.ExpectOperationResult(), _formatter); + var variable = variables[0]; + + yield return variable.AdditionalPaths.IsDefaultOrEmpty + ? new SourceSchemaResult(variable.Path, document) + : new SourceSchemaResult(variable.Path, document, additionalPaths: variable.AdditionalPaths); break; } default: - throw new InvalidOperationException( - $"Unexpected execution result type: {_result.GetType().Name}."); + { + if (_result is OperationResult singleResult) + { + // Single result for all variable sets (e.g. validation error). + var document = SerializeToDocument(singleResult, _formatter); + var errorResult = new SourceSchemaResult(variables[0].Path, document); + + for (var i = 0; i < variables.Length; i++) + { + var variable = variables[i]; + yield return errorResult.WithPath(variable.Path, variable.AdditionalPaths); + } + } + else + { + // Variable batching — one result per variable set. + var resultBatch = (OperationResultBatch)_result; + + for (var i = 0; i < resultBatch.Results.Count; i++) + { + if (resultBatch.Results[i] is not OperationResult operationResult) + { + continue; + } + + if (operationResult.VariableIndex is not { } index + || (uint)index >= (uint)variables.Length) + { + throw new InvalidOperationException( + "The operation result is missing a valid variable index."); + } + + var variable = variables[index]; + var document = SerializeToDocument(operationResult, _formatter); + + yield return variable.AdditionalPaths.IsDefaultOrEmpty + ? new SourceSchemaResult(variable.Path, document) + : new SourceSchemaResult(variable.Path, document, additionalPaths: variable.AdditionalPaths); + } + } + + break; + } } } public override void Dispose() { _result.DisposeAsync().AsTask().GetAwaiter().GetResult(); + _buffer?.Dispose(); } } } diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Configuration/FusionGatewaySetup.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Configuration/FusionGatewaySetup.cs index 816393d1859..76ef525618b 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Configuration/FusionGatewaySetup.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Configuration/FusionGatewaySetup.cs @@ -8,7 +8,7 @@ namespace HotChocolate.Fusion.Configuration; -internal sealed class FusionGatewaySetup +public sealed class FusionGatewaySetup { public Func? DocumentProvider { get; set; } diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Configuration/FusionSetupUtilities.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Configuration/FusionSetupUtilities.cs index 9e25767ebee..cc5ec48c2a3 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Configuration/FusionSetupUtilities.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Configuration/FusionSetupUtilities.cs @@ -13,7 +13,7 @@ public static class FusionSetupUtilities /// public static Version Version { get; } = new(2, 0, 0, 0); - internal static IFusionGatewayBuilder Configure( + public static IFusionGatewayBuilder Configure( IFusionGatewayBuilder builder, Action configure) { diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/HotChocolateFusionServiceCollectionExtensions.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/HotChocolateFusionServiceCollectionExtensions.cs index 5fbb1f02bd5..8f9dc2e653d 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/HotChocolateFusionServiceCollectionExtensions.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/HotChocolateFusionServiceCollectionExtensions.cs @@ -30,6 +30,22 @@ public static IFusionGatewayBuilder AddGraphQLGateway( return CreateBuilder(services, name); } + /// + /// Builds a from the and + /// resolves the Fusion gateway executor. + /// + /// The service collection containing the gateway configuration. + /// A token to cancel the operation. + /// The gateway request executor. + public static async ValueTask BuildGatewayAsync( + this IServiceCollection services, + CancellationToken cancellationToken = default) + => await services + .BuildServiceProvider() + .GetRequiredService() + .GetExecutorAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + private static void AddCore( IServiceCollection services) { diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/HotChocolate.Fusion.Execution.csproj b/src/HotChocolate/Fusion/src/Fusion.Execution/HotChocolate.Fusion.Execution.csproj index 2ea0b7f7cee..3a07027e428 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/HotChocolate.Fusion.Execution.csproj +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/HotChocolate.Fusion.Execution.csproj @@ -8,6 +8,7 @@ + diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/HotChocolate.Fusion.Connectors.InMemory.Tests.csproj b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/HotChocolate.Fusion.Connectors.InMemory.Tests.csproj new file mode 100644 index 00000000000..224cd984cb7 --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/HotChocolate.Fusion.Connectors.InMemory.Tests.csproj @@ -0,0 +1,21 @@ + + + + + HotChocolate.Fusion.Connectors.InMemory.Tests + HotChocolate.Fusion + + + + + + + + + + + + + + + diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/InMemoryConnectorTests.cs b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/InMemoryConnectorTests.cs new file mode 100644 index 00000000000..55733512b9e --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/InMemoryConnectorTests.cs @@ -0,0 +1,218 @@ +using HotChocolate.Execution; +using HotChocolate.Types.Composite; +using Microsoft.Extensions.DependencyInjection; + +namespace HotChocolate.Fusion; + +public sealed class InMemoryConnectorTests +{ + [Fact] + public async Task Execute_Should_ReturnData_When_SingleInMemorySchema() + { + // arrange + var services = new ServiceCollection(); + + services.AddGraphQL("products") + .AddQueryType() + .AddSourceSchemaDefaults(); + + services.AddGraphQLGateway() + .AddInMemorySchema("products"); + + var executor = await services.BuildGatewayAsync(); + + // act + var result = await executor.ExecuteAsync( + """ + { + productById(id: 1) { + id + name + } + } + """); + + // assert + result.MatchMarkdownSnapshot(); + } + + [Fact] + public async Task Execute_Should_ReturnComposedData_When_TwoInMemorySchemas() + { + // arrange + var services = new ServiceCollection(); + + services.AddGraphQL("products") + .AddQueryType() + .AddSourceSchemaDefaults(); + + services.AddGraphQL("reviews") + .AddQueryType() + .AddSourceSchemaDefaults(); + + services.AddGraphQLGateway() + .AddInMemorySchema("products") + .AddInMemorySchema("reviews"); + + var executor = await services.BuildGatewayAsync(); + + // act + var result = await executor.ExecuteAsync( + """ + { + productById(id: 1) { + id + name + } + } + """); + + // assert + result.MatchMarkdownSnapshot(); + } + + [Fact] + public async Task Execute_Should_ReturnProductAndReviews_When_CrossSchemaLookup() + { + // arrange + var services = new ServiceCollection(); + + services.AddGraphQL("products") + .AddQueryType() + .AddSourceSchemaDefaults(); + + services.AddGraphQL("reviews") + .AddQueryType() + .AddSourceSchemaDefaults(); + + services.AddGraphQLGateway() + .AddInMemorySchema("products") + .AddInMemorySchema("reviews"); + + var executor = await services.BuildGatewayAsync(); + + // act + var result = await executor.ExecuteAsync( + """ + { + products { + id + name + reviews { + body + stars + } + } + } + """); + + // assert + result.MatchMarkdownSnapshot(); + } + + private static class SingleSchema + { + public class Query + { + [Lookup] + public Product? GetProductById(int id) + => id is >= 1 and <= 3 + ? new Product(id, $"Product {id}") + : null; + } + + [EntityKey("id")] + public record Product(int Id, string Name); + } + + private static class TwoSchemas + { + public static class ProductsSchema + { + public class Query + { + [Lookup] + public Product? GetProductById(int id) + => id is >= 1 and <= 3 + ? new Product(id, $"Product {id}") + : null; + } + + [EntityKey("id")] + public record Product(int Id, string Name); + } + + public static class ReviewsSchema + { + public class Query + { + [Lookup] + [Internal] + public Product? GetProductById(int id) + => id is >= 1 and <= 3 + ? new Product(id) + : null; + } + + [EntityKey("id")] + [GraphQLName("Product")] + public record Product(int Id); + } + } + + private static class CrossSchema + { + public static class ProductsSchema + { + public class Query + { + public IEnumerable GetProducts() + => + [ + new Product(1, "Product A"), + new Product(2, "Product B") + ]; + + [Lookup] + [Internal] + public Product? GetProductById(int id) + => id switch + { + 1 => new Product(1, "Product A"), + 2 => new Product(2, "Product B"), + _ => null + }; + } + + [EntityKey("id")] + public record Product(int Id, string Name); + } + + public static class ReviewsSchema + { + public class Query + { + [Lookup] + [Internal] + public Product? GetProductById(int id) + => id is >= 1 and <= 2 + ? new Product(id) + : null; + } + + [EntityKey("id")] + [GraphQLName("Product")] + public record Product(int Id) + { + public IEnumerable GetReviews() + => + [ + new Review(1, "Great product!", 5), + new Review(2, "Not bad", 3) + ]; + } + + public record Review(int Id, string Body, int Stars); + } + } +} diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnComposedData_When_TwoInMemorySchemas.md b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnComposedData_When_TwoInMemorySchemas.md new file mode 100644 index 00000000000..e7fda7982e2 --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnComposedData_When_TwoInMemorySchemas.md @@ -0,0 +1,12 @@ +# Execute_Should_ReturnComposedData_When_TwoInMemorySchemas + +```json +{ + "data": { + "productById": { + "id": 1, + "name": "Product 1" + } + } +} +``` diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnData_When_SingleInMemorySchema.md b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnData_When_SingleInMemorySchema.md new file mode 100644 index 00000000000..1163ed9035c --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnData_When_SingleInMemorySchema.md @@ -0,0 +1,12 @@ +# Execute_Should_ReturnData_When_SingleInMemorySchema + +```json +{ + "data": { + "productById": { + "id": 1, + "name": "Product 1" + } + } +} +``` diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnProductAndReviews_When_CrossSchemaLookup.md b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnProductAndReviews_When_CrossSchemaLookup.md new file mode 100644 index 00000000000..22f0bbd0090 --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/InMemoryConnectorTests.Execute_Should_ReturnProductAndReviews_When_CrossSchemaLookup.md @@ -0,0 +1,38 @@ +# Execute_Should_ReturnProductAndReviews_When_CrossSchemaLookup + +```json +{ + "data": { + "products": [ + { + "id": 1, + "name": "Product A", + "reviews": [ + { + "body": "Great product!", + "stars": 5 + }, + { + "body": "Not bad", + "stars": 3 + } + ] + }, + { + "id": 2, + "name": "Product B", + "reviews": [ + { + "body": "Great product!", + "stars": 5 + }, + { + "body": "Not bad", + "stars": 3 + } + ] + } + ] + } +} +``` From f4eca9204e9aa56cd58d8e81602f2bf3fb04a42f Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Sat, 28 Mar 2026 01:53:49 +0000 Subject: [PATCH 4/7] test edits --- .../FileUploadTests.cs | 338 ++++++++++++++++++ ...te.Fusion.Connectors.InMemory.Tests.csproj | 1 + 2 files changed, 339 insertions(+) create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/FileUploadTests.cs diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/FileUploadTests.cs b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/FileUploadTests.cs new file mode 100644 index 00000000000..5ffbcb4003a --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/FileUploadTests.cs @@ -0,0 +1,338 @@ +using System.Text; +using HotChocolate.Transport; +using HotChocolate.Transport.Http; +using HotChocolate.Types; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.TestHost; +using Microsoft.Extensions.DependencyInjection; + +namespace HotChocolate.Fusion; + +public sealed class FileUploadTests : IDisposable +{ + private TestServer? _server; + + [Fact] + public async Task Upload_Single_File() + { + // arrange + _server = await CreateServerAsync(); + using var client = GraphQLHttpClient.Create(_server.CreateClient()); + + var stream = new MemoryStream("abc"u8.ToArray()); + + var operation = new OperationRequest( + """ + query ($file: Upload!) { + singleUpload(file: $file) { + fileName + contentType + content + } + } + """, + variables: new Dictionary + { + ["file"] = new FileReference(() => stream, "test.txt", "text/plain") + }); + + var request = new GraphQLHttpRequest(operation, new Uri("http://localhost/graphql")) + { + Method = GraphQLHttpMethod.Post, + EnableFileUploads = true + }; + + // act + using var result = await client.SendAsync(request); + + // assert + var body = await result.ReadAsResultAsync(); + body.MatchMarkdownSnapshot(); + } + + [Fact] + public async Task Upload_Single_File_In_Input_Object() + { + // arrange + _server = await CreateServerAsync(); + using var client = GraphQLHttpClient.Create(_server.CreateClient()); + + var stream = new MemoryStream("abc"u8.ToArray()); + + var operation = new OperationRequest( + """ + query ($input: FileInput!) { + singleUploadWithInput(input: $input) { + fileName + contentType + content + } + } + """, + variables: new Dictionary + { + ["input"] = new Dictionary + { + ["file"] = new FileReference(() => stream, "test.txt", "text/plain") + } + }); + + var request = new GraphQLHttpRequest(operation, new Uri("http://localhost/graphql")) + { + Method = GraphQLHttpMethod.Post, + EnableFileUploads = true + }; + + // act + using var result = await client.SendAsync(request); + + // assert + var body = await result.ReadAsResultAsync(); + body.MatchMarkdownSnapshot(); + } + + [Fact] + public async Task Upload_Single_File_In_Input_Object_Inline() + { + // arrange + _server = await CreateServerAsync(); + using var client = GraphQLHttpClient.Create(_server.CreateClient()); + + var stream = new MemoryStream("abc"u8.ToArray()); + + var operation = new OperationRequest( + """ + query ($file: Upload!) { + singleUploadWithInput(input: { file: $file }) { + fileName + contentType + content + } + } + """, + variables: new Dictionary + { + ["file"] = new FileReference(() => stream, "test.txt", "text/plain") + }); + + var request = new GraphQLHttpRequest(operation, new Uri("http://localhost/graphql")) + { + Method = GraphQLHttpMethod.Post, + EnableFileUploads = true + }; + + // act + using var result = await client.SendAsync(request); + + // assert + var body = await result.ReadAsResultAsync(); + body.MatchMarkdownSnapshot(); + } + + [Fact] + public async Task Upload_List_Of_Files() + { + // arrange + _server = await CreateServerAsync(); + using var client = GraphQLHttpClient.Create(_server.CreateClient()); + + var stream1 = new MemoryStream("abc"u8.ToArray()); + var stream2 = new MemoryStream("def"u8.ToArray()); + + var operation = new OperationRequest( + """ + query ($files: [Upload!]!) { + multiUpload(files: $files) { + fileName + contentType + content + } + } + """, + variables: new Dictionary + { + ["files"] = new List + { + new FileReference(() => stream1, "test.txt", "text/plain"), + new FileReference(() => stream2, "test2.pdf", "application/pdf") + } + }); + + var request = new GraphQLHttpRequest(operation, new Uri("http://localhost/graphql")) + { + Method = GraphQLHttpMethod.Post, + EnableFileUploads = true + }; + + // act + using var result = await client.SendAsync(request); + + // assert + var body = await result.ReadAsResultAsync(); + body.MatchMarkdownSnapshot(); + } + + [Fact] + public async Task Upload_List_Of_Files_In_Input_Object() + { + // arrange + _server = await CreateServerAsync(); + using var client = GraphQLHttpClient.Create(_server.CreateClient()); + + var stream1 = new MemoryStream("abc"u8.ToArray()); + var stream2 = new MemoryStream("abc"u8.ToArray()); + + var operation = new OperationRequest( + """ + query ($input: FilesInput!) { + multiUploadWithInput(input: $input) { + fileName + contentType + content + } + } + """, + variables: new Dictionary + { + ["input"] = new Dictionary + { + ["files"] = new List + { + new FileReference(() => stream1, "test.txt", "text/plain"), + new FileReference(() => stream2, "test2.pdf", "application/pdf") + } + } + }); + + var request = new GraphQLHttpRequest(operation, new Uri("http://localhost/graphql")) + { + Method = GraphQLHttpMethod.Post, + EnableFileUploads = true + }; + + // act + using var result = await client.SendAsync(request); + + // assert + var body = await result.ReadAsResultAsync(); + body.MatchMarkdownSnapshot(); + } + + [Fact] + public async Task Upload_List_Of_Files_In_Input_Object_Inline() + { + // arrange + _server = await CreateServerAsync(); + using var client = GraphQLHttpClient.Create(_server.CreateClient()); + + var stream1 = new MemoryStream("abc"u8.ToArray()); + var stream2 = new MemoryStream("def"u8.ToArray()); + + var operation = new OperationRequest( + """ + query ($files: [Upload!]!) { + multiUploadWithInput(input: { files: $files }) { + fileName + contentType + content + } + } + """, + variables: new Dictionary + { + ["files"] = new List + { + new FileReference(() => stream1, "test.txt", "text/plain"), + new FileReference(() => stream2, "test2.pdf", "application/pdf") + } + }); + + var request = new GraphQLHttpRequest(operation, new Uri("http://localhost/graphql")) + { + Method = GraphQLHttpMethod.Post, + EnableFileUploads = true + }; + + // act + using var result = await client.SendAsync(request); + + // assert + var body = await result.ReadAsResultAsync(); + body.MatchMarkdownSnapshot(); + } + + private static async Task CreateServerAsync() + { + var builder = WebApplication.CreateBuilder(); + + builder.Services.AddGraphQL("uploads") + .AddQueryType() + .AddUploadType() + .AddSourceSchemaDefaults(); + + builder.Services.AddGraphQLGatewayServer() + .AddInMemorySchema("uploads"); + + builder.WebHost.UseTestServer(); + + var app = builder.Build(); + app.MapGraphQL(); + + await app.StartAsync(); + + return app.GetTestServer(); + } + + public void Dispose() + { + _server?.Dispose(); + } + + private static class UploadSchema + { + public class Query + { + public async Task SingleUpload(IFile file) + => await ReadFileAsync(file); + + public async Task SingleUploadWithInput(FileInput input) + => await ReadFileAsync(input.File); + + public async IAsyncEnumerable MultiUpload(IFile[] files) + { + foreach (var file in files) + { + yield return await ReadFileAsync(file); + } + } + + public async IAsyncEnumerable MultiUploadWithInput(FilesInput input) + { + foreach (var file in input.Files) + { + yield return await ReadFileAsync(file); + } + } + + private static async Task ReadFileAsync(IFile file) + { + await using var stream = file.OpenReadStream(); + using var sr = new StreamReader(stream, Encoding.UTF8); + var content = await sr.ReadToEndAsync(); + return new FileUploadResult(file.Name, file.ContentType, content); + } + + public class FileInput + { + public required IFile File { get; init; } + } + + public class FilesInput + { + public required IFile[] Files { get; init; } + } + } + + public record FileUploadResult(string FileName, string? ContentType, string Content); + } +} diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/HotChocolate.Fusion.Connectors.InMemory.Tests.csproj b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/HotChocolate.Fusion.Connectors.InMemory.Tests.csproj index 224cd984cb7..e2afbe07e72 100644 --- a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/HotChocolate.Fusion.Connectors.InMemory.Tests.csproj +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/HotChocolate.Fusion.Connectors.InMemory.Tests.csproj @@ -7,6 +7,7 @@ + From 6eccd1dd434b3b05322f6da6f8f679c5a38842d6 Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Sat, 28 Mar 2026 04:45:41 +0000 Subject: [PATCH 5/7] polish --- .../Execution/Clients/SourceSchemaClientFactory.cs | 11 +++++++++-- .../src/Fusion.Execution/Execution/ThrowHelper.cs | 3 +++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/SourceSchemaClientFactory.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/SourceSchemaClientFactory.cs index 7552d217ea9..7e21029d381 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/SourceSchemaClientFactory.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Clients/SourceSchemaClientFactory.cs @@ -17,8 +17,15 @@ public bool CanHandle(ISourceSchemaClientConfiguration configuration) => configuration is TConfiguration; /// - ISourceSchemaClient ISourceSchemaClientFactory.CreateClient(ISourceSchemaClientConfiguration configuration) - => CreateClient((TConfiguration)configuration); + public ISourceSchemaClient CreateClient(ISourceSchemaClientConfiguration configuration) + { + if (configuration is not TConfiguration casted) + { + throw ThrowHelper.InvalidClientConfiguration(typeof(TConfiguration), configuration.GetType()); + } + + return CreateClient(casted); + } /// /// Creates a new for the given typed configuration. diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/ThrowHelper.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/ThrowHelper.cs index 71f4cd3a421..a3663cb9e0d 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/ThrowHelper.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/ThrowHelper.cs @@ -31,4 +31,7 @@ public static InvalidOperationException VariableIndexOutOfRange(int variableInde => new(string.Format( FusionExecutionResources.SourceSchemaHttpClient_VariableIndexOutOfRange, variableIndex)); + + public static ArgumentException InvalidClientConfiguration(Type expected, Type actual) + => new($"Expected client configuration of type '{expected.Name}' but received '{actual.Name}'."); } From e144886592437b3c373272ed8451b422029afa39 Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Sat, 28 Mar 2026 16:14:23 +0000 Subject: [PATCH 6/7] Implement file upload --- .../Execution/VariableBatchRequest.cs | 61 ++++++++ .../InMemorySourceSchemaClient.cs | 147 ++++++++++++++---- .../FileUploadTests.cs | 35 +++-- .../FileUploadTests.Upload_List_Of_Files.md | 20 +++ ...ts.Upload_List_Of_Files_In_Input_Object.md | 20 +++ ...ad_List_Of_Files_In_Input_Object_Inline.md | 20 +++ .../FileUploadTests.Upload_Single_File.md | 13 ++ ...ests.Upload_Single_File_In_Input_Object.md | 13 ++ ...load_Single_File_In_Input_Object_Inline.md | 13 ++ 9 files changed, 294 insertions(+), 48 deletions(-) create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files.md create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files_In_Input_Object.md create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files_In_Input_Object_Inline.md create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File.md create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File_In_Input_Object.md create mode 100644 src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File_In_Input_Object_Inline.md diff --git a/src/HotChocolate/Core/src/Execution.Abstractions/Execution/VariableBatchRequest.cs b/src/HotChocolate/Core/src/Execution.Abstractions/Execution/VariableBatchRequest.cs index ecf0d976fd4..1ede760c8b8 100644 --- a/src/HotChocolate/Core/src/Execution.Abstractions/Execution/VariableBatchRequest.cs +++ b/src/HotChocolate/Core/src/Execution.Abstractions/Execution/VariableBatchRequest.cs @@ -205,4 +205,65 @@ public VariableBatchRequest WithFeatures(IFeatureCollection? features) features, Services, Flags); + + /// + /// Creates a new from a GraphQL source text + /// and a JSON array of variable sets. + /// + /// + /// The GraphQL operation source text. + /// + /// + /// A JSON document whose root element is an array of variable objects. + /// + /// + /// A GraphQL request document hash. + /// + /// + /// A name of an operation in the GraphQL request document that shall be executed. + /// + /// + /// The requested error handling mode. + /// + /// + /// The GraphQL request extension data. + /// + /// + /// The initial global request state. + /// + /// + /// The features that shall be used while executing the GraphQL request. + /// + /// + /// The services that shall be used while executing the GraphQL request. + /// + /// + /// The GraphQL request flags can be used to limit the execution engine capabilities. + /// + /// + /// Returns a new variable batch request. + /// + public static VariableBatchRequest FromSourceText( + string sourceText, + JsonDocument variableValues, + OperationDocumentHash? documentHash = null, + string? operationName = null, + ErrorHandlingMode? errorHandlingMode = null, + JsonDocument? extensions = null, + IReadOnlyDictionary? contextData = null, + IFeatureCollection? features = null, + IServiceProvider? services = null, + RequestFlags flags = RequestFlags.AllowAll) + => new( + new OperationDocumentSourceText(sourceText), + null, + documentHash, + operationName, + errorHandlingMode, + new JsonDocumentOwner(variableValues), + extensions is null ? null : new(extensions), + contextData, + features, + services, + flags); } diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs index 26f52b7ee11..e68c6e16fec 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemorySourceSchemaClient.cs @@ -1,13 +1,16 @@ +using System.Buffers; using System.Collections.Immutable; using System.Runtime.CompilerServices; using System.Text.Encodings.Web; using System.Text.Json; using HotChocolate.Buffers; using HotChocolate.Execution; +using HotChocolate.Features; using HotChocolate.Language; using HotChocolate.Fusion.Text.Json; using HotChocolate.Text.Json; using HotChocolate.Transport.Formatters; +using HotChocolate.Types; namespace HotChocolate.Fusion.Execution.Clients; @@ -18,6 +21,13 @@ namespace HotChocolate.Fusion.Execution.Clients; public sealed class InMemorySourceSchemaClient : ISourceSchemaClient { private static readonly Uri s_uri = new("inmemory://localhost"); + private static readonly byte[] s_fileMarkerPrefix = "$.file("u8.ToArray(); + + private static readonly JsonWriterOptions s_jsonWriterOptions = new() + { + Indented = true, + Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping + }; private readonly RequestExecutorProxy _executor; private readonly JsonResultFormatter _formatter; @@ -58,7 +68,7 @@ public async ValueTask ExecuteAsync( ObjectDisposedException.ThrowIf(_disposed, this); ChunkedArrayWriter? buffer = null; - var operationRequest = BuildOperationRequest(request, ref buffer); + var operationRequest = BuildOperationRequest(context, request, ref buffer); try { @@ -93,7 +103,7 @@ public async IAsyncEnumerable ExecuteBatchStreamAsync( { for (var i = 0; i < requests.Length; i++) { - operationRequests[i] = BuildOperationRequest(requests[i], ref buffer); + operationRequests[i] = BuildOperationRequest(context, requests[i], ref buffer); } } catch @@ -209,35 +219,46 @@ public ValueTask DisposeAsync() } private static IOperationRequest BuildOperationRequest( + OperationPlanContext context, SourceSchemaClientRequest request, ref ChunkedArrayWriter? buffer) { + IFeatureCollection? features = null; + + if (request.RequiresFileUpload) + { + features = new FeatureCollection(); + features.Set(context.RequestContext.Features.GetRequired()); + } + if (request.Variables.Length == 0) { - return OperationRequest.FromSourceText(request.OperationSourceText); + return OperationRequest.FromSourceText( + request.OperationSourceText, + features: features); } if (request.Variables.Length == 1) { - if (request.Variables[0].IsEmpty) + var sequence = request.Variables[0].Values.AsSequence(); + + if (!request.RequiresFileUpload) { - return OperationRequest.FromSourceText(request.OperationSourceText); + return OperationRequest.FromSourceText( + request.OperationSourceText, + variableValues: JsonDocument.Parse(sequence)); } - var sequence = request.Variables[0].Values.AsSequence(); + buffer ??= new ChunkedArrayWriter(); + var cleanedJson = StripFileMarkers(buffer, sequence); return OperationRequest.FromSourceText( request.OperationSourceText, - variableValues: JsonDocument.Parse(sequence)); + variableValues: JsonDocument.Parse(cleanedJson.AsSequence()), + features: features); } buffer ??= new ChunkedArrayWriter(); - var writer = new JsonWriter( - buffer, - new JsonWriterOptions - { - Indented = true, - Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping - }); + var writer = new JsonWriter(buffer, s_jsonWriterOptions); writer.WriteStartArray(); @@ -250,35 +271,97 @@ private static IOperationRequest BuildOperationRequest( var variables = JsonSegment.Create(buffer, 0, buffer.Length); - return OperationRequestBuilder.New() - .SetDocument(request.OperationSourceText) - .SetVariableValues(JsonDocument.Parse(variables.AsSequence())) - .Build(); + var variableSequence = request.RequiresFileUpload + ? StripFileMarkers(buffer, variables.AsSequence()).AsSequence() + : variables.AsSequence(); + + return VariableBatchRequest.FromSourceText( + request.OperationSourceText, + variableValues: JsonDocument.Parse(variableSequence), + features: features); } - private static int ResolveRequestIndex( - ImmutableArray requests, - OperationResult result) + /// + /// Scans JSON for $.file(key) string markers and replaces them with just key. + /// + private static JsonSegment StripFileMarkers( + ChunkedArrayWriter buffer, + ReadOnlySequence json) { - if (requests.Length == 1) + var reader = new Utf8JsonReader(json, isFinalBlock: true, default); + var startPosition = buffer.Position; + var writer = new JsonWriter(buffer, s_jsonWriterOptions); + + while (reader.Read()) { - return 0; + switch (reader.TokenType) + { + case JsonTokenType.StartObject: + writer.WriteStartObject(); + break; + + case JsonTokenType.EndObject: + writer.WriteEndObject(); + break; + + case JsonTokenType.StartArray: + writer.WriteStartArray(); + break; + + case JsonTokenType.EndArray: + writer.WriteEndArray(); + break; + + case JsonTokenType.PropertyName: + writer.WritePropertyName(reader.ValueSpan); + break; + + case JsonTokenType.String: + var span = reader.ValueSpan; + if (span.Length > s_fileMarkerPrefix.Length + 1 + && span.StartsWith(s_fileMarkerPrefix) + && span[^1] == (byte)')') + { + // $.file(key) → key + span = span.Slice(s_fileMarkerPrefix.Length, span.Length - s_fileMarkerPrefix.Length - 1); + writer.WriteStringValue(span, skipEscaping: true); + } + else + { + writer.WriteStringValue(span, skipEscaping: true); + } + break; + + case JsonTokenType.Number: + writer.WriteNumberValue(reader.ValueSpan); + break; + + case JsonTokenType.True: + writer.WriteBooleanValue(true); + break; + + case JsonTokenType.False: + writer.WriteBooleanValue(false); + break; + + case JsonTokenType.Null: + writer.WriteNullValue(); + break; + } } - return result.RequestIndex ?? -1; + return JsonSegment.Create(buffer, startPosition, buffer.Position - startPosition); } + private static int ResolveRequestIndex( + ImmutableArray requests, + OperationResult result) + => requests.Length == 1 ? 0 : result.RequestIndex ?? -1; + private static int ResolveVariableIndex( SourceSchemaClientRequest request, OperationResult result) - { - if (request.Variables.Length <= 1) - { - return 0; - } - - return result.VariableIndex ?? -1; - } + => request.Variables.Length <= 1 ? 0 : result.VariableIndex ?? -1; private static bool TryGetResultPath( SourceSchemaClientRequest request, diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/FileUploadTests.cs b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/FileUploadTests.cs index 5ffbcb4003a..edfce2a66e6 100644 --- a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/FileUploadTests.cs +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/FileUploadTests.cs @@ -3,6 +3,7 @@ using HotChocolate.Transport.Http; using HotChocolate.Types; using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.TestHost; using Microsoft.Extensions.DependencyInjection; @@ -261,26 +262,28 @@ public async Task Upload_List_Of_Files_In_Input_Object_Inline() body.MatchMarkdownSnapshot(); } - private static async Task CreateServerAsync() + private static Task CreateServerAsync() { - var builder = WebApplication.CreateBuilder(); - - builder.Services.AddGraphQL("uploads") - .AddQueryType() - .AddUploadType() - .AddSourceSchemaDefaults(); - - builder.Services.AddGraphQLGatewayServer() - .AddInMemorySchema("uploads"); - - builder.WebHost.UseTestServer(); + var builder = new WebHostBuilder() + .ConfigureServices(services => + { + services.AddRouting(); - var app = builder.Build(); - app.MapGraphQL(); + services.AddGraphQLServer("uploads") + .AddQueryType() + .AddUploadType() + .AddSourceSchemaDefaults(); - await app.StartAsync(); + services.AddGraphQLGatewayServer() + .AddInMemorySchema("uploads"); + }) + .Configure(app => + { + app.UseRouting(); + app.UseEndpoints(endpoints => endpoints.MapGraphQL()); + }); - return app.GetTestServer(); + return Task.FromResult(new TestServer(builder)); } public void Dispose() diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files.md b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files.md new file mode 100644 index 00000000000..a51b95ef00b --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files.md @@ -0,0 +1,20 @@ +# Upload_List_Of_Files + +```text +{ + "data": { + "multiUpload": [ + { + "fileName": "test.txt", + "contentType": "text/plain", + "content": "abc" + }, + { + "fileName": "test2.pdf", + "contentType": "application/pdf", + "content": "def" + } + ] + } +} +``` diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files_In_Input_Object.md b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files_In_Input_Object.md new file mode 100644 index 00000000000..d4a6fb5b25f --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files_In_Input_Object.md @@ -0,0 +1,20 @@ +# Upload_List_Of_Files_In_Input_Object + +```text +{ + "data": { + "multiUploadWithInput": [ + { + "fileName": "test.txt", + "contentType": "text/plain", + "content": "abc" + }, + { + "fileName": "test2.pdf", + "contentType": "application/pdf", + "content": "abc" + } + ] + } +} +``` diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files_In_Input_Object_Inline.md b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files_In_Input_Object_Inline.md new file mode 100644 index 00000000000..2345391edac --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_List_Of_Files_In_Input_Object_Inline.md @@ -0,0 +1,20 @@ +# Upload_List_Of_Files_In_Input_Object_Inline + +```text +{ + "data": { + "multiUploadWithInput": [ + { + "fileName": "test.txt", + "contentType": "text/plain", + "content": "abc" + }, + { + "fileName": "test2.pdf", + "contentType": "application/pdf", + "content": "def" + } + ] + } +} +``` diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File.md b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File.md new file mode 100644 index 00000000000..5903ee92086 --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File.md @@ -0,0 +1,13 @@ +# Upload_Single_File + +```text +{ + "data": { + "singleUpload": { + "fileName": "test.txt", + "contentType": "text/plain", + "content": "abc" + } + } +} +``` diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File_In_Input_Object.md b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File_In_Input_Object.md new file mode 100644 index 00000000000..0b3fc653a60 --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File_In_Input_Object.md @@ -0,0 +1,13 @@ +# Upload_Single_File_In_Input_Object + +```text +{ + "data": { + "singleUploadWithInput": { + "fileName": "test.txt", + "contentType": "text/plain", + "content": "abc" + } + } +} +``` diff --git a/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File_In_Input_Object_Inline.md b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File_In_Input_Object_Inline.md new file mode 100644 index 00000000000..765dcb13c0a --- /dev/null +++ b/src/HotChocolate/Fusion/test/Fusion.Connectors.InMemory.Tests/__snapshots__/FileUploadTests.Upload_Single_File_In_Input_Object_Inline.md @@ -0,0 +1,13 @@ +# Upload_Single_File_In_Input_Object_Inline + +```text +{ + "data": { + "singleUploadWithInput": { + "fileName": "test.txt", + "contentType": "text/plain", + "content": "abc" + } + } +} +``` From 2d3d6c5d8580bd927173dff4f91a5e113312b249 Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Sat, 28 Mar 2026 21:54:09 +0000 Subject: [PATCH 7/7] Added better error handling --- .../SchemaCompositionException.cs | 47 ++++++++++++++++ .../InMemoryConfigurationProvider.cs | 53 ++++++++++--------- .../Execution/FusionRequestExecutorManager.cs | 10 ++-- 3 files changed, 82 insertions(+), 28 deletions(-) create mode 100644 src/HotChocolate/Fusion/src/Fusion.Composition/SchemaCompositionException.cs diff --git a/src/HotChocolate/Fusion/src/Fusion.Composition/SchemaCompositionException.cs b/src/HotChocolate/Fusion/src/Fusion.Composition/SchemaCompositionException.cs new file mode 100644 index 00000000000..31ca6f32a90 --- /dev/null +++ b/src/HotChocolate/Fusion/src/Fusion.Composition/SchemaCompositionException.cs @@ -0,0 +1,47 @@ +using HotChocolate.Fusion.Logging; + +namespace HotChocolate.Fusion.Composition; + +/// +/// The exception that is thrown when schema composition fails. +/// +public sealed class SchemaCompositionException : Exception +{ + /// + /// Initializes a new instance of . + /// + /// The composition log containing the errors. + public SchemaCompositionException(CompositionLog compositionLog) + : base(BuildMessage(compositionLog)) + { + CompositionLog = compositionLog; + } + + /// + /// Gets the composition log containing the detailed errors and warnings + /// encountered during schema composition. + /// + public CompositionLog CompositionLog { get; } + + private static string BuildMessage(CompositionLog log) + { + ArgumentNullException.ThrowIfNull(log); + + var messages = new List(); + + foreach (var entry in log) + { + if (entry.Severity == LogSeverity.Error) + { + messages.Add(entry.Message); + } + } + + return messages.Count switch + { + 0 => "Schema composition failed.", + 1 => $"Schema composition failed: {messages[0]}", + _ => $"Schema composition failed with {messages.Count} errors: {messages[0]}" + }; + } +} diff --git a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemoryConfigurationProvider.cs b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemoryConfigurationProvider.cs index a9ed20ee179..e4ecfbdc126 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemoryConfigurationProvider.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Connectors.InMemory/InMemoryConfigurationProvider.cs @@ -4,6 +4,7 @@ using System.Threading.Channels; using HotChocolate.Buffers; using HotChocolate.Execution; +using HotChocolate.Fusion.Composition; using HotChocolate.Fusion.Configuration; using HotChocolate.Fusion.Logging; using HotChocolate.Fusion.Options; @@ -137,23 +138,26 @@ private async Task ComposeLoopAsync(CancellationToken ct) sourceSchemas[i] = new SourceSchemaText(_schemaNames[i], sdl); } + var compositionLog = new CompositionLog(); var result = new SchemaComposer( sourceSchemas, new SchemaComposerOptions(), - new CompositionLog()).Compose(); + compositionLog).Compose(); if (result.IsFailure) { - continue; + NotifyError(new SchemaCompositionException(compositionLog)); + return; } var documentNode = result.Value.ToSyntaxNode(); var settings = new JsonDocumentOwner(defaultSettings, EmptyMemoryOwner.Instance); NotifyObservers(new FusionConfiguration(documentNode, settings)); } - catch + catch (Exception ex) { - // ignore and wait for next update + NotifyError(ex); + return; } } } @@ -179,6 +183,21 @@ private void NotifyObservers(FusionConfiguration configuration) } } + private void NotifyError(Exception exception) + { + ImmutableArray sessions; + + lock (_syncRoot) + { + sessions = _sessions; + } + + foreach (var session in sessions) + { + session.Error(exception); + } + } + /// public ValueTask DisposeAsync() { @@ -217,29 +236,13 @@ private sealed class ObserverSession( : IDisposable { public void Notify(FusionConfiguration schemaDocument) - { - try - { - observer.OnNext(schemaDocument); - } - catch (Exception ex) - { - observer.OnError(ex); - } - } + => observer.OnNext(schemaDocument); + + public void Error(Exception exception) + => observer.OnError(exception); public void Complete() - { - try - { - observer.OnCompleted(); - } - catch - { - // We do not want to throw an exception if the observer - // throws an exception on completion. - } - } + => observer.OnCompleted(); public void Dispose() => provider.Unsubscribe(this); diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/FusionRequestExecutorManager.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/FusionRequestExecutorManager.cs index e7218506abb..0236173ba24 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/FusionRequestExecutorManager.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/FusionRequestExecutorManager.cs @@ -228,7 +228,10 @@ private async Task WarmupExecutorAsync( var documentProvider = documentProviderFactory.Invoke(_applicationServices); var documentPromise = new TaskCompletionSource(); - using var subscription = documentProvider.Subscribe(s => documentPromise.TrySetResult(s)); + using var subscription = documentProvider.Subscribe( + onNext: s => documentPromise.TrySetResult(s), + onError: ex => documentPromise.TrySetException(ex), + onCompleted: () => documentPromise.TrySetCanceled()); await using var cancellation = cancellationToken.Register(() => documentPromise.TrySetCanceled()); return (await documentPromise.Task.ConfigureAwait(false), documentProvider); } @@ -721,8 +724,9 @@ public RequestExecutorRegistration( _settingsHash = XxHash64.HashToUInt64(GetRawUtf8Value(configuration.Settings.Document.RootElement)); _documentProviderSubscription = documentProvider.Subscribe( - OnDocumentChanged, - () => _channel.Writer.TryComplete()); + onNext: OnDocumentChanged, + onError: ex => _channel.Writer.TryComplete(ex), + onCompleted: () => _channel.Writer.TryComplete()); DocumentProvider = documentProvider; Executor = executor;