Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void invalidateCache() {
matcher.invalidateCache();
}

public boolean notMoreExtractorNeededToBeAssigned() {
public boolean notMoreSourceNeededToBeAssigned() {
return matcher.getRegisterCount() == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@

@Override
public void run() {
T event = null;
long nextSequence = sequence.get() + 1L;

while (running) {
Expand All @@ -78,29 +77,59 @@
final long availableSequence = sequenceBarrier.waitFor(nextSequence);

// Batch process all available events
while (nextSequence <= availableSequence) {
event = ringBuffer.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}

// Update sequence
sequence.set(availableSequence);
nextSequence = processAvailableEvents(nextSequence, availableSequence);

} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
LOGGER.info("Processor interrupted");
if (running) {
Thread.currentThread().interrupt();
LOGGER.info("Processor interrupted");
}
break;
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, event);
exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence));
sequence.set(nextSequence);
nextSequence++;
}
}

if (!running) {
drainRemainingPublishedEvents(nextSequence);
}
LOGGER.info("Processor stopped");
}

private long processAvailableEvents(long nextSequence, long availableSequence) throws Throwable {

Check warning on line 101 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ3MslClL8INsO5yWgxO&open=AZ3MslClL8INsO5yWgxO&pullRequest=17549
while (nextSequence <= availableSequence) {
final T event = ringBuffer.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}

sequence.set(availableSequence);
return nextSequence;
}

private void drainRemainingPublishedEvents(long nextSequence) {
final long availableSequence = sequenceBarrier.getCursor();
if (availableSequence < nextSequence) {
return;
}

final long highestPublishedSequence =
sequenceBarrier.getHighestPublishedSequence(nextSequence, availableSequence);
while (nextSequence <= highestPublishedSequence) {
final T event = ringBuffer.get(nextSequence);
try {
eventHandler.onEvent(event, nextSequence, nextSequence == highestPublishedSequence);
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, event);
} finally {
sequence.set(nextSequence);
}
nextSequence++;
}
}

private static class DefaultExceptionHandler<T> implements ExceptionHandler<T> {
@Override
public void handleEventException(Throwable ex, long sequence, T event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,15 @@ public void shutdown() {

if (processorThread != null) {
try {
processorThread.interrupt();
processorThread.join(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted waiting for processor to stop");
}
if (processorThread.isAlive()) {
LOGGER.warn("Timed out waiting for processor to stop");
}
}

started = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ public long waitFor(long sequence) throws InterruptedException {
public long getCursor() {
return sequencer.getCursor().get();
}

public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
return sequencer.getHighestPublishedSequence(lowerBound, availableSequence);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,34 @@ public synchronized void startListenAndAssign(

public synchronized void stopListenAndAssign(
final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
if (assigner == null) {
return;
}
PipeDataRegionAssigner assignerToClose = null;

assigner.stopAssignTo(extractor);
synchronized (this) {
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
if (assigner == null) {
return;
}

if (extractor.isNeedListenToTsFile()) {
listenToTsFileExtractorCount.decrementAndGet();
}
if (extractor.isNeedListenToInsertNode()) {
listenToInsertNodeExtractorCount.decrementAndGet();
assigner.stopAssignTo(extractor);

if (extractor.isNeedListenToTsFile()) {
listenToTsFileExtractorCount.decrementAndGet();
}
if (extractor.isNeedListenToInsertNode()) {
listenToInsertNodeExtractorCount.decrementAndGet();
}

if (assigner.notMoreSourceNeededToBeAssigned()) {
// The removed assigner will is the same as the one referenced by the variable `assigner`
dataRegionId2Assigner.remove(dataRegionId);
// This will help to release the memory occupied by the assigner
assignerToClose = assigner;
}
}

if (assigner.notMoreExtractorNeededToBeAssigned()) {
// The removed assigner will is the same as the one referenced by the variable `assigner`
dataRegionId2Assigner.remove(dataRegionId);
// This will help to release the memory occupied by the assigner
assigner.close();
if (assignerToClose != null) {
// Closing the disruptor may block for a while, so keep it out of the global listener lock.
assignerToClose.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;

import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class DisruptorShutdownTest {

@Test
public void testBatchEventProcessorDrainsPublishedEventsOnShutdownInterrupt() throws Exception {
final RingBuffer<TestEvent> ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 32);
ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1);

final TestSequenceBarrier barrier = new TestSequenceBarrier(0L);
final AtomicInteger handledEventCount = new AtomicInteger();
final BatchEventProcessor<TestEvent> processor =
new BatchEventProcessor<>(
ringBuffer,
barrier,
(event, sequence, endOfBatch) -> handledEventCount.incrementAndGet());

final Thread processorThread = new Thread(processor, "pipe-batch-event-processor-test");
processorThread.start();

Assert.assertTrue(barrier.awaitWaitForCall());
processor.halt();
barrier.interruptWait();

processorThread.join(TimeUnit.SECONDS.toMillis(5));

Assert.assertFalse(processorThread.isAlive());
Assert.assertEquals(1, handledEventCount.get());
Assert.assertEquals(0L, processor.getSequence().get());
}

@Test
public void testBatchEventProcessorDrainsEventsPublishedAfterCurrentBatchWhenHalting()
throws Exception {
final RingBuffer<TestEvent> ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 32);
ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1);
ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 2);

final SnapshotSequenceBarrier barrier = new SnapshotSequenceBarrier(0L, 1L);
final AtomicInteger handledEventCount = new AtomicInteger();
final AtomicReference<BatchEventProcessor<TestEvent>> processorReference =
new AtomicReference<>();
final BatchEventProcessor<TestEvent> processor =
new BatchEventProcessor<>(
ringBuffer,
barrier,
(event, sequence, endOfBatch) -> {
handledEventCount.incrementAndGet();
if (event.value == 1) {
processorReference.get().halt();
}
});
processorReference.set(processor);

final Thread processorThread =
new Thread(processor, "pipe-batch-event-processor-snapshot-test");
processorThread.start();
processorThread.join(TimeUnit.SECONDS.toMillis(5));

Assert.assertFalse(processorThread.isAlive());
Assert.assertEquals(2, handledEventCount.get());
Assert.assertEquals(1L, processor.getSequence().get());
}

@Test
public void testDisruptorShutdownInterruptsWaitingProcessor() throws Exception {
final AtomicReference<Thread> processorThreadReference = new AtomicReference<>();
final ThreadFactory threadFactory =
runnable -> {
final Thread thread = new Thread(runnable, "pipe-disruptor-shutdown-test");
processorThreadReference.set(thread);
return thread;
};

final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, threadFactory);
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {});
disruptor.start();

final Thread processorThread = processorThreadReference.get();
Assert.assertNotNull(processorThread);

TimeUnit.MILLISECONDS.sleep(50);
disruptor.shutdown();

Assert.assertFalse(processorThread.isAlive());
}

private static class TestEvent {
private int value;
}

private static class TestSequenceBarrier extends SequenceBarrier {

private final long cursor;
private final CountDownLatch waitForCalled = new CountDownLatch(1);
private final CountDownLatch interruptWait = new CountDownLatch(1);

private TestSequenceBarrier(final long cursor) {
super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
this.cursor = cursor;
}

@Override
public long waitFor(final long sequence) throws InterruptedException {
waitForCalled.countDown();
interruptWait.await();
throw new InterruptedException();
}

@Override
public long getCursor() {
return cursor;
}

@Override
public long getHighestPublishedSequence(final long lowerBound, final long availableSequence) {
return availableSequence;
}

private boolean awaitWaitForCall() throws InterruptedException {
return waitForCalled.await(5, TimeUnit.SECONDS);
}

private void interruptWait() {
interruptWait.countDown();
}
}

private static class SnapshotSequenceBarrier extends SequenceBarrier {

private final long waitForResult;
private final long cursor;

private SnapshotSequenceBarrier(final long waitForResult, final long cursor) {
super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
this.waitForResult = waitForResult;
this.cursor = cursor;
}

@Override
public long waitFor(final long sequence) {
return waitForResult;
}

@Override
public long getCursor() {
return cursor;
}

@Override
public long getHighestPublishedSequence(final long lowerBound, final long availableSequence) {
return availableSequence;
}
}
}
Loading