Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
628 changes: 394 additions & 234 deletions block/internal/submitting/da_submitter.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions block/internal/submitting/da_submitter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted(
require.NoError(t, err)
require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen))

daSubmitter.Close()

// After submission, inclusion markers should be set
_, ok := cm.GetHeaderDAIncludedByHeight(1)
assert.True(t, ok)
Expand Down
270 changes: 0 additions & 270 deletions block/internal/submitting/da_submitter_mocks_test.go
Original file line number Diff line number Diff line change
@@ -1,271 +1 @@
package submitting

import (
"context"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/pkg/config"
datypes "github.com/evstack/ev-node/pkg/da/types"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/test/mocks"
)

// helper to build a basic submitter with provided DA mock client and config overrides
func newTestSubmitter(t *testing.T, mockClient *mocks.MockClient, override func(*config.Config)) *DASubmitter {
cfg := config.Config{}
// Keep retries small and backoffs minimal
cfg.DA.BlockTime.Duration = 1 * time.Millisecond
cfg.DA.MaxSubmitAttempts = 3
cfg.DA.SubmitOptions = "opts"
cfg.DA.Namespace = "ns"
cfg.DA.DataNamespace = "ns-data"
if override != nil {
override(&cfg)
}
if mockClient == nil {
mockClient = mocks.NewMockClient(t)
}
mockClient.On("GetHeaderNamespace").Return([]byte(cfg.DA.Namespace)).Maybe()
mockClient.On("GetDataNamespace").Return([]byte(cfg.DA.DataNamespace)).Maybe()
mockClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
mockClient.On("HasForcedInclusionNamespace").Return(false).Maybe()
return NewDASubmitter(mockClient, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop(), nil, nil)
}

func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) {
t.Parallel()

client := mocks.NewMockClient(t)

nsBz := datypes.NamespaceFromString("ns").Bytes()
opts := []byte("opts")
var usedGas []float64

client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts).
Run(func(args mock.Arguments) {
usedGas = append(usedGas, args.Get(2).(float64))
}).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusNotIncludedInBlock, SubmittedCount: 0}}).
Once()

ids := [][]byte{[]byte("id1"), []byte("id2"), []byte("id3")}
client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts).
Run(func(args mock.Arguments) {
usedGas = append(usedGas, args.Get(2).(float64))
}).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}).
Once()

s := newTestSubmitter(t, client, nil)

items := []string{"a", "b", "c"}
marshalledItems := make([][]byte, len(items))
for idx, item := range items {
marshalledItems[idx] = []byte(item)
}

ctx := context.Background()
err := submitToDA[string](
s,
ctx,
items,
marshalledItems,
func(_ []string, _ *datypes.ResultSubmit) {},
"item",
nsBz,
opts,
)
assert.NoError(t, err)

// Sentinel value is preserved on retry
assert.Equal(t, []float64{-1, -1}, usedGas)
}

func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) {
t.Parallel()

client := mocks.NewMockClient(t)

nsBz := datypes.NamespaceFromString("ns").Bytes()

opts := []byte("opts")
var usedGas []float64

// First attempt: unknown failure -> reasonFailure, gas unchanged for next attempt
client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts).
Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "boom"}}).
Once()

// Second attempt: same gas, success
ids := [][]byte{[]byte("id1")}
client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts).
Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}).
Once()

s := newTestSubmitter(t, client, nil)

items := []string{"x"}
marshalledItems := make([][]byte, len(items))
for idx, item := range items {
marshalledItems[idx] = []byte(item)
}

ctx := context.Background()
err := submitToDA[string](
s,
ctx,
items,
marshalledItems,
func(_ []string, _ *datypes.ResultSubmit) {},
"item",
nsBz,
opts,
)
assert.NoError(t, err)
assert.Equal(t, []float64{-1, -1}, usedGas)
}

func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) {
t.Parallel()

client := mocks.NewMockClient(t)

nsBz := datypes.NamespaceFromString("ns").Bytes()

opts := []byte("opts")
var batchSizes []int

client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts).
Run(func(args mock.Arguments) {
blobs := args.Get(1).([][]byte)
batchSizes = append(batchSizes, len(blobs))
}).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusTooBig}}).
Once()

ids := [][]byte{[]byte("id1"), []byte("id2")}
client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts).
Run(func(args mock.Arguments) {
blobs := args.Get(1).([][]byte)
batchSizes = append(batchSizes, len(blobs))
}).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}).
Once()

s := newTestSubmitter(t, client, nil)

items := []string{"a", "b", "c", "d"}
marshalledItems := make([][]byte, len(items))
for idx, item := range items {
marshalledItems[idx] = []byte(item)
}

ctx := context.Background()
err := submitToDA[string](
s,
ctx,
items,
marshalledItems,
func(_ []string, _ *datypes.ResultSubmit) {},
"item",
nsBz,
opts,
)
assert.NoError(t, err)
assert.Equal(t, []int{4, 2}, batchSizes)
}

func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) {
t.Parallel()

client := mocks.NewMockClient(t)

nsBz := datypes.NamespaceFromString("ns").Bytes()

opts := []byte("opts")
var usedGas []float64

client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts).
Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusAlreadyInMempool}}).
Once()

ids := [][]byte{[]byte("id1")}
client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts).
Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}).
Once()

s := newTestSubmitter(t, client, nil)

items := []string{"only"}
marshalledItems := make([][]byte, len(items))
for idx, item := range items {
marshalledItems[idx] = []byte(item)
}

ctx := context.Background()
err := submitToDA[string](
s,
ctx,
items,
marshalledItems,
func(_ []string, _ *datypes.ResultSubmit) {},
"item",
nsBz,
opts,
)
assert.NoError(t, err)
assert.Equal(t, []float64{-1, -1}, usedGas)
}

func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) {
t.Parallel()

client := mocks.NewMockClient(t)

nsBz := datypes.NamespaceFromString("ns").Bytes()

opts := []byte("opts")
var totalSubmitted int

firstIDs := [][]byte{[]byte("id1"), []byte("id2")}
client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: firstIDs, SubmittedCount: uint64(len(firstIDs))}}).
Once()

secondIDs := [][]byte{[]byte("id3")}
client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts).
Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: secondIDs, SubmittedCount: uint64(len(secondIDs))}}).
Once()

s := newTestSubmitter(t, client, nil)

items := []string{"a", "b", "c"}
marshalledItems := make([][]byte, len(items))
for idx, item := range items {
marshalledItems[idx] = []byte(item)
}

ctx := context.Background()
err := submitToDA[string](
s,
ctx,
items,
marshalledItems,
func(submitted []string, _ *datypes.ResultSubmit) { totalSubmitted += len(submitted) },
"item",
nsBz,
opts,
)
assert.NoError(t, err)
assert.Equal(t, 3, totalSubmitted)
}
7 changes: 2 additions & 5 deletions block/internal/submitting/da_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,15 @@ const (
func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manager, *mocks.MockClient, genesis.Genesis) {
t.Helper()

// Create store and cache
ds := sync.MutexWrap(datastore.NewMapDatastore())
st := store.New(ds)
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
require.NoError(t, err)

// Create config
cfg := config.DefaultConfig()
cfg.DA.Namespace = testHeaderNamespace
cfg.DA.DataNamespace = testDataNamespace

// Mock DA client
mockDA := mocks.NewMockClient(t)
headerNamespace := datypes.NamespaceFromString(cfg.DA.Namespace).Bytes()
dataNamespace := datypes.NamespaceFromString(cfg.DA.DataNamespace).Bytes()
Expand All @@ -55,15 +52,13 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage
mockDA.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
mockDA.On("HasForcedInclusionNamespace").Return(false).Maybe()

// Create genesis
gen := genesis.Genesis{
ChainID: "test-chain",
InitialHeight: 1,
StartTime: time.Now(),
ProposerAddress: []byte("test-proposer"),
}

// Create DA submitter
daSubmitter := NewDASubmitter(
mockDA,
cfg,
Expand Down Expand Up @@ -220,6 +215,7 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) {
require.NoError(t, err)
err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer)
require.NoError(t, err)
submitter.Close()

// Verify headers are marked as DA included
_, ok1 := cm.GetHeaderDAIncludedByHeight(1)
Expand Down Expand Up @@ -335,6 +331,7 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) {
require.NoError(t, err)
err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen)
require.NoError(t, err)
submitter.Close()

// Verify data is marked as DA included
_, ok := cm.GetDataDAIncludedByHeight(1)
Expand Down
4 changes: 4 additions & 0 deletions block/internal/submitting/da_submitter_tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,7 @@ func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*ty

return nil
}

func (t *tracedDASubmitter) Close() {
t.inner.Close()
}
2 changes: 2 additions & 0 deletions block/internal/submitting/da_submitter_tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*t
return nil
}

func (m *mockDASubmitterAPI) Close() {}

func setupDASubmitterTrace(t *testing.T, inner DASubmitterAPI) (DASubmitterAPI, *tracetest.SpanRecorder) {
t.Helper()
sr := tracetest.NewSpanRecorder()
Expand Down
4 changes: 2 additions & 2 deletions block/internal/submitting/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/evstack/ev-node/types"
)

// DASubmitterAPI defines minimal methods needed by Submitter for DA submissions.
type DASubmitterAPI interface {
SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error
SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error
Close()
}

// Submitter handles DA submission and inclusion processing for both sync and aggregator nodes
Expand Down Expand Up @@ -136,7 +136,6 @@ func (s *Submitter) Start(ctx context.Context) (err error) {
return err
}

// Start DA submission loop if signer is available (aggregator nodes only)
if s.signer != nil {
s.logger.Info().Msg("starting DA submission loop")
s.wg.Go(s.daSubmissionLoop)
Expand All @@ -153,6 +152,7 @@ func (s *Submitter) Stop() error {
if s.cancel != nil {
s.cancel()
}
s.daSubmitter.Close()
// Wait for goroutines to finish with a timeout to prevent hanging
done := make(chan struct{})
go func() {
Expand Down
2 changes: 2 additions & 0 deletions block/internal/submitting/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData,
return nil
}

func (f *fakeDASubmitter) Close() {}

// fakeSigner implements signer.Signer with deterministic behavior for tests.
type fakeSigner struct{}

Expand Down
Loading