Wait for consensus start before answering region requests#17546
Wait for consensus start before answering region requests#17546CRZbulabula merged 4 commits intomasterfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #17546 +/- ##
============================================
+ Coverage 39.80% 39.82% +0.01%
Complexity 312 312
============================================
Files 5142 5147 +5
Lines 347882 348088 +206
Branches 44404 44430 +26
============================================
+ Hits 138489 138618 +129
- Misses 209393 209470 +77 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
CRZbulabula
left a comment
There was a problem hiding this comment.
Code Review
Overview
This PR adds a defensive guard to DataNode internal RPCs: before handling any region management request (create/delete region, change leader, add/remove/reset peer, notify migration), the handler polls until consensus is initialized or rejects after 30 seconds with CONSENSUS_NOT_INITIALIZED.
Three layers of change:
Await/ConditionAwaiter— a custom lightweight polling utility innode-commons(Awaitility clone for production code, since Awaitility is test-scoped)DataNodeContext— inner class onDataNodeexposingisAllConsensusStarted()viavolatileflags- RPC guards —
waitForConsensusStarted()check inserted into 10 region management RPCs
The existing status code CONSENSUS_NOT_INITIALIZED (904) is already in the NEED_RETRY set in StatusUtils, so ConfigNode will automatically retry these requests — good.
Issues & Suggestions
1. Race condition: getImpl() can be called before setDataNodeContext() (Bug — High)
DataNodeInternalRPCService.getImpl() uses compareAndSet:
public DataNodeInternalRPCServiceImpl getImpl() {
impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl(dataNodeContext));
return impl.get();
}If getImpl() is called before setDataNodeContext(), the impl is created with dataNodeContext = null. The compareAndSet then prevents it from ever being recreated. A subsequent waitForConsensusStarted() call will NPE on dataNodeContext.isAllConsensusStarted().
This can happen: ClusterConfigTaskExecutor.java:1292 calls DataNodeInternalRPCService.getInstance().getImpl().clearCacheImpl(...) — if this runs before registerInternalRPCService() sets the context, the impl is permanently null-contexted.
Suggestion: Either:
- Reset
impltonullinsetDataNodeContext()so it's recreated on nextgetImpl()call - Or add a null-check in
waitForConsensusStarted()(treat null context as "not started") - Or inject
DataNodeContextviagetImpl()parameter instead of storing it on the service
2. Blocking RPC threads for 30 seconds with Thread.sleep (Performance — Medium)
waitForConsensusStarted() polls with Thread.sleep(100ms) for up to 30 seconds, blocking the Thrift handler thread. If ConfigNode dispatches multiple region requests simultaneously to a restarting DataNode, each one blocks a handler thread.
Suggestion: Replace polling with a CountDownLatch or java.util.concurrent.locks.Condition that is signaled when consensus starts. This would wake all waiting threads immediately instead of polling:
private final CountDownLatch consensusLatch = new CountDownLatch(1);
// In DataNodeContext: call consensusLatch.countDown() when ready
// In waitForConsensusStarted(): consensusLatch.await(30, TimeUnit.SECONDS)This is also more efficient (no CPU wakeups every 100ms) and would eliminate the need for the custom Await utility in this use case.
3. DataNodeContext should be a static inner class or interface (Design)
DataNodeContext is a non-static inner class, so it holds an implicit reference to the DataNode instance. This works but:
- Makes it impossible to instantiate without a
DataNodeinstance (complicates testing — hence the need for Mockito) - Couples the RPC service to the
DataNodeclass directly
Suggestion: Extract an interface (e.g., ConsensusStateProvider) or make it a static inner class that receives the volatile references explicitly:
public interface ConsensusStateProvider {
boolean isAllConsensusStarted();
}4. null return convention from waitForConsensusStarted() (Style — Minor)
Returning null to mean "success" and a non-null TSStatus to mean "failure" is error-prone — easy to forget the null check at call sites. Consider Optional<TSStatus> or a helper that throws (letting the caller handle the exception once).
5. consensusWaitTimeoutSeconds is mutable without synchronization (Minor)
private long consensusWaitTimeoutSeconds = 30;
// ...
public void setConsensusWaitTimeoutSeconds(long consensusWaitTimeoutSeconds) {
this.consensusWaitTimeoutSeconds = consensusWaitTimeoutSeconds;
}This non-final, non-volatile field is written from test threads and read from RPC handler threads. In practice the setter is only called before any RPC handling, so this is safe, but it should be either volatile or passed via constructor. Annotating with @VisibleForTesting would also clarify intent.
6. Await/ConditionAwaiter — well-built but consider justification (Design)
The custom utility is ~200 lines duplicating Awaitility's core API. Awaitility is already a managed dependency (4.2.0 in the parent POM), just test-scoped. Two options:
- Promote Awaitility to
compilescope (simplest, but adds a production dependency) - Keep the custom utility (current approach — no new dependency, but maintenance cost)
The current approach is reasonable, but worth a comment in Await.java explaining why Awaitility isn't used (test-scope only).
7. Missing test: consensus becomes available mid-wait (Test coverage)
ConsensusWaitTest covers:
- Consensus not started → timeout rejection ✓
But doesn't test:
- Consensus becomes available during the 30s wait → request should succeed
This is the primary happy-path scenario the feature is designed for. A test with a ScheduledExecutorService flipping the mock after a short delay would validate the polling behavior end-to-end.
Positives
- Correct use of
volatileon the consensus flags for cross-thread visibility CONSENSUS_NOT_INITIALIZEDis already inNEED_RETRY— seamless ConfigNode retry- Comprehensive unit tests for the
Awaitutility (8 test cases covering timeout, delay, exception handling,untilAsserted,forever) - Clean separation of concerns (utility / context / guard)
- Existing
DataNodeInternalRPCServiceImplTestcorrectly updated to pass mocked context
Summary
The defensive guard is a sound idea for protecting against startup race conditions. The main concerns are:
- Bug:
getImpl()race can create the impl with null context → NPE (must fix) - Performance: Polling with
Thread.sleepblocks handler threads — considerCountDownLatch - Test gap: No test for the primary scenario (consensus becomes ready mid-wait)
- Design:
DataNodeContextas non-static inner class is awkward for testing — extract interface



Summary
Await/ConditionAwaiter) innode-commonsthat provides a fluent API for waiting until a condition becomes true, with configurable timeout and poll interval.DataNodeContextinner class inDataNodeto expose consensus initialization state (isAllConsensusStarted()), withvolatilevisibility on the underlying flags.DataNodeInternalRPCServiceImpl(create/delete region, change leader, add/remove/delete/reset peer, notify migration) with awaitForConsensusStarted()check that polls up to 30 seconds before rejecting withCONSENSUS_NOT_INITIALIZED.Test plan
Await/ConditionAwaitercovering: already-true condition, condition becomes true, timeout, poll delay, exception ignoring,untilAsserted,forevermode (AwaitTest)createSchemaRegion,createDataRegion,deleteRegion,changeRegionLeaderall reject withCONSENSUS_NOT_INITIALIZEDwhen consensus is not started (ConsensusWaitTest)DataNodeInternalRPCServiceImplTestupdated to pass a mockedDataNodeContextwith consensus started