diff --git a/block/manager.go b/block/manager.go index 65cfe98444..3e0341560a 100644 --- a/block/manager.go +++ b/block/manager.go @@ -56,17 +56,12 @@ type Manager struct { // daHeight is the height of the latest processed DA block daHeight uint64 - HeaderOutCh chan *types.SignedHeader - HeaderInCh chan *types.SignedHeader - SyncedHeadersCh chan *types.SignedHeader - - lastCommit atomic.Value + HeaderCh chan *types.SignedHeader FraudProofInCh chan *abci.FraudProof - syncTarget uint64 - blockInCh chan newBlockEvent - syncCache map[uint64]*types.Block + blockInCh chan newBlockEvent + syncCache map[uint64]*types.Block // retrieveMtx is used by retrieveCond retrieveMtx *sync.Mutex @@ -152,9 +147,7 @@ func NewManager( retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP) daHeight: s.DAHeight, // channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary - HeaderOutCh: make(chan *types.SignedHeader, 100), - HeaderInCh: make(chan *types.SignedHeader, 100), - SyncedHeadersCh: make(chan *types.SignedHeader, 100), + HeaderCh: make(chan *types.SignedHeader, 100), blockInCh: make(chan newBlockEvent, 100), FraudProofInCh: make(chan *abci.FraudProof, 100), retrieveMtx: new(sync.Mutex), @@ -262,27 +255,6 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) { select { case <-daTicker.C: m.retrieveCond.Signal() - case header := <-m.HeaderInCh: - m.logger.Debug("block header received", "height", header.Header.Height(), "hash", header.Header.Hash()) - m.SyncedHeadersCh <- header - newHeight := header.Header.BaseHeader.Height - currentHeight := m.store.Height() - // in case of client reconnecting after being offline - // newHeight may be significantly larger than currentHeight - // it's handled gently in RetrieveLoop - if newHeight > currentHeight { - atomic.StoreUint64(&m.syncTarget, newHeight) - m.retrieveCond.Signal() - } - commit := &header.Commit - // TODO(tzdybal): check if it's from right aggregator - m.lastCommit.Store(commit) - err := m.trySyncNextBlock(ctx, 0) - if err != nil { - m.logger.Info("failed to sync next block", "error", err) - } else { - m.logger.Debug("synced using signed header", "height", commit.Height) - } case blockEvent := <-m.blockInCh: block := blockEvent.block daHeight := blockEvent.daHeight @@ -335,43 +307,36 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) { // If block at height h+1 is not available, value of last gossiped commit is checked. // If commit for block h is available, we proceed with sync process, and remove synced block from sync cache. func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { - var b1 *types.Block var commit *types.Commit currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory - b1, ok1 := m.syncCache[currentHeight+1] - if !ok1 { + b, ok := m.syncCache[currentHeight+1] + if !ok { return nil } - b2, ok2 := m.syncCache[currentHeight+2] - if ok2 { - m.logger.Debug("using last commit from next block") - commit = &b2.SignedHeader.Commit - } else { - lastCommit := m.getLastCommit() - if lastCommit != nil && lastCommit.Height == currentHeight+1 { - m.logger.Debug("using gossiped commit") - commit = lastCommit - } + + signedHeader := &b.SignedHeader + if signedHeader != nil { + commit = &b.SignedHeader.Commit } - if b1 != nil && commit != nil { - m.logger.Info("Syncing block", "height", b1.SignedHeader.Header.Height()) - newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b1) + if b != nil && commit != nil { + m.logger.Info("Syncing block", "height", b.SignedHeader.Header.Height()) + newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b) if err != nil { return fmt.Errorf("failed to ApplyBlock: %w", err) } - err = m.store.SaveBlock(b1, commit) + err = m.store.SaveBlock(b, commit) if err != nil { return fmt.Errorf("failed to save block: %w", err) } - _, _, err = m.executor.Commit(ctx, newState, b1, responses) + _, _, err = m.executor.Commit(ctx, newState, b, responses) if err != nil { return fmt.Errorf("failed to Commit: %w", err) } - m.store.SetHeight(uint64(b1.SignedHeader.Header.Height())) + m.store.SetHeight(uint64(b.SignedHeader.Header.Height())) - err = m.store.SaveBlockResponses(uint64(b1.SignedHeader.Header.Height()), responses) + err = m.store.SaveBlockResponses(uint64(b.SignedHeader.Header.Height()), responses) if err != nil { return fmt.Errorf("failed to save block responses: %w", err) } @@ -390,14 +355,6 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { return nil } -func (m *Manager) getLastCommit() *types.Commit { - ptr := m.lastCommit.Load() - if ptr == nil { - return nil - } - return ptr.(*types.Commit) -} - // RetrieveLoop is responsible for interacting with DA layer. func (m *Manager) RetrieveLoop(ctx context.Context) { // waitCh is used to signal the retrieve loop, that it should process next blocks @@ -602,7 +559,8 @@ func (m *Manager) publishBlock(ctx context.Context) error { // Only update the stored height after successfully submitting to DA layer and committing to the DB m.store.SetHeight(uint64(block.SignedHeader.Header.Height())) - m.publishSignedHeader(block, commit) + // Publish header to channel so that header exchange service can broadcast + m.HeaderCh <- &block.SignedHeader return nil } @@ -639,11 +597,6 @@ func (m *Manager) exponentialBackoff(backoff time.Duration) time.Duration { return backoff } -// TODO(tzdybal): consider inlining -func (m *Manager) publishSignedHeader(block *types.Block, commit *types.Commit) { - m.HeaderOutCh <- &types.SignedHeader{Header: block.SignedHeader.Header, Commit: *commit} -} - func updateState(s *types.State, res *abci.ResponseInitChain) { // If the app did not return an app hash, we keep the one set from the genesis doc in // the state. We don't set appHash since we don't want the genesis doc app hash diff --git a/node/full.go b/node/full.go index ef2a7165a3..330e11e4c0 100644 --- a/node/full.go +++ b/node/full.go @@ -32,7 +32,6 @@ import ( "github.com/rollkit/rollkit/state/txindex" "github.com/rollkit/rollkit/state/txindex/kv" "github.com/rollkit/rollkit/store" - "github.com/rollkit/rollkit/types" ) // prefixes used in KV store to separate main node data from DALC data @@ -151,7 +150,7 @@ func newFullNode( return nil, fmt.Errorf("BlockManager initialization error: %w", err) } - headerExchangeService, err := NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, blockManager.SyncedHeadersCh, logger.With("module", "HeaderExchangeService")) + headerExchangeService, err := NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "HeaderExchangeService")) if err != nil { return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err) } @@ -182,7 +181,6 @@ func newFullNode( node.BaseService = *service.NewBaseService(logger, "Node", node) node.P2P.SetTxValidator(node.newTxValidator()) - node.P2P.SetHeaderValidator(node.newHeaderValidator()) node.P2P.SetFraudProofValidator(node.newFraudProofValidator()) return node, nil @@ -220,16 +218,8 @@ func (n *FullNode) initGenesisChunks() error { func (n *FullNode) headerPublishLoop(ctx context.Context) { for { select { - case signedHeader := <-n.blockManager.HeaderOutCh: + case signedHeader := <-n.blockManager.HeaderCh: n.hExService.writeToHeaderStoreAndBroadcast(ctx, signedHeader) - headerBytes, err := signedHeader.MarshalBinary() - if err != nil { - n.Logger.Error("failed to serialize signed block header", "error", err) - } - err = n.P2P.GossipSignedHeader(ctx, headerBytes) - if err != nil { - n.Logger.Error("failed to gossip signed block header", "error", err) - } case <-ctx.Done(): return } @@ -363,27 +353,6 @@ func (n *FullNode) newTxValidator() p2p.GossipValidator { } } -// newHeaderValidator returns a pubsub validator that runs basic checks and forwards -// the deserialized header for further processing -func (n *FullNode) newHeaderValidator() p2p.GossipValidator { - return func(headerMsg *p2p.GossipMessage) bool { - n.Logger.Debug("header received", "from", headerMsg.From, "bytes", len(headerMsg.Data)) - var header types.SignedHeader - err := header.UnmarshalBinary(headerMsg.Data) - if err != nil { - n.Logger.Error("failed to deserialize header", "error", err) - return false - } - err = header.ValidateBasic() - if err != nil { - n.Logger.Error("failed to validate header", "error", err) - return false - } - n.blockManager.HeaderInCh <- &header - return true - } -} - // newFraudProofValidator returns a pubsub validator that validates a fraud proof and forwards // it to be verified func (n *FullNode) newFraudProofValidator() p2p.GossipValidator { diff --git a/node/full_node_integration_test.go b/node/full_node_integration_test.go index babd579150..0327aeff2e 100644 --- a/node/full_node_integration_test.go +++ b/node/full_node_integration_test.go @@ -493,6 +493,7 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, d ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n), } bmConfig := config.BlockManagerConfig{ + DABlockTime: 100 * time.Millisecond, BlockTime: 1 * time.Second, // blocks must be at least 1 sec apart for adjacent headers to get verified correctly NamespaceID: types.NamespaceID{8, 7, 6, 5, 4, 3, 2, 1}, FraudProofs: true, diff --git a/node/header_exchange.go b/node/header_exchange.go index b2e6f4f9fd..68837f800c 100644 --- a/node/header_exchange.go +++ b/node/header_exchange.go @@ -26,22 +26,21 @@ import ( ) type HeaderExchangeService struct { - conf config.NodeConfig - genesis *tmtypes.GenesisDoc - p2p *p2p.Client - ex *goheaderp2p.Exchange[*types.SignedHeader] - syncer *sync.Syncer[*types.SignedHeader] - sub *goheaderp2p.Subscriber[*types.SignedHeader] - p2pServer *goheaderp2p.ExchangeServer[*types.SignedHeader] - headerStore *goheaderstore.Store[*types.SignedHeader] - syncerStarted bool - syncedHeadersCh chan *types.SignedHeader + conf config.NodeConfig + genesis *tmtypes.GenesisDoc + p2p *p2p.Client + ex *goheaderp2p.Exchange[*types.SignedHeader] + syncer *sync.Syncer[*types.SignedHeader] + sub *goheaderp2p.Subscriber[*types.SignedHeader] + p2pServer *goheaderp2p.ExchangeServer[*types.SignedHeader] + headerStore *goheaderstore.Store[*types.SignedHeader] + syncerStarted bool logger log.Logger ctx context.Context } -func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *tmtypes.GenesisDoc, p2p *p2p.Client, syncedHeadersCh chan *types.SignedHeader, logger log.Logger) (*HeaderExchangeService, error) { +func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *tmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderExchangeService, error) { // store is TxnDatastore, but we require Batching, hence the type assertion // note, the badger datastore impl that is used in the background implements both storeBatch, ok := store.(ds.Batching) @@ -54,13 +53,12 @@ func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf c } return &HeaderExchangeService{ - conf: conf, - genesis: genesis, - p2p: p2p, - ctx: ctx, - headerStore: ss, - syncedHeadersCh: syncedHeadersCh, - logger: logger, + conf: conf, + genesis: genesis, + p2p: p2p, + ctx: ctx, + headerStore: ss, + logger: logger, }, nil } @@ -92,13 +90,6 @@ func (hExService *HeaderExchangeService) tryInitHeaderStoreAndStartSyncer(ctx co if err := hExService.initHeaderStoreAndStartSyncer(ctx, trustedHeader); err != nil { hExService.logger.Error("failed to initialize the headerstore and start syncer", "error", err) } - } else { - signedHeader := <-hExService.syncedHeadersCh - if signedHeader.Header.Height() == hExService.genesis.InitialHeight { - if err := hExService.initHeaderStoreAndStartSyncer(ctx, signedHeader); err != nil { - hExService.logger.Error("failed to initialize the headerstore and start syncer", "error", err) - } - } } } diff --git a/node/light.go b/node/light.go index c09976a7c5..b4c99276a1 100644 --- a/node/light.go +++ b/node/light.go @@ -17,7 +17,6 @@ import ( "github.com/rollkit/rollkit/config" "github.com/rollkit/rollkit/p2p" "github.com/rollkit/rollkit/store" - "github.com/rollkit/rollkit/types" ) var _ Node = &LightNode{} @@ -56,7 +55,7 @@ func newLightNode( return nil, err } - headerExchangeService, err := NewHeaderExchangeService(ctx, datastore, conf, genesis, client, make(chan *types.SignedHeader), logger.With("module", "HeaderExchangeService")) + headerExchangeService, err := NewHeaderExchangeService(ctx, datastore, conf, genesis, client, logger.With("module", "HeaderExchangeService")) if err != nil { return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err) } diff --git a/types/signed_header.go b/types/signed_header.go index 3b15ece3e7..6e53050ad6 100644 --- a/types/signed_header.go +++ b/types/signed_header.go @@ -1,6 +1,10 @@ package types -import "github.com/celestiaorg/go-header" +import ( + "fmt" + + "github.com/celestiaorg/go-header" +) func (sH *SignedHeader) New() header.Header { return new(SignedHeader) @@ -9,3 +13,24 @@ func (sH *SignedHeader) New() header.Header { func (sH *SignedHeader) IsZero() bool { return sH == nil } + +func (sH *SignedHeader) VerifyAdjacent(untrst header.Header) error { + // Explicit type checks are required due to embedded Header which also does the explicit type check + untrstH, ok := untrst.(*SignedHeader) + if !ok { + return &header.VerifyError{ + Reason: fmt.Errorf("%T is not of type %T", untrst, sH), + } + } + return sH.Header.VerifyAdjacent(&untrstH.Header) +} + +func (sH *SignedHeader) VerifyNonAdjacent(untrst header.Header) error { + untrstH, ok := untrst.(*SignedHeader) + if !ok { + return &header.VerifyError{ + Reason: fmt.Errorf("%T is not of type %T", untrst, sH), + } + } + return sH.Header.VerifyNonAdjacent(&untrstH.Header) +}