From d06a587bdac95332e4ff6879774c0c3db7c0333a Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Tue, 16 Jan 2024 13:22:11 -0800 Subject: [PATCH] RATIS-1934. Support Zero-Copy in GrpcServerProtocolService --- .../server/GrpcServerProtocolService.java | 71 ++++++++++++++++--- .../apache/ratis/grpc/server/GrpcService.java | 4 +- 2 files changed, 65 insertions(+), 10 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index dde01c39a3..ebe764fac0 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -20,10 +20,13 @@ import java.util.function.Consumer; import java.util.function.Function; import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.grpc.metrics.ZeroCopyMetrics; +import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.util.ServerStringUtils; +import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition; import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; @@ -41,6 +44,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import static org.apache.ratis.grpc.GrpcUtil.addMethodWithCustomMarshaller; +import static org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.getAppendEntriesMethod; + class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class); @@ -48,8 +54,9 @@ static class PendingServerRequest { private final REQUEST request; private final CompletableFuture future = new CompletableFuture<>(); - PendingServerRequest(REQUEST request) { - this.request = request; + PendingServerRequest(ReferenceCountedObject requestRef) { + this.request = requestRef.retain(); + this.future.whenComplete((r, e) -> requestRef.release()); } REQUEST getRequest() { @@ -83,7 +90,21 @@ private String getPreviousRequestString() { .orElse(null); } - abstract CompletableFuture process(REQUEST request) throws IOException; + CompletableFuture process(REQUEST request) throws IOException { + throw new UnsupportedOperationException("This method is not supported."); + } + + CompletableFuture process(ReferenceCountedObject requestRef) + throws IOException { + try { + return process(requestRef.retain()); + } finally { + requestRef.release(); + } + } + + void release(REQUEST req) { + } abstract long getCallId(REQUEST request); @@ -120,22 +141,29 @@ void composeRequest(CompletableFuture current) { @Override public void onNext(REQUEST request) { + ReferenceCountedObject requestRef = ReferenceCountedObject.wrap(request, () -> {}, released -> { + if (released) { + release(request); + } + }); + if (!replyInOrder(request)) { try { - composeRequest(process(request).thenApply(this::handleReply)); + composeRequest(process(requestRef).thenApply(this::handleReply)); } catch (Exception e) { handleError(e, request); + release(request); } return; } - final PendingServerRequest current = new PendingServerRequest<>(request); + final PendingServerRequest current = new PendingServerRequest<>(requestRef); final PendingServerRequest previous = previousOnNext.getAndSet(current); final CompletableFuture previousFuture = Optional.ofNullable(previous) .map(PendingServerRequest::getFuture) .orElse(CompletableFuture.completedFuture(null)); try { - final CompletableFuture f = process(request).exceptionally(e -> { + final CompletableFuture f = process(requestRef).exceptionally(e -> { // Handle cases, such as RaftServer is paused handleError(e, request); current.getFuture().completeExceptionally(e); @@ -176,16 +204,35 @@ public void onError(Throwable t) { private final Supplier idSupplier; private final RaftServer server; + private final ZeroCopyMessageMarshaller zeroCopyRequestMarshaller; - GrpcServerProtocolService(Supplier idSupplier, RaftServer server) { + GrpcServerProtocolService(Supplier idSupplier, RaftServer server, ZeroCopyMetrics zeroCopyMetrics) { this.idSupplier = idSupplier; this.server = server; + this.zeroCopyRequestMarshaller = new ZeroCopyMessageMarshaller<>(AppendEntriesRequestProto.getDefaultInstance(), + zeroCopyMetrics::onZeroCopyMessage, zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage); } RaftPeerId getId() { return idSupplier.get(); } + ServerServiceDefinition bindServiceWithZeroCopy() { + ServerServiceDefinition orig = super.bindService(); + ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(orig.getServiceDescriptor().getName()); + + // Add appendEntries with zero copy marshaller. + addMethodWithCustomMarshaller(orig, builder, getAppendEntriesMethod(), zeroCopyRequestMarshaller); + // Add remaining methods as is. + orig.getMethods().stream().filter( + x -> !x.getMethodDescriptor().getFullMethodName().equals(getAppendEntriesMethod().getFullMethodName()) + ).forEach( + builder::addMethod + ); + + return builder.build(); + } + @Override public void requestVote(RequestVoteRequestProto request, StreamObserver responseObserver) { @@ -226,8 +273,14 @@ public StreamObserver appendEntries( return new ServerRequestStreamObserver( RaftServerProtocol.Op.APPEND_ENTRIES, responseObserver) { @Override - CompletableFuture process(AppendEntriesRequestProto request) throws IOException { - return server.appendEntriesAsync(ReferenceCountedObject.wrap(request)); + CompletableFuture process(ReferenceCountedObject requestRef) + throws IOException { + return server.appendEntriesAsync(requestRef); + } + + @Override + void release(AppendEntriesRequestProto req) { + zeroCopyRequestMarshaller.release(req); } @Override diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java index d89afd565e..d2dadcd90a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java @@ -214,8 +214,10 @@ private GrpcService(RaftServer raftServer, Supplier idSupplier, final NettyServerBuilder serverBuilder = startBuildingNettyServer(serverHost, serverPort, serverTlsConfig, grpcMessageSizeMax, flowControlWindow); + GrpcServerProtocolService serverProtocolService = new GrpcServerProtocolService(idSupplier, raftServer, + zeroCopyMetrics); serverBuilder.addService(ServerInterceptors.intercept( - new GrpcServerProtocolService(idSupplier, raftServer), serverInterceptor)); + serverProtocolService.bindServiceWithZeroCopy(), serverInterceptor)); if (!separateAdminServer) { addAdminService(raftServer, serverBuilder); }