From b3f548a7ce91ed0a6769c6512f9d869d8c4be208 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 09:33:17 +0800 Subject: [PATCH 1/2] fix --- .../assigner/PipeDataRegionAssigner.java | 2 +- .../disruptor/BatchEventProcessor.java | 53 +++-- .../realtime/disruptor/Disruptor.java | 4 + .../realtime/disruptor/SequenceBarrier.java | 4 + .../PipeInsertionDataNodeListener.java | 39 ++-- .../disruptor/DisruptorShutdownTest.java | 183 ++++++++++++++++++ 6 files changed, 257 insertions(+), 28 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index bdeebde8938e4..f40de994c315f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -256,7 +256,7 @@ public void invalidateCache() { matcher.invalidateCache(); } - public boolean notMoreExtractorNeededToBeAssigned() { + public boolean notMoreSourceNeededToBeAssigned() { return matcher.getRegisterCount() == 0; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java index 34930be977e16..d0432821cf77c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java @@ -69,7 +69,6 @@ public void halt() { @Override public void run() { - T event = null; long nextSequence = sequence.get() + 1L; while (running) { @@ -78,29 +77,59 @@ public void run() { final long availableSequence = sequenceBarrier.waitFor(nextSequence); // Batch process all available events - while (nextSequence <= availableSequence) { - event = ringBuffer.get(nextSequence); - eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); - nextSequence++; - } - - // Update sequence - sequence.set(availableSequence); + nextSequence = processAvailableEvents(nextSequence, availableSequence); } catch (final InterruptedException ex) { - Thread.currentThread().interrupt(); - LOGGER.info("Processor interrupted"); + if (running) { + Thread.currentThread().interrupt(); + LOGGER.info("Processor interrupted"); + } break; } catch (final Throwable ex) { - exceptionHandler.handleEventException(ex, nextSequence, event); + exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence)); sequence.set(nextSequence); nextSequence++; } } + if (!running) { + drainRemainingPublishedEvents(nextSequence); + } LOGGER.info("Processor stopped"); } + private long processAvailableEvents(long nextSequence, long availableSequence) throws Throwable { + while (nextSequence <= availableSequence) { + final T event = ringBuffer.get(nextSequence); + eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); + nextSequence++; + } + + sequence.set(availableSequence); + return nextSequence; + } + + private void drainRemainingPublishedEvents(long nextSequence) { + final long availableSequence = sequenceBarrier.getCursor(); + if (availableSequence < nextSequence) { + return; + } + + final long highestPublishedSequence = + sequenceBarrier.getHighestPublishedSequence(nextSequence, availableSequence); + while (nextSequence <= highestPublishedSequence) { + final T event = ringBuffer.get(nextSequence); + try { + eventHandler.onEvent(event, nextSequence, nextSequence == highestPublishedSequence); + } catch (final Throwable ex) { + exceptionHandler.handleEventException(ex, nextSequence, event); + } finally { + sequence.set(nextSequence); + } + nextSequence++; + } + } + private static class DefaultExceptionHandler implements ExceptionHandler { @Override public void handleEventException(Throwable ex, long sequence, T event) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java index 57c6e853f6100..f3a647012856e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java @@ -122,11 +122,15 @@ public void shutdown() { if (processorThread != null) { try { + processorThread.interrupt(); processorThread.join(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.warn("Interrupted waiting for processor to stop"); } + if (processorThread.isAlive()) { + LOGGER.warn("Timed out waiting for processor to stop"); + } } started = false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java index 4c8011eb1c225..80f41162fc73a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java @@ -75,4 +75,8 @@ public long waitFor(long sequence) throws InterruptedException { public long getCursor() { return sequencer.getCursor().get(); } + + public long getHighestPublishedSequence(long lowerBound, long availableSequence) { + return sequencer.getHighestPublishedSequence(lowerBound, availableSequence); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 3cce521a51e0b..72d76bf5b2f04 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -72,25 +72,34 @@ public synchronized void startListenAndAssign( public synchronized void stopListenAndAssign( final int dataRegionId, final PipeRealtimeDataRegionSource extractor) { - final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); - if (assigner == null) { - return; - } + PipeDataRegionAssigner assignerToClose = null; - assigner.stopAssignTo(extractor); + synchronized (this) { + final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); + if (assigner == null) { + return; + } - if (extractor.isNeedListenToTsFile()) { - listenToTsFileExtractorCount.decrementAndGet(); - } - if (extractor.isNeedListenToInsertNode()) { - listenToInsertNodeExtractorCount.decrementAndGet(); + assigner.stopAssignTo(extractor); + + if (extractor.isNeedListenToTsFile()) { + listenToTsFileExtractorCount.decrementAndGet(); + } + if (extractor.isNeedListenToInsertNode()) { + listenToInsertNodeExtractorCount.decrementAndGet(); + } + + if (assigner.notMoreSourceNeededToBeAssigned()) { + // The removed assigner will is the same as the one referenced by the variable `assigner` + dataRegionId2Assigner.remove(dataRegionId); + // This will help to release the memory occupied by the assigner + assigner.close(); + } } - if (assigner.notMoreExtractorNeededToBeAssigned()) { - // The removed assigner will is the same as the one referenced by the variable `assigner` - dataRegionId2Assigner.remove(dataRegionId); - // This will help to release the memory occupied by the assigner - assigner.close(); + if (assignerToClose != null) { + // Closing the disruptor may block for a while, so keep it out of the global listener lock. + assignerToClose.close(); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java new file mode 100644 index 0000000000000..3fd40c4d4f2b5 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class DisruptorShutdownTest { + + @Test + public void testBatchEventProcessorDrainsPublishedEventsOnShutdownInterrupt() throws Exception { + final RingBuffer ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 32); + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1); + + final TestSequenceBarrier barrier = new TestSequenceBarrier(0L); + final AtomicInteger handledEventCount = new AtomicInteger(); + final BatchEventProcessor processor = + new BatchEventProcessor<>( + ringBuffer, + barrier, + (event, sequence, endOfBatch) -> handledEventCount.incrementAndGet()); + + final Thread processorThread = new Thread(processor, "pipe-batch-event-processor-test"); + processorThread.start(); + + Assert.assertTrue(barrier.awaitWaitForCall()); + processor.halt(); + barrier.interruptWait(); + + processorThread.join(TimeUnit.SECONDS.toMillis(5)); + + Assert.assertFalse(processorThread.isAlive()); + Assert.assertEquals(1, handledEventCount.get()); + Assert.assertEquals(0L, processor.getSequence().get()); + } + + @Test + public void testBatchEventProcessorDrainsEventsPublishedAfterCurrentBatchWhenHalting() + throws Exception { + final RingBuffer ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 32); + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1); + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 2); + + final SnapshotSequenceBarrier barrier = new SnapshotSequenceBarrier(0L, 1L); + final AtomicInteger handledEventCount = new AtomicInteger(); + final AtomicReference> processorReference = + new AtomicReference<>(); + final BatchEventProcessor processor = + new BatchEventProcessor<>( + ringBuffer, + barrier, + (event, sequence, endOfBatch) -> { + handledEventCount.incrementAndGet(); + if (event.value == 1) { + processorReference.get().halt(); + } + }); + processorReference.set(processor); + + final Thread processorThread = + new Thread(processor, "pipe-batch-event-processor-snapshot-test"); + processorThread.start(); + processorThread.join(TimeUnit.SECONDS.toMillis(5)); + + Assert.assertFalse(processorThread.isAlive()); + Assert.assertEquals(2, handledEventCount.get()); + Assert.assertEquals(1L, processor.getSequence().get()); + } + + @Test + public void testDisruptorShutdownInterruptsWaitingProcessor() throws Exception { + final AtomicReference processorThreadReference = new AtomicReference<>(); + final ThreadFactory threadFactory = + runnable -> { + final Thread thread = new Thread(runnable, "pipe-disruptor-shutdown-test"); + processorThreadReference.set(thread); + return thread; + }; + + final Disruptor disruptor = new Disruptor<>(TestEvent::new, 32, threadFactory); + disruptor.handleEventsWith((event, sequence, endOfBatch) -> {}); + disruptor.start(); + + final Thread processorThread = processorThreadReference.get(); + Assert.assertNotNull(processorThread); + + TimeUnit.MILLISECONDS.sleep(50); + disruptor.shutdown(); + + Assert.assertFalse(processorThread.isAlive()); + } + + private static class TestEvent { + private int value; + } + + private static class TestSequenceBarrier extends SequenceBarrier { + + private final long cursor; + private final CountDownLatch waitForCalled = new CountDownLatch(1); + private final CountDownLatch interruptWait = new CountDownLatch(1); + + private TestSequenceBarrier(final long cursor) { + super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]); + this.cursor = cursor; + } + + @Override + public long waitFor(final long sequence) throws InterruptedException { + waitForCalled.countDown(); + interruptWait.await(); + throw new InterruptedException(); + } + + @Override + public long getCursor() { + return cursor; + } + + @Override + public long getHighestPublishedSequence(final long lowerBound, final long availableSequence) { + return availableSequence; + } + + private boolean awaitWaitForCall() throws InterruptedException { + return waitForCalled.await(5, TimeUnit.SECONDS); + } + + private void interruptWait() { + interruptWait.countDown(); + } + } + + private static class SnapshotSequenceBarrier extends SequenceBarrier { + + private final long waitForResult; + private final long cursor; + + private SnapshotSequenceBarrier(final long waitForResult, final long cursor) { + super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]); + this.waitForResult = waitForResult; + this.cursor = cursor; + } + + @Override + public long waitFor(final long sequence) { + return waitForResult; + } + + @Override + public long getCursor() { + return cursor; + } + + @Override + public long getHighestPublishedSequence(final long lowerBound, final long availableSequence) { + return availableSequence; + } + } +} From 6d591e97fc7de53bbcfb4539b4fab6a8add1dc13 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 09:56:23 +0800 Subject: [PATCH 2/2] fix --- .../realtime/listener/PipeInsertionDataNodeListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 72d76bf5b2f04..fded546d87d65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -93,7 +93,7 @@ public synchronized void stopListenAndAssign( // The removed assigner will is the same as the one referenced by the variable `assigner` dataRegionId2Assigner.remove(dataRegionId); // This will help to release the memory occupied by the assigner - assigner.close(); + assignerToClose = assigner; } }