Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ const headerInChLength = 10000
var initialBackoff = 100 * time.Millisecond

// DAIncludedHeightKey is the key used for persisting the da included height in store.
const DAIncludedHeightKey = "da included height"
const DAIncludedHeightKey = "d"

// LastBatchHashKey is the key used for persisting the last batch hash in store.
const LastBatchHashKey = "l"

// dataHashForEmptyTxs to be used while only syncing headers from DA and no p2p to get the Data for no txs scenarios, the syncing can proceed without getting stuck forever.
var dataHashForEmptyTxs = []byte{110, 52, 11, 156, 255, 179, 122, 152, 156, 165, 68, 230, 187, 120, 10, 44, 120, 144, 29, 63, 179, 55, 56, 118, 133, 17, 163, 6, 23, 175, 160, 29}
Expand Down Expand Up @@ -271,6 +274,12 @@ func NewManager(
return nil, err
}

// If lastBatchHash is not set, retrieve the last batch hash from store
lastBatchHash, err := store.GetMetadata(context.Background(), LastBatchHashKey)
if err != nil {
logger.Error("error while retrieving last batch hash", "error", err)
}

agg := &Manager{
proposerKey: proposerKey,
conf: conf,
Expand All @@ -290,6 +299,7 @@ func NewManager(
headerStore: headerStore,
dataStore: dataStore,
lastStateMtx: new(sync.RWMutex),
lastBatchHash: lastBatchHash,
headerCache: NewHeaderCache(),
dataCache: NewDataCache(),
retrieveCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -443,6 +453,11 @@ func (m *Manager) BatchRetrieveLoop(ctx context.Context) {

// Update lastBatchHash only if the batch contains transactions
if batch.Transactions != nil {
err := m.store.SetMetadata(ctx, LastBatchHashKey, h)
Comment thread
gupadhyaya marked this conversation as resolved.
if err != nil {
m.logger.Error("error while setting last batch hash", "error", err)
}

m.lastBatchHash = h
}
} else {
Expand Down
16 changes: 12 additions & 4 deletions block/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,26 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context,
if err := syncService.store.Init(ctx, headerOrData); err != nil {
return fmt.Errorf("failed to initialize the store")
}
}

firstStart := false
if !syncService.syncerStatus.started.Load() {
firstStart = true
if err := syncService.StartSyncer(ctx); err != nil {
return fmt.Errorf("failed to start syncer after initializing the store")
}
}

// Broadcast for subscribers
if err := syncService.sub.Broadcast(ctx, headerOrData); err != nil {
// for the genesis header, broadcast error is expected as we have already initialized the store
// for starting the syncer. Hence, we ignore the error.
// exact reason: validation failed, err header verification failed: known header: '1' <= current '1'
if isGenesis && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationFailed}) {
// for the first block when starting the app, broadcast error is expected
// as we have already initialized the store for starting the syncer.
// Hence, we ignore the error. Exact reason: validation ignored
if (firstStart && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationIgnored})) ||
Comment thread
gupadhyaya marked this conversation as resolved.
// for the genesis header, broadcast error is expected as we have already initialized the store
// for starting the syncer. Hence, we ignore the error.
// exact reason: validation failed, err header verification failed: known header: '1' <= current '1'
(isGenesis && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationFailed})) {
return nil
}
return fmt.Errorf("failed to broadcast: %w", err)
Expand Down