From e277538121f3c5dc5752e5e18f9b340dc3943443 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 24 Apr 2026 23:48:28 +0200 Subject: [PATCH 1/2] refactor(submitter): concurrent submitter --- block/internal/submitting/da_submitter.go | 631 +++++++++++------- .../da_submitter_integration_test.go | 2 + .../submitting/da_submitter_mocks_test.go | 270 -------- .../internal/submitting/da_submitter_test.go | 9 +- .../submitting/da_submitter_tracing.go | 4 + .../submitting/da_submitter_tracing_test.go | 2 + block/internal/submitting/submitter.go | 4 +- block/internal/submitting/submitter_test.go | 2 + 8 files changed, 413 insertions(+), 511 deletions(-) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 83f56d9cb5..103ac74493 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -33,6 +33,8 @@ const ( // signingWorkerPoolSize determines how many parallel signing goroutines to use. // Ed25519 signing is CPU-bound, so we use GOMAXPROCS workers. signingWorkerPoolSize = 0 // 0 means use runtime.GOMAXPROCS(0) + + submitQueueSize = 256 ) const initialBackoff = 100 * time.Millisecond @@ -107,6 +109,12 @@ type DAHintAppender interface { AppendDAHint(ctx context.Context, daHeight uint64, heights ...uint64) error } +type batchGroup struct { + marshaled [][]byte + onSuccess func(count int, daHeight uint64) + makeRemaining func(submittedCount int, remainingMarshaled [][]byte) *batchGroup +} + // DASubmitter handles DA submission operations type DASubmitter struct { client da.Client @@ -131,6 +139,13 @@ type DASubmitter struct { // signingWorkers is the number of parallel workers for signing signingWorkers int + + headerSubmitCh chan *batchGroup + dataSubmitCh chan *batchGroup + headerFlushCh chan chan struct{} + dataFlushCh chan chan struct{} + workerCancel context.CancelFunc + workerWg sync.WaitGroup } // NewDASubmitter creates a new DA submitter @@ -179,7 +194,7 @@ func NewDASubmitter( workers = runtime.GOMAXPROCS(0) } - return &DASubmitter{ + s := &DASubmitter{ client: client, config: config, genesis: genesis, @@ -191,10 +206,313 @@ func NewDASubmitter( signingWorkers: workers, headerDAHintAppender: headerDAHintAppender, dataDAHintAppender: dataDAHintAppender, + headerSubmitCh: make(chan *batchGroup, submitQueueSize), + dataSubmitCh: make(chan *batchGroup, submitQueueSize), + headerFlushCh: make(chan chan struct{}), + dataFlushCh: make(chan chan struct{}), + } + + ctx, cancel := context.WithCancel(context.Background()) + s.workerCancel = cancel + s.workerWg.Add(2) + go s.runNamespaceWorker(ctx, s.headerSubmitCh, s.headerFlushCh, "header", s.client.GetHeaderNamespace()) + go s.runNamespaceWorker(ctx, s.dataSubmitCh, s.dataFlushCh, "data", s.client.GetDataNamespace()) + + return s +} + +func (s *DASubmitter) Close() { + if s.workerCancel != nil { + s.Flush() + s.workerCancel() + s.workerWg.Wait() + } +} + +func (s *DASubmitter) Flush() { + headerDone := make(chan struct{}) + s.headerFlushCh <- headerDone + dataDone := make(chan struct{}) + s.dataFlushCh <- dataDone + <-headerDone + <-dataDone +} + +func (s *DASubmitter) runNamespaceWorker(ctx context.Context, ch chan *batchGroup, flushCh chan chan struct{}, itemType string, namespace []byte) { + defer s.workerWg.Done() + for { + select { + case <-ctx.Done(): + drainChannel(ch, func(g *batchGroup) { + s.processBatch(ctx, []*batchGroup{g}, itemType, namespace) + }) + return + case done := <-flushCh: + drainChannel(ch, func(g *batchGroup) { + s.processBatch(ctx, []*batchGroup{g}, itemType, namespace) + }) + close(done) + case first := <-ch: + groups := []*batchGroup{first} + drainLoop: + for { + select { + case g := <-ch: + if g != nil { + groups = append(groups, g) + } + default: + break drainLoop + } + } + s.logger.Debug().Str("itemType", itemType).Int("groups", len(groups)).Msg("worker processing batch") + s.processBatchSafe(ctx, groups, itemType, namespace) + s.logger.Debug().Str("itemType", itemType).Msg("worker done processing batch") + } + } +} + +func drainChannel[T any](ch chan T, fn func(T)) { + for { + select { + case item := <-ch: + fn(item) + default: + return + } + } +} + +func (s *DASubmitter) processBatchSafe(ctx context.Context, groups []*batchGroup, itemType string, namespace []byte) { + defer func() { + if r := recover(); r != nil { + s.logger.Error().Any("panic", r).Str("itemType", itemType).Msg("worker panic in processBatch") + for _, g := range groups { + if g.onSuccess != nil { + g.onSuccess(0, 0) + } + } + } + }() + s.processBatch(ctx, groups, itemType, namespace) +} + +func (s *DASubmitter) processBatch(ctx context.Context, groups []*batchGroup, itemType string, namespace []byte) { + pol := defaultRetryPolicy(s.config.DA.MaxSubmitAttempts, s.config.DA.BlockTime.Duration) + options := []byte(s.config.DA.SubmitOptions) + + var allMarshaled [][]byte + for _, g := range groups { + allMarshaled = append(allMarshaled, g.marshaled...) + } + + if len(allMarshaled) == 0 { + for _, g := range groups { + if g.onSuccess != nil { + g.onSuccess(0, 0) + } + } + return + } + + limitedMarshaled, oversized := limitBatchBySizeBytes(allMarshaled, pol.MaxBlobBytes) + if oversized { + s.logger.Error(). + Str("itemType", itemType). + Uint64("maxBlobBytes", pol.MaxBlobBytes). + Msg("CRITICAL: item exceeds maximum blob size") + for _, g := range groups { + if g.onSuccess != nil { + g.onSuccess(0, 0) + } + } + return + } + allMarshaled = limitedMarshaled + + rs := retryState{} + + for rs.Attempt < pol.MaxAttempts { + if rs.Attempt > 0 { + s.metrics.DASubmitterResends.Add(1) + } + + if err := waitForBackoffOrContext(ctx, rs.Backoff); err != nil { + for _, g := range groups { + if g.onSuccess != nil { + g.onSuccess(0, 0) + } + } + return + } + + signingAddress := s.addressSelector.Next() + mergedOptions, err := mergeSubmitOptions(options, signingAddress) + if err != nil { + s.logger.Error().Err(err).Msg("failed to merge submit options with signing address") + for _, g := range groups { + if g.onSuccess != nil { + g.onSuccess(0, 0) + } + } + return + } + + start := time.Now() + res := s.client.Submit(ctx, allMarshaled, -1, namespace, mergedOptions) + s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got Submit response") + + if vis := server.GetDAVisualizationServer(); vis != nil { + vis.RecordSubmission(&res, 0, uint64(len(allMarshaled)), namespace) + } + + switch res.Code { + case datypes.StatusSuccess: + submitted := int(res.SubmittedCount) + s.distributeSuccess(groups, submitted, res.Height) + s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer") + if submitted == len(allMarshaled) { + rs.Next(reasonSuccess, pol) + return + } + allMarshaled = allMarshaled[submitted:] + groups = s.advanceGroups(groups, submitted) + rs.Next(reasonSuccess, pol) + + case datypes.StatusTooBig: + s.recordFailure(common.DASubmitterFailureReasonTooBig) + if len(allMarshaled) == 1 { + s.logger.Error(). + Str("itemType", itemType). + Msg("CRITICAL: single item exceeds DA blob size limit") + for _, g := range groups { + if g.onSuccess != nil { + g.onSuccess(0, 0) + } + } + return + } + half := len(allMarshaled) / 2 + if half == 0 { + half = 1 + } + allMarshaled = allMarshaled[:half] + groups = s.truncateGroups(groups, half) + s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying") + rs.Next(reasonTooBig, pol) + + case datypes.StatusNotIncludedInBlock: + s.recordFailure(common.DASubmitterFailureReasonNotIncludedInBlock) + rs.Next(reasonMempool, pol) + + case datypes.StatusAlreadyInMempool: + s.recordFailure(common.DASubmitterFailureReasonAlreadyInMempool) + rs.Next(reasonMempool, pol) + + case datypes.StatusContextCanceled: + s.recordFailure(common.DASubmitterFailureReasonContextCanceled) + for _, g := range groups { + if g.onSuccess != nil { + g.onSuccess(0, 0) + } + } + return + + default: + s.recordFailure(common.DASubmitterFailureReasonUnknown) + s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed") + rs.Next(reasonFailure, pol) + } + } + + s.recordFailure(common.DASubmitterFailureReasonTimeout) + s.logger.Error().Str("itemType", itemType).Int("attempts", rs.Attempt).Msg("failed to submit all items to DA layer after max attempts") + for _, g := range groups { + if g.onSuccess != nil { + g.onSuccess(0, 0) + } + } +} + +func (s *DASubmitter) distributeSuccess(groups []*batchGroup, submittedCount int, daHeight uint64) { + remaining := submittedCount + for _, g := range groups { + if remaining <= 0 { + break + } + n := min(remaining, len(g.marshaled)) + if g.onSuccess != nil { + g.onSuccess(n, daHeight) + } + remaining -= n + } +} + +func (s *DASubmitter) advanceGroups(groups []*batchGroup, submittedCount int) []*batchGroup { + remaining := submittedCount + for i, g := range groups { + if remaining <= 0 { + return groups[i:] + } + if remaining >= len(g.marshaled) { + remaining -= len(g.marshaled) + } else { + if g.makeRemaining != nil { + if next := g.makeRemaining(remaining, g.marshaled[remaining:]); next != nil { + groups[i] = next + return groups[i:] + } + } + return groups[i+1:] + } + } + return nil +} + +func (s *DASubmitter) truncateGroups(groups []*batchGroup, maxItems int) []*batchGroup { + count := 0 + for i, g := range groups { + if count+len(g.marshaled) > maxItems { + trim := maxItems - count + if g.makeRemaining != nil { + if next := g.makeRemaining(trim, g.marshaled[trim:]); next != nil { + groups[i] = next + return groups[:i+1] + } + } + return groups[:i] + } + count += len(g.marshaled) + if count == maxItems { + return groups[:i+1] + } } + return groups +} + +func limitBatchBySizeBytes(marshaled [][]byte, maxBytes uint64) ([][]byte, bool) { + total := uint64(0) + count := 0 + for i, b := range marshaled { + sz := uint64(len(b)) + if sz > maxBytes { + if i == 0 { + return nil, true + } + break + } + if total+sz > maxBytes { + break + } + total += sz + count++ + } + if count == 0 { + return nil, true + } + return marshaled[:count], false } -// recordFailure records a DA submission failure in metrics func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) { counter, ok := s.metrics.DASubmitterFailures[reason] if !ok { @@ -208,7 +526,6 @@ func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) { } } -// SubmitHeaders submits pending headers to DA layer func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { if len(headers) == 0 { return nil @@ -230,37 +547,53 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed return err } - return submitToDA(s, ctx, headers, envelopes, - func(submitted []*types.SignedHeader, res *datypes.ResultSubmit) { - heights := make([]uint64, len(submitted)) - for i, header := range submitted { - cache.SetHeaderDAIncluded(header.Hash().String(), res.Height, header.Height()) - heights[i] = header.Height() - } - if err := s.headerDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil { - s.logger.Error().Err(err).Msg("failed to append da height hint in header p2p store") - // ignoring error here, since we don't want to block the block submission' + postSubmit := s.makeHeaderPostSubmit(ctx, cache) + + s.headerSubmitCh <- &batchGroup{ + marshaled: envelopes, + onSuccess: func(count int, daHeight uint64) { + if count > 0 { + postSubmit(headers[:count], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(count), Height: daHeight}}) } - if l := len(submitted); l > 0 { - lastHeight := submitted[l-1].Height() - cache.SetLastSubmittedHeaderHeight(ctx, lastHeight) - // Update last submitted height for lazy cache invalidation (O(1) instead of O(N)) - s.lastSubmittedHeight.Store(lastHeight) + }, + makeRemaining: func(submittedCount int, remainingMarshaled [][]byte) *batchGroup { + remainingHeaders := headers[submittedCount:] + return &batchGroup{ + marshaled: remainingMarshaled, + onSuccess: func(count int, daHeight uint64) { + if count > 0 { + postSubmit(remainingHeaders[:count], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(count), Height: daHeight}}) + } + }, + makeRemaining: nil, } }, - "header", - s.client.GetHeaderNamespace(), - []byte(s.config.DA.SubmitOptions), - ) + } + return nil +} + +func (s *DASubmitter) makeHeaderPostSubmit(ctx context.Context, cache cache.Manager) func([]*types.SignedHeader, *datypes.ResultSubmit) { + return func(submitted []*types.SignedHeader, res *datypes.ResultSubmit) { + heights := make([]uint64, len(submitted)) + for i, header := range submitted { + cache.SetHeaderDAIncluded(header.Hash().String(), res.Height, header.Height()) + heights[i] = header.Height() + } + if err := s.headerDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil { + s.logger.Error().Err(err).Msg("failed to append da height hint in header p2p store") + } + if l := len(submitted); l > 0 { + lastHeight := submitted[l-1].Height() + cache.SetLastSubmittedHeaderHeight(ctx, lastHeight) + s.lastSubmittedHeight.Store(lastHeight) + } + } } -// createDAEnvelopes creates signed DA envelopes for the given headers. -// It uses caching to avoid re-signing on retries and parallel signing for new envelopes. func (s *DASubmitter) createDAEnvelopes(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, signer signer.Signer) ([][]byte, error) { envelopes := make([][]byte, len(headers)) - // First pass: check cache for already-signed envelopes - var needSigning []int // indices that need signing + var needSigning []int for i, header := range headers { height := header.Height() if cached := s.getCachedEnvelope(height); cached != nil { @@ -270,7 +603,6 @@ func (s *DASubmitter) createDAEnvelopes(ctx context.Context, headers []*types.Si } } - // If all envelopes were cached, we're done if len(needSigning) == 0 { s.logger.Debug().Int("cached", len(headers)).Msg("all envelopes retrieved from cache") return envelopes, nil @@ -281,7 +613,6 @@ func (s *DASubmitter) createDAEnvelopes(ctx context.Context, headers []*types.Si Int("to_sign", len(needSigning)). Msg("signing DA envelopes") - // For small batches, sign sequentially to avoid goroutine overhead if len(needSigning) <= 2 || s.signingWorkers <= 1 { for _, i := range needSigning { envelope, err := s.signAndCacheEnvelope(ctx, headers[i], marshalledHeaders[i], signer) @@ -293,11 +624,9 @@ func (s *DASubmitter) createDAEnvelopes(ctx context.Context, headers []*types.Si return envelopes, nil } - // Parallel signing for larger batches return s.signEnvelopesParallel(ctx, headers, marshalledHeaders, envelopes, needSigning, signer) } -// signEnvelopesParallel signs envelopes in parallel using a worker pool. func (s *DASubmitter) signEnvelopesParallel( ctx context.Context, headers []*types.SignedHeader, @@ -318,7 +647,6 @@ func (s *DASubmitter) signEnvelopesParallel( jobs := make(chan signJob, len(needSigning)) results := make(chan signResult, len(needSigning)) - // Start workers numWorkers := min(s.signingWorkers, len(needSigning)) var wg sync.WaitGroup for range numWorkers { @@ -330,19 +658,16 @@ func (s *DASubmitter) signEnvelopesParallel( }) } - // Send jobs for _, i := range needSigning { jobs <- signJob{index: i} } close(jobs) - // Wait for workers to finish and close results go func() { wg.Wait() close(results) }() - // Collect results var firstErr error for result := range results { if result.err != nil && firstErr == nil { @@ -361,33 +686,26 @@ func (s *DASubmitter) signEnvelopesParallel( return envelopes, nil } -// signAndCacheEnvelope signs a single header and caches the result. func (s *DASubmitter) signAndCacheEnvelope(ctx context.Context, header *types.SignedHeader, marshalledHeader []byte, signer signer.Signer) ([]byte, error) { - // Sign the pre-marshalled header content envelopeSignature, err := signer.Sign(ctx, marshalledHeader) if err != nil { return nil, fmt.Errorf("failed to sign envelope: %w", err) } - // Create the envelope and marshal it envelope, err := header.MarshalDAEnvelope(envelopeSignature) if err != nil { return nil, fmt.Errorf("failed to marshal DA envelope: %w", err) } - // Cache for potential retries s.setCachedEnvelope(header.Height(), envelope) return envelope, nil } -// getCachedEnvelope retrieves a cached envelope for the given height. -// Uses lazy invalidation: entries at or below lastSubmittedHeight are considered invalid. func (s *DASubmitter) getCachedEnvelope(height uint64) []byte { if s.envelopeCache == nil { return nil } - // Lazy invalidation: don't return cached data for already-submitted heights if height <= s.lastSubmittedHeight.Load() { return nil } @@ -400,13 +718,10 @@ func (s *DASubmitter) getCachedEnvelope(height uint64) []byte { return nil } -// setCachedEnvelope stores an envelope in the cache. -// Does not cache heights that have already been submitted. func (s *DASubmitter) setCachedEnvelope(height uint64, envelope []byte) { if s.envelopeCache == nil { return } - // Don't cache already-submitted heights if height <= s.lastSubmittedHeight.Load() { return } @@ -416,7 +731,6 @@ func (s *DASubmitter) setCachedEnvelope(height uint64, envelope []byte) { s.envelopeCache.Add(height, envelope) } -// SubmitData submits pending data to DA layer func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { if len(unsignedDataList) == 0 { return nil @@ -426,41 +740,59 @@ func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types. return fmt.Errorf("marshalledData length (%d) does not match unsignedDataList length (%d)", len(marshalledData), len(unsignedDataList)) } - // Sign the data (cache returns unsigned SignedData structs) signedDataList, signedDataListBz, err := s.signData(ctx, unsignedDataList, marshalledData, signer, genesis) if err != nil { return fmt.Errorf("failed to sign data: %w", err) } if len(signedDataList) == 0 { - return nil // No non-empty data to submit + return nil } s.logger.Info().Int("count", len(signedDataList)).Msg("submitting data to DA") - return submitToDA(s, ctx, signedDataList, signedDataListBz, - func(submitted []*types.SignedData, res *datypes.ResultSubmit) { - heights := make([]uint64, len(submitted)) - for i, sd := range submitted { - cache.SetDataDAIncluded(sd.Data.DACommitment().String(), res.Height, sd.Height()) - heights[i] = sd.Height() - } - if err := s.dataDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil { - s.logger.Error().Err(err).Msg("failed to append da height hint in data p2p store") - // ignoring error here, since we don't want to block the block submission' + postSubmit := s.makeDataPostSubmit(ctx, cache) + + s.dataSubmitCh <- &batchGroup{ + marshaled: signedDataListBz, + onSuccess: func(count int, daHeight uint64) { + if count > 0 { + postSubmit(signedDataList[:count], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(count), Height: daHeight}}) } - if l := len(submitted); l > 0 { - lastHeight := submitted[l-1].Height() - cache.SetLastSubmittedDataHeight(ctx, lastHeight) + }, + makeRemaining: func(submittedCount int, remainingMarshaled [][]byte) *batchGroup { + remainingData := signedDataList[submittedCount:] + return &batchGroup{ + marshaled: remainingMarshaled, + onSuccess: func(count int, daHeight uint64) { + if count > 0 { + postSubmit(remainingData[:count], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(count), Height: daHeight}}) + } + }, + makeRemaining: nil, } }, - "data", - s.client.GetDataNamespace(), - []byte(s.config.DA.SubmitOptions), - ) + } + return nil +} + +func (s *DASubmitter) makeDataPostSubmit(ctx context.Context, cache cache.Manager) func([]*types.SignedData, *datypes.ResultSubmit) { + return func(submitted []*types.SignedData, res *datypes.ResultSubmit) { + heights := make([]uint64, len(submitted)) + for i, sd := range submitted { + cache.SetDataDAIncluded(sd.Data.DACommitment().String(), res.Height, sd.Height()) + heights[i] = sd.Height() + } + if err := s.dataDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil { + s.logger.Error().Err(err).Msg("failed to append da height hint in data p2p store") + } + if l := len(submitted); l > 0 { + lastHeight := submitted[l-1].Height() + cache.SetLastSubmittedDataHeight(ctx, lastHeight) + } + } } -// signData signs unsigned SignedData structs returned from cache func (s *DASubmitter) signData(ctx context.Context, unsignedDataList []*types.SignedData, unsignedDataListBz [][]byte, signer signer.Signer, genesis genesis.Genesis) ([]*types.SignedData, [][]byte, error) { if signer == nil { return nil, nil, fmt.Errorf("signer is nil") @@ -489,7 +821,6 @@ func (s *DASubmitter) signData(ctx context.Context, unsignedDataList []*types.Si signedDataListBz := make([][]byte, 0, len(unsignedDataListBz)) for i, unsignedData := range unsignedDataList { - // Skip empty data if len(unsignedData.Txs) == 0 { continue } @@ -518,10 +849,6 @@ func (s *DASubmitter) signData(ctx context.Context, unsignedDataList []*types.Si return signedDataList, signedDataListBz, nil } -// mergeSubmitOptions merges the base submit options with a signing address. -// If the base options are valid JSON, the signing address is added to the JSON object. -// Otherwise, a new JSON object is created with just the signing address. -// Returns the base options unchanged if no signing address is provided. func mergeSubmitOptions(baseOptions []byte, signingAddress string) ([]byte, error) { if signingAddress == "" { return baseOptions, nil @@ -529,25 +856,18 @@ func mergeSubmitOptions(baseOptions []byte, signingAddress string) ([]byte, erro var optionsMap map[string]any - // If base options are provided, try to parse them as JSON if len(baseOptions) > 0 { - // Try to unmarshal existing options, ignoring errors for non-JSON input if err := json.Unmarshal(baseOptions, &optionsMap); err != nil { - // Not valid JSON - start with empty map optionsMap = make(map[string]any) } } - // Ensure map is initialized even if unmarshal returned nil if optionsMap == nil { optionsMap = make(map[string]any) } - // Add or override the signing address - // Note: Uses "signer_address" to match Celestia's TxConfig JSON schema optionsMap["signer_address"] = signingAddress - // Marshal back to JSON mergedOptions, err := json.Marshal(optionsMap) if err != nil { return nil, fmt.Errorf("failed to marshal submit options: %w", err) @@ -556,163 +876,6 @@ func mergeSubmitOptions(baseOptions []byte, signingAddress string) ([]byte, erro return mergedOptions, nil } -// submitToDA is a generic helper for submitting items to the DA layer with retry, backoff, and gas price logic. -func submitToDA[T any]( - s *DASubmitter, - ctx context.Context, - items []T, - marshaled [][]byte, - postSubmit func([]T, *datypes.ResultSubmit), - itemType string, - namespace []byte, - options []byte, -) error { - if len(items) != len(marshaled) { - return fmt.Errorf("items length (%d) does not match marshaled length (%d)", len(items), len(marshaled)) - } - - pol := defaultRetryPolicy(s.config.DA.MaxSubmitAttempts, s.config.DA.BlockTime.Duration) - - rs := retryState{Attempt: 0, Backoff: 0} - - // Limit this submission to a single size-capped batch - if len(marshaled) > 0 { - batchItems, batchMarshaled, err := limitBatchBySize(items, marshaled, pol.MaxBlobBytes) - if err != nil { - s.logger.Error(). - Str("itemType", itemType). - Uint64("maxBlobBytes", pol.MaxBlobBytes). - Err(err). - Msg("CRITICAL: Unrecoverable error - item exceeds maximum blob size") - return fmt.Errorf("unrecoverable error: no %s items fit within max blob size: %w", itemType, err) - } - items = batchItems - marshaled = batchMarshaled - } - - // Start the retry loop - for rs.Attempt < pol.MaxAttempts { - // Record resend metric for retry attempts (not the first attempt) - if rs.Attempt > 0 { - s.metrics.DASubmitterResends.Add(1) - } - - if err := waitForBackoffOrContext(ctx, rs.Backoff); err != nil { - return err - } - - // Select signing address and merge with options - signingAddress := s.addressSelector.Next() - mergedOptions, err := mergeSubmitOptions(options, signingAddress) - if err != nil { - s.logger.Error().Err(err).Msg("failed to merge submit options with signing address") - return fmt.Errorf("failed to merge submit options: %w", err) - } - - if signingAddress != "" { - s.logger.Debug().Str("signingAddress", signingAddress).Msg("using signing address for DA submission") - } - - // Perform submission - start := time.Now() - res := s.client.Submit(ctx, marshaled, -1, namespace, mergedOptions) - s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got SubmitWithHelpers response from celestia") - - // Record submission result for observability - if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { - daVisualizationServer.RecordSubmission(&res, 0, uint64(len(items)), namespace) - } - - switch res.Code { - case datypes.StatusSuccess: - submitted := items[:res.SubmittedCount] - postSubmit(submitted, &res) - s.logger.Info().Str("itemType", itemType).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer") - if int(res.SubmittedCount) == len(items) { - rs.Next(reasonSuccess, pol) - return nil - } - // partial success: advance window - items = items[res.SubmittedCount:] - marshaled = marshaled[res.SubmittedCount:] - rs.Next(reasonSuccess, pol) - - case datypes.StatusTooBig: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonTooBig) - // Iteratively halve until it fits or single-item too big - if len(items) == 1 { - s.logger.Error(). - Str("itemType", itemType). - Uint64("maxBlobBytes", pol.MaxBlobBytes). - Msg("CRITICAL: Unrecoverable error - single item exceeds DA blob size limit") - return fmt.Errorf("unrecoverable error: %w: single %s item exceeds DA blob size limit", common.ErrOversizedItem, itemType) - } - half := len(items) / 2 - if half == 0 { - half = 1 - } - items = items[:half] - marshaled = marshaled[:half] - s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying") - rs.Next(reasonTooBig, pol) - - case datypes.StatusNotIncludedInBlock: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonNotIncludedInBlock) - s.logger.Info().Dur("backoff", pol.MaxBackoff).Msg("retrying due to mempool state") - rs.Next(reasonMempool, pol) - - case datypes.StatusAlreadyInMempool: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonAlreadyInMempool) - s.logger.Info().Dur("backoff", pol.MaxBackoff).Msg("retrying due to mempool state") - rs.Next(reasonMempool, pol) - - case datypes.StatusContextCanceled: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonContextCanceled) - s.logger.Info().Msg("DA layer submission canceled due to context cancellation") - return context.Canceled - - default: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonUnknown) - s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed") - rs.Next(reasonFailure, pol) - } - } - - // Final failure after max attempts - s.recordFailure(common.DASubmitterFailureReasonTimeout) - return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt) -} - -// limitBatchBySize returns a prefix of items whose total marshaled size does not exceed maxBytes. -// If the first item exceeds maxBytes, it returns ErrOversizedItem which is unrecoverable. -func limitBatchBySize[T any](items []T, marshaled [][]byte, maxBytes uint64) ([]T, [][]byte, error) { - total := uint64(0) - count := 0 - for i := range items { - sz := uint64(len(marshaled[i])) - if sz > maxBytes { - if i == 0 { - return nil, nil, fmt.Errorf("%w: item size %d exceeds max %d", common.ErrOversizedItem, sz, maxBytes) - } - break - } - if total+sz > maxBytes { - break - } - total += sz - count++ - } - if count == 0 { - return nil, nil, fmt.Errorf("no items fit within %d bytes", maxBytes) - } - return items[:count], marshaled[:count], nil -} - func waitForBackoffOrContext(ctx context.Context, backoff time.Duration) error { if backoff <= 0 { select { diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index b2c4efcd20..3742d099ab 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -107,6 +107,8 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( require.NoError(t, err) require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen)) + daSubmitter.Flush() + // After submission, inclusion markers should be set _, ok := cm.GetHeaderDAIncludedByHeight(1) assert.True(t, ok) diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index 2d79208e92..863379904e 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -1,271 +1 @@ package submitting - -import ( - "context" - "testing" - "time" - - "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - "github.com/evstack/ev-node/block/internal/common" - "github.com/evstack/ev-node/pkg/config" - datypes "github.com/evstack/ev-node/pkg/da/types" - "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/test/mocks" -) - -// helper to build a basic submitter with provided DA mock client and config overrides -func newTestSubmitter(t *testing.T, mockClient *mocks.MockClient, override func(*config.Config)) *DASubmitter { - cfg := config.Config{} - // Keep retries small and backoffs minimal - cfg.DA.BlockTime.Duration = 1 * time.Millisecond - cfg.DA.MaxSubmitAttempts = 3 - cfg.DA.SubmitOptions = "opts" - cfg.DA.Namespace = "ns" - cfg.DA.DataNamespace = "ns-data" - if override != nil { - override(&cfg) - } - if mockClient == nil { - mockClient = mocks.NewMockClient(t) - } - mockClient.On("GetHeaderNamespace").Return([]byte(cfg.DA.Namespace)).Maybe() - mockClient.On("GetDataNamespace").Return([]byte(cfg.DA.DataNamespace)).Maybe() - mockClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() - mockClient.On("HasForcedInclusionNamespace").Return(false).Maybe() - return NewDASubmitter(mockClient, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop(), nil, nil) -} - -func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) { - t.Parallel() - - client := mocks.NewMockClient(t) - - nsBz := datypes.NamespaceFromString("ns").Bytes() - opts := []byte("opts") - var usedGas []float64 - - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { - usedGas = append(usedGas, args.Get(2).(float64)) - }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusNotIncludedInBlock, SubmittedCount: 0}}). - Once() - - ids := [][]byte{[]byte("id1"), []byte("id2"), []byte("id3")} - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { - usedGas = append(usedGas, args.Get(2).(float64)) - }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}). - Once() - - s := newTestSubmitter(t, client, nil) - - items := []string{"a", "b", "c"} - marshalledItems := make([][]byte, len(items)) - for idx, item := range items { - marshalledItems[idx] = []byte(item) - } - - ctx := context.Background() - err := submitToDA[string]( - s, - ctx, - items, - marshalledItems, - func(_ []string, _ *datypes.ResultSubmit) {}, - "item", - nsBz, - opts, - ) - assert.NoError(t, err) - - // Sentinel value is preserved on retry - assert.Equal(t, []float64{-1, -1}, usedGas) -} - -func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) { - t.Parallel() - - client := mocks.NewMockClient(t) - - nsBz := datypes.NamespaceFromString("ns").Bytes() - - opts := []byte("opts") - var usedGas []float64 - - // First attempt: unknown failure -> reasonFailure, gas unchanged for next attempt - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "boom"}}). - Once() - - // Second attempt: same gas, success - ids := [][]byte{[]byte("id1")} - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}). - Once() - - s := newTestSubmitter(t, client, nil) - - items := []string{"x"} - marshalledItems := make([][]byte, len(items)) - for idx, item := range items { - marshalledItems[idx] = []byte(item) - } - - ctx := context.Background() - err := submitToDA[string]( - s, - ctx, - items, - marshalledItems, - func(_ []string, _ *datypes.ResultSubmit) {}, - "item", - nsBz, - opts, - ) - assert.NoError(t, err) - assert.Equal(t, []float64{-1, -1}, usedGas) -} - -func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) { - t.Parallel() - - client := mocks.NewMockClient(t) - - nsBz := datypes.NamespaceFromString("ns").Bytes() - - opts := []byte("opts") - var batchSizes []int - - client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts). - Run(func(args mock.Arguments) { - blobs := args.Get(1).([][]byte) - batchSizes = append(batchSizes, len(blobs)) - }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusTooBig}}). - Once() - - ids := [][]byte{[]byte("id1"), []byte("id2")} - client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts). - Run(func(args mock.Arguments) { - blobs := args.Get(1).([][]byte) - batchSizes = append(batchSizes, len(blobs)) - }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}). - Once() - - s := newTestSubmitter(t, client, nil) - - items := []string{"a", "b", "c", "d"} - marshalledItems := make([][]byte, len(items)) - for idx, item := range items { - marshalledItems[idx] = []byte(item) - } - - ctx := context.Background() - err := submitToDA[string]( - s, - ctx, - items, - marshalledItems, - func(_ []string, _ *datypes.ResultSubmit) {}, - "item", - nsBz, - opts, - ) - assert.NoError(t, err) - assert.Equal(t, []int{4, 2}, batchSizes) -} - -func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) { - t.Parallel() - - client := mocks.NewMockClient(t) - - nsBz := datypes.NamespaceFromString("ns").Bytes() - - opts := []byte("opts") - var usedGas []float64 - - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusAlreadyInMempool}}). - Once() - - ids := [][]byte{[]byte("id1")} - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}). - Once() - - s := newTestSubmitter(t, client, nil) - - items := []string{"only"} - marshalledItems := make([][]byte, len(items)) - for idx, item := range items { - marshalledItems[idx] = []byte(item) - } - - ctx := context.Background() - err := submitToDA[string]( - s, - ctx, - items, - marshalledItems, - func(_ []string, _ *datypes.ResultSubmit) {}, - "item", - nsBz, - opts, - ) - assert.NoError(t, err) - assert.Equal(t, []float64{-1, -1}, usedGas) -} - -func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) { - t.Parallel() - - client := mocks.NewMockClient(t) - - nsBz := datypes.NamespaceFromString("ns").Bytes() - - opts := []byte("opts") - var totalSubmitted int - - firstIDs := [][]byte{[]byte("id1"), []byte("id2")} - client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: firstIDs, SubmittedCount: uint64(len(firstIDs))}}). - Once() - - secondIDs := [][]byte{[]byte("id3")} - client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: secondIDs, SubmittedCount: uint64(len(secondIDs))}}). - Once() - - s := newTestSubmitter(t, client, nil) - - items := []string{"a", "b", "c"} - marshalledItems := make([][]byte, len(items)) - for idx, item := range items { - marshalledItems[idx] = []byte(item) - } - - ctx := context.Background() - err := submitToDA[string]( - s, - ctx, - items, - marshalledItems, - func(submitted []string, _ *datypes.ResultSubmit) { totalSubmitted += len(submitted) }, - "item", - nsBz, - opts, - ) - assert.NoError(t, err) - assert.Equal(t, 3, totalSubmitted) -} diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index d25786018b..bd6a77c0fc 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ -35,18 +35,15 @@ const ( func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manager, *mocks.MockClient, genesis.Genesis) { t.Helper() - // Create store and cache ds := sync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) require.NoError(t, err) - // Create config cfg := config.DefaultConfig() cfg.DA.Namespace = testHeaderNamespace cfg.DA.DataNamespace = testDataNamespace - // Mock DA client mockDA := mocks.NewMockClient(t) headerNamespace := datypes.NamespaceFromString(cfg.DA.Namespace).Bytes() dataNamespace := datypes.NamespaceFromString(cfg.DA.DataNamespace).Bytes() @@ -55,7 +52,6 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage mockDA.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() mockDA.On("HasForcedInclusionNamespace").Return(false).Maybe() - // Create genesis gen := genesis.Genesis{ ChainID: "test-chain", InitialHeight: 1, @@ -63,7 +59,6 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage ProposerAddress: []byte("test-proposer"), } - // Create DA submitter daSubmitter := NewDASubmitter( mockDA, cfg, @@ -220,6 +215,8 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) { require.NoError(t, err) err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer) require.NoError(t, err) + submitter.Flush() + submitter.Close() // Verify headers are marked as DA included _, ok1 := cm.GetHeaderDAIncludedByHeight(1) @@ -335,6 +332,8 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) { require.NoError(t, err) err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen) require.NoError(t, err) + submitter.Flush() + submitter.Close() // Verify data is marked as DA included _, ok := cm.GetDataDAIncludedByHeight(1) diff --git a/block/internal/submitting/da_submitter_tracing.go b/block/internal/submitting/da_submitter_tracing.go index e3c531fcf8..2dcab792b5 100644 --- a/block/internal/submitting/da_submitter_tracing.go +++ b/block/internal/submitting/da_submitter_tracing.go @@ -95,3 +95,7 @@ func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*ty return nil } + +func (t *tracedDASubmitter) Close() { + t.inner.Close() +} diff --git a/block/internal/submitting/da_submitter_tracing_test.go b/block/internal/submitting/da_submitter_tracing_test.go index 6edc5c5ec1..e0da3eb1b4 100644 --- a/block/internal/submitting/da_submitter_tracing_test.go +++ b/block/internal/submitting/da_submitter_tracing_test.go @@ -37,6 +37,8 @@ func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*t return nil } +func (m *mockDASubmitterAPI) Close() {} + func setupDASubmitterTrace(t *testing.T, inner DASubmitterAPI) (DASubmitterAPI, *tracetest.SpanRecorder) { t.Helper() sr := tracetest.NewSpanRecorder() diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 25dcd781a1..7e1ea9974e 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -23,10 +23,10 @@ import ( "github.com/evstack/ev-node/types" ) -// DASubmitterAPI defines minimal methods needed by Submitter for DA submissions. type DASubmitterAPI interface { SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error + Close() } // Submitter handles DA submission and inclusion processing for both sync and aggregator nodes @@ -136,7 +136,6 @@ func (s *Submitter) Start(ctx context.Context) (err error) { return err } - // Start DA submission loop if signer is available (aggregator nodes only) if s.signer != nil { s.logger.Info().Msg("starting DA submission loop") s.wg.Go(s.daSubmissionLoop) @@ -153,6 +152,7 @@ func (s *Submitter) Stop() error { if s.cancel != nil { s.cancel() } + s.daSubmitter.Close() // Wait for goroutines to finish with a timeout to prevent hanging done := make(chan struct{}) go func() { diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index b1e2d2e988..9cd1660012 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -437,6 +437,8 @@ func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData, return nil } +func (f *fakeDASubmitter) Close() {} + // fakeSigner implements signer.Signer with deterministic behavior for tests. type fakeSigner struct{} From b501384ec5b8624c0e5c5978d6d29c79aa8c0144 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 24 Apr 2026 23:52:04 +0200 Subject: [PATCH 2/2] cleanup api --- block/internal/submitting/da_submitter.go | 11 ++++------- .../submitting/da_submitter_integration_test.go | 2 +- block/internal/submitting/da_submitter_test.go | 2 -- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 103ac74493..8fe4524e52 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -222,20 +222,17 @@ func NewDASubmitter( } func (s *DASubmitter) Close() { - if s.workerCancel != nil { - s.Flush() - s.workerCancel() - s.workerWg.Wait() + if s.workerCancel == nil { + return } -} - -func (s *DASubmitter) Flush() { headerDone := make(chan struct{}) s.headerFlushCh <- headerDone dataDone := make(chan struct{}) s.dataFlushCh <- dataDone <-headerDone <-dataDone + s.workerCancel() + s.workerWg.Wait() } func (s *DASubmitter) runNamespaceWorker(ctx context.Context, ch chan *batchGroup, flushCh chan chan struct{}, itemType string, namespace []byte) { diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index 3742d099ab..986d58c94d 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -107,7 +107,7 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( require.NoError(t, err) require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen)) - daSubmitter.Flush() + daSubmitter.Close() // After submission, inclusion markers should be set _, ok := cm.GetHeaderDAIncludedByHeight(1) diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index bd6a77c0fc..cd3a2e9ce6 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ -215,7 +215,6 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) { require.NoError(t, err) err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer) require.NoError(t, err) - submitter.Flush() submitter.Close() // Verify headers are marked as DA included @@ -332,7 +331,6 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) { require.NoError(t, err) err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen) require.NoError(t, err) - submitter.Flush() submitter.Close() // Verify data is marked as DA included