
refactor(submitter): concurrent submitter#3287

Draft
julienrbrt wants to merge 2 commits into main from julien/speedup-submitter

Conversation

@julienrbrt (Member)

Overview

Attempt to improve the submitter by doing concurrent sends: we don't need to wait for the DA response before submitting the next batch.
Useful when blob throughput needs to be high.

Related to #3244 as Fiber takes time to return, so it is necessary there. This PR is mainly to investigate if we can generalize that improvement to mainline evnode.


coderabbitai Bot commented Apr 24, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 730951e8-ffd7-4636-bf97-97a89e3b7e4d

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.




github-actions Bot commented Apr 24, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

| Build | Format | Lint | Breaking | Updated (UTC) |
| --- | --- | --- | --- | --- |
| ✅ passed | ⏩ skipped | ✅ passed | ✅ passed | Apr 24, 2026, 9:52 PM |


claude Bot commented Apr 24, 2026

Claude finished @julienrbrt's task in 4m 8s (View job)


Code Review

  • Gather context and understand the PR
  • Read changed files
  • Analyze concurrency patterns and correctness
  • Review error handling and resilience
  • Assess performance and resource efficiency
  • Check testing adequacy
  • Post comprehensive review

Overview

This PR shifts SubmitHeaders / SubmitData from synchronous (caller blocks until DA responds) to asynchronous (items are queued into a per-namespace channel; a dedicated worker goroutine does the actual DA submission). The design is sound for high-throughput scenarios, but there are a few correctness and usability issues worth addressing before merging.
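To make the described design concrete, here is a minimal sketch of the queue-plus-worker pattern: callers enqueue a batch and return immediately, while a dedicated goroutine performs the (stubbed) DA submission. The names (`batchGroup`, `submitCh`, `newSubmitter`) are illustrative, not the PR's actual identifiers.

```go
package main

import (
	"fmt"
	"sync"
)

// batchGroup holds the marshaled blobs for one submission batch.
type batchGroup struct {
	marshaled [][]byte
}

type submitter struct {
	submitCh  chan *batchGroup
	wg        sync.WaitGroup
	processed int // batches the worker has handled (read only after Close)
}

func newSubmitter(queueSize int) *submitter {
	s := &submitter{submitCh: make(chan *batchGroup, queueSize)}
	s.wg.Add(1)
	go s.worker()
	return s
}

// worker drains the channel and performs the (stubbed) DA submission.
func (s *submitter) worker() {
	defer s.wg.Done()
	for g := range s.submitCh {
		fmt.Printf("submitting %d blobs\n", len(g.marshaled))
		s.processed++
	}
}

// SubmitHeaders returns as soon as the batch is enqueued.
func (s *submitter) SubmitHeaders(g *batchGroup) {
	s.submitCh <- g
}

// Close stops accepting work and waits for the worker to drain the queue.
func (s *submitter) Close() {
	close(s.submitCh)
	s.wg.Wait()
}

func main() {
	s := newSubmitter(256)
	s.SubmitHeaders(&batchGroup{marshaled: [][]byte{[]byte("h1"), []byte("h2")}})
	s.Close()
	fmt.Println("processed", s.processed) // processed 1
}
```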


Bugs

1. Close() is not idempotent — double-close deadlocks

Close() guards with if s.workerCancel == nil { return }, but never sets workerCancel to nil after using it. After the first call the workers have exited, so on a second call the sends to the unbuffered headerFlushCh / dataFlushCh block forever. Stop() calls Close() unconditionally, and if anything else also calls Close() (e.g. the traced wrapper's Close just delegates), this will hang.

// da_submitter.go:224
func (s *DASubmitter) Close() {
    if s.workerCancel == nil {
        return
    }
    headerDone := make(chan struct{})
    s.headerFlushCh <- headerDone   // ← blocks forever on second call
    ...
    s.workerCancel()
    // workerCancel is never set to nil → guard is useless
}

Fix: use sync.Once, or nil out s.workerCancel before returning.
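A minimal sketch of the sync.Once variant, with stand-in field names mirroring the review's description (the actual struct layout is an assumption):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// DASubmitter here is a stand-in; only the Close shape matters.
type DASubmitter struct {
	closeOnce     sync.Once
	workerCancel  context.CancelFunc
	headerFlushCh chan chan struct{}
	dataFlushCh   chan chan struct{}
}

// Close flushes both namespaces exactly once; repeat calls return immediately.
func (s *DASubmitter) Close() {
	s.closeOnce.Do(func() {
		if s.workerCancel == nil {
			return
		}
		for _, ch := range []chan chan struct{}{s.headerFlushCh, s.dataFlushCh} {
			done := make(chan struct{})
			ch <- done // worker acknowledges the flush by closing done
			<-done
		}
		s.workerCancel()
	})
}

func main() {
	s := &DASubmitter{
		workerCancel:  func() {},
		headerFlushCh: make(chan chan struct{}),
		dataFlushCh:   make(chan chan struct{}),
	}
	// Stand-in worker: acknowledge one flush per channel, then exit.
	go func() {
		close(<-s.headerFlushCh)
		close(<-s.dataFlushCh)
	}()
	s.Close()
	s.Close() // second call returns immediately instead of deadlocking
	fmt.Println("double Close is safe")
}
```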


2. lastHeaderSubmit / lastDataSubmit updated at queue time, not at actual submit time

In submitter.go the ticker goroutine does:

if err := s.daSubmitter.SubmitHeaders(...); err != nil {
    ...
} else {
    s.lastHeaderSubmit.Store(time.Now().UnixNano())  // recorded when queued
}

SubmitHeaders now returns as soon as the item is enqueued, so lastHeaderSubmit is always updated even if the DA submission ultimately fails. On the next tick the batching strategy thinks "we just submitted", suppressing the retry for a full backoff window. Under high failure rates this effectively stalls the retry loop.

Consider updating the timestamp inside onSuccess (only when count > 0) and exposing a way for the worker to feed it back, or at minimum document this as an intentional trade-off.


Design / Architecture

3. onSuccess semantics are misleading

The callback is named onSuccess but is also invoked on every failure path with (0, 0):

// oversized item, context cancellation, max-attempts exhausted, panic recovery…
if g.onSuccess != nil {
    g.onSuccess(0, 0)   // called on failure
}

Callers guard with if count > 0, so failures are silently swallowed. Renaming to onComplete (or splitting into onSuccess / onFailure) would make intent clearer and prevent future callers from accidentally ignoring the failure signal.


4. Captured ctx from callers may be stale by the time callbacks run

// da_submitter.go:547
postSubmit := s.makeHeaderPostSubmit(ctx, cache)   // ctx from SubmitHeaders caller
s.headerSubmitCh <- &batchGroup{
    onSuccess: func(count int, daHeight uint64) {
        postSubmit(...)   // may run seconds/minutes later in the worker
    },
}

If the Submitter's own context is cancelled (e.g. during Stop()), AppendDAHint inside the callback will silently fail. The error is logged so it degrades gracefully, but the DA hint store will be inconsistent. Passing the worker's context (from runNamespaceWorker) into the callback, or using context.WithoutCancel(ctx) when building postSubmit, would be more robust.


5. truncateGroups / advanceGroups silently skip remaining items when makeRemaining == nil

The nested batchGroup created inside each makeRemaining closure has makeRemaining: nil:

makeRemaining: func(submittedCount int, remainingMarshaled [][]byte) *batchGroup {
    remainingHeaders := headers[submittedCount:]
    return &batchGroup{
        marshaled:     remainingMarshaled,
        onSuccess:     ...,
        makeRemaining: nil,   // ← can't be split further
    }
},

If a merged batch containing one of these nested groups hits StatusTooBig and truncateGroups is called:

// da_submitter.go:469
if g.makeRemaining != nil { ... }
return groups[:i]   // group i's items are excluded without ever calling onSuccess

Those items are not re-queued; they remain pending in the cache and will be picked up on the next tick, so correctness is preserved — but the silent skip is hard to reason about. A comment explaining why this is intentional (or an assertion) would help.


Concurrency

6. runNamespaceWorker: drain-on-cancel calls processBatch with a done context

case <-ctx.Done():
    drainChannel(ch, func(g *batchGroup) {
        s.processBatch(ctx, []*batchGroup{g}, itemType, namespace)  // ctx already cancelled
    })

waitForBackoffOrContext with backoff == 0 uses a non-blocking select that returns nil even on a cancelled context, so the first retry attempt goes straight to client.Submit. In practice this path is only hit after Close() has already flushed via headerFlushCh, so the channel should be empty — but the behaviour is fragile and worth a comment.


7. SubmitHeaders / SubmitData block if the queue is full

s.headerSubmitCh <- &batchGroup{...}   // blocks when queue is full

With submitQueueSize = 256, a backpressure spike (slow DA layer) will block the goroutine holding headerSubmissionMtx in daSubmissionLoop. This is better than an unbounded queue, but the caller has no way to propagate a timeout or respect cancellation here. Consider a context-aware send:

select {
case s.headerSubmitCh <- group:
case <-ctx.Done():
    return ctx.Err()
}



Testing

8. Missing test coverage for the new async paths

The updated tests verify basic success with Close() to flush, but don't cover:

  • Batch merging: two concurrent SubmitHeaders calls draining into a single processBatch invocation — this is the primary new behaviour.
  • StatusTooBig with multiple groups: verifies that the group-split/advance logic is correct across merged batches.
  • Context cancellation during queue send: ensures SubmitHeaders doesn't hang if the context is done and the queue is full.
  • Panic recovery: processBatchSafe catches panics and calls all onSuccess(0,0) — worth a regression test.
  • Concurrent SubmitHeaders + Close(): races between sending to the channel and the flush.

These are all exercisable with the current mocks.MockClient infrastructure.


Minor / Nits

  • signingWorkerPoolSize = 0 with comment // 0 means use runtime.GOMAXPROCS(0) is confusing since the code immediately replaces it: if workers <= 0 || workers > runtime.GOMAXPROCS(0) { workers = runtime.GOMAXPROCS(0) }. The constant adds no value — just initialise workers := runtime.GOMAXPROCS(0) directly.
  • submitQueueSize = 256 is a magic constant; even a brief comment on the rationale (or making it configurable via config.DA) would help operators tune for high-throughput deployments.
  • The wg.Go(...) API (sync.WaitGroup.Go) requires Go 1.25+. If the go.mod minimum isn't already ≥ 1.25, the build will fail on older toolchains in CI.

Summary

The async worker design is the right direction and the core batch-merge/retry logic is well-structured. The two items to fix before merging are the double-close deadlock (issue 1) and the lastHeaderSubmit optimistic update (issue 2), which can cause visible retry-delay regressions under a flaky DA layer. The context-aware channel send (issue 7) is a low-effort hardening improvement worth adding at the same time.


codecov Bot commented Apr 24, 2026

Codecov Report

❌ Patch coverage is 44.73684% with 147 lines in your changes missing coverage. Please review.
✅ Project coverage is 62.02%. Comparing base (49ef5c9) to head (b501384).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| block/internal/submitting/da_submitter.go | 44.86% | 129 Missing and 16 partials ⚠️ |
| block/internal/submitting/da_submitter_tracing.go | 0.00% | 2 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3287      +/-   ##
==========================================
- Coverage   62.45%   62.02%   -0.44%     
==========================================
  Files         122      122              
  Lines       13047    13194     +147     
==========================================
+ Hits         8149     8184      +35     
- Misses       4012     4122     +110     
- Partials      886      888       +2     
| Flag | Coverage | Δ |
| --- | --- | --- |
| combined | 62.02% <44.73%> | -0.44% ⬇️ |

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.