From f5b09054af1af87b830f966df4a225bce40b127e Mon Sep 17 00:00:00 2001 From: guohao1 Date: Tue, 15 Feb 2022 11:51:24 +0800 Subject: [PATCH 1/2] RATIS-1519. When DataStreamManagement#read an exception occurs, remove DataStream --- .../netty/server/DataStreamManagement.java | 44 ++++++++++--------- .../ratis/datastream/DataStreamTestUtils.java | 7 ++- .../TestNettyDataStreamWithMock.java | 9 +++- 3 files changed, 37 insertions(+), 23 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 37cf90e541..7a721dcea8 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -110,19 +110,19 @@ static class StreamInfo { private final boolean primary; private final LocalStream local; private final Set remotes; - private final RaftServer server; + private final Division division; @SuppressFBWarnings("NP_NULL_PARAM_DEREF") private final AtomicReference> previous = new AtomicReference<>(CompletableFuture.completedFuture(null)); - StreamInfo(RaftClientRequest request, boolean primary, CompletableFuture stream, RaftServer server, + StreamInfo(RaftClientRequest request, boolean primary, CompletableFuture stream, Division division, CheckedBiFunction, Set, IOException> getStreams) throws IOException { this.request = request; this.primary = primary; this.local = new LocalStream(stream); - this.server = server; - final Set successors = getSuccessors(server.getId()); + this.division = division; + final Set successors = getSuccessors(division.getId()); final Set outs = getStreams.apply(request, successors); this.remotes = outs.stream().map(RemoteStream::new).collect(Collectors.toSet()); } @@ -135,16 +135,12 @@ RaftClientRequest getRequest() { return request; } - Division getDivision() throws IOException { - return server.getDivision(request.getRaftGroupId()); + Division getDivision() { + return division; } Collection getCommitInfos() { - try { - return getDivision().getCommitInfos(); - } catch (IOException e) { - throw new IllegalStateException(e); - } + return getDivision().getCommitInfos(); } boolean isPrimary() { @@ -164,7 +160,7 @@ public String toString() { return JavaUtils.getClassSimpleName(getClass()) + ":" + request; } - private Set getSuccessors(RaftPeerId peerId) throws IOException { + private Set getSuccessors(RaftPeerId peerId) { final RaftConfiguration conf = getDivision().getRaftConf(); final RoutingTable routingTable = request.getRoutingTable(); @@ -176,7 +172,7 @@ private Set getSuccessors(RaftPeerId peerId) throws IOException { // Default start topology // get the other peers from the current configuration return conf.getCurrentPeers().stream() - .filter(p -> !p.getId().equals(server.getId())) + .filter(p -> !p.getId().equals(division.getId())) .collect(Collectors.toSet()); } @@ -246,7 +242,8 @@ private StreamInfo newStreamInfo(ByteBuf buf, final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest( RaftClientRequestProto.parseFrom(buf.nioBuffer())); final boolean isPrimary = server.getId().equals(request.getServerId()); - return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request), server, getStreams); + final Division division = server.getDivision(request.getRaftGroupId()); + return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request), division, getStreams); } catch (Throwable e) { throw new CompletionException(e); } @@ -380,10 +377,21 @@ void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, readImpl(request, ctx, buf, getStreams); } catch (Throwable t) { buf.release(); + removeDataStream(ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()), null); throw t; } } + private void removeDataStream(ClientInvocationId invocationId, StreamInfo info) { + final StreamInfo removed = streams.remove(invocationId); + if (info == null) { + info = removed; + } + if (info != null) { + info.getDivision().getDataStreamMap().remove(invocationId); + } + } + private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf, CheckedBiFunction, Set, IOException> getStreams) { boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE); @@ -393,7 +401,6 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct final MemoizedSupplier supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams)); info = streams.computeIfAbsent(key, id -> supplier.get()); if (!supplier.isInitialized()) { - streams.remove(key); throw new IllegalStateException("Failed to create a new stream for " + request + " since a stream already exists Key: " + key + " StreamInfo:" + info); } @@ -402,10 +409,7 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct () -> new IllegalStateException("Failed to remove StreamInfo for " + request)); } else { info = Optional.ofNullable(streams.get(key)).orElseThrow( - () -> { - streams.remove(key); - return new IllegalStateException("Failed to get StreamInfo for " + request); - }); + () -> new IllegalStateException("Failed to get StreamInfo for " + request)); } final CompletableFuture localWrite; @@ -439,7 +443,7 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct }, requestExecutor)).whenComplete((v, exception) -> { try { if (exception != null) { - streams.remove(key); + removeDataStream(key, info); replyDataStreamException(server, exception, info.getRequest(), request, ctx); } } finally { diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java index b00e389656..47b6157a28 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java @@ -148,6 +148,7 @@ class MultiDataStreamStateMachine extends BaseStateMachine { @Override public CompletableFuture stream(RaftClientRequest request) { final SingleDataStream s = new SingleDataStream(request); + LOG.info("XXX {} put {}, {}", this, ClientInvocationId.valueOf(request), s); streams.put(ClientInvocationId.valueOf(request), s); return CompletableFuture.completedFuture(s); } @@ -176,7 +177,9 @@ SingleDataStream getSingleDataStream(RaftClientRequest request) { } SingleDataStream getSingleDataStream(ClientInvocationId invocationId) { - return streams.get(invocationId); + final SingleDataStream s = streams.get(invocationId); + LOG.info("XXX {}: get {} return {}", this, invocationId, s); + return s; } Collection getStreams() { @@ -325,6 +328,8 @@ static CompletableFuture writeAndCloseAndAssertReplies( static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize, boolean stepDownLeader) throws Exception { + LOG.info("XXX {}: dataSize={}, stepDownLeader={}, header={}", + server.getId(), dataSize, stepDownLeader, header); // check header Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType()); diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java index 39dbf4bf30..daca84e26a 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java @@ -68,13 +68,18 @@ public void setup() { RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY); } - RaftServer.Division mockDivision(RaftServer server, RaftClient client) { + RaftServer.Division mockDivision(RaftServer server, RaftGroupId groupId, RaftClient client) { final RaftServer.Division division = mock(RaftServer.Division.class); when(division.getRaftServer()).thenReturn(server); when(division.getRaftClient()).thenReturn(client); when(division.getRaftConf()).thenAnswer(i -> getRaftConf()); final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine(); + try { + stateMachine.initialize(server, groupId, null); + } catch (IOException e) { + throw new IllegalStateException(e); + } when(division.getStateMachine()).thenReturn(stateMachine); final DataStreamMap streamMap = RaftServerTestUtil.newDataStreamMap(server.getId()); @@ -110,7 +115,7 @@ private void testMockCluster(int numServers, RaftException leaderException, AsyncRpcApi asyncRpcApi = Mockito.mock(AsyncRpcApi.class); when(client.async()).thenReturn(asyncRpcApi); - final RaftServer.Division myDivision = mockDivision(raftServer, client); + final RaftServer.Division myDivision = mockDivision(raftServer, groupId, client); when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision); if (submitException != null) { From fbc9b3aed27f658ad0da82422628b979372a3666 Mon Sep 17 00:00:00 2001 From: guohao1 Date: Tue, 15 Feb 2022 16:04:42 +0800 Subject: [PATCH 2/2] trigger new CI