diff --git a/block/manager.go b/block/manager.go index 73325b3d73..ef8a995f4c 100644 --- a/block/manager.go +++ b/block/manager.go @@ -193,7 +193,7 @@ func NewManager( mempool mempool.Mempool, mempoolReaper *mempool.CListMempoolReaper, seqClient *grpc.Client, - proxyApp proxy.AppConnConsensus, + proxyApp proxy.AppConns, dalc *da.DAClient, eventBus *cmtypes.EventBus, logger log.Logger, @@ -242,7 +242,7 @@ func NewManager( // allow buffer for the block header and protocol encoding maxBlobSize -= blockProtocolOverhead - exec := state.NewBlockExecutor(proposerAddress, genesis.ChainID, mempool, mempoolReaper, proxyApp, eventBus, maxBlobSize, logger, execMetrics) + exec := state.NewBlockExecutor(proposerAddress, genesis.ChainID, mempool, mempoolReaper, proxyApp.Consensus(), eventBus, maxBlobSize, logger, execMetrics) if s.LastBlockHeight+1 == uint64(genesis.InitialHeight) { //nolint:gosec res, err := exec.InitChain(genesis) if err != nil { @@ -298,6 +298,14 @@ func NewManager( bq: NewBatchQueue(), } agg.init(context.Background()) + + if agg.conf.Replay { + // Handshake to ensure that app and rollup are in sync + if err := agg.Handshake(context.Background(), proxyApp); err != nil { + return nil, err + } + } + return agg, nil } @@ -1492,3 +1500,125 @@ func updateState(s *types.State, res *abci.ResponseInitChain) error { return nil } + +// Handshake performs the ABCI handshake with the application. +func (m *Manager) Handshake(ctx context.Context, proxyApp proxy.AppConns) error { + // Handshake is done via ABCI Info on the query conn. + res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo) + if err != nil { + return fmt.Errorf("error calling Info: %w", err) + } + + blockHeight := res.LastBlockHeight + if blockHeight < 0 { + return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight) + } + appHash := res.LastBlockAppHash + + m.logger.Info("ABCI Handshake App Info", + "height", blockHeight, + "hash", fmt.Sprintf("%X", appHash), + "software-version", res.Version, + "protocol-version", res.AppVersion, + ) + + // Replay blocks up to the latest in the blockstore. + appHash, err = m.ReplayBlocks(ctx, appHash, blockHeight) + if err != nil { + return fmt.Errorf("error on replay: %w", err) + } + + m.logger.Info("Completed ABCI Handshake - CometBFT and App are synced", + "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash)) + + // TODO: (on restart) replay mempool + + return nil +} + +// ReplayBlocks replays blocks from the last state to the app's last block height. +func (m *Manager) ReplayBlocks( + ctx context.Context, + appHash []byte, + appBlockHeight int64, +) ([]byte, error) { + state := m.lastState + stateBlockHeight := m.lastState.LastBlockHeight + m.logger.Info( + "ABCI Replay Blocks", + "appHeight", + appBlockHeight, + "stateHeight", + stateBlockHeight) + + if appBlockHeight < int64(stateBlockHeight) { + // the app is behind, so replay blocks + return m.replayBlocks(ctx, state, uint64(appBlockHeight), stateBlockHeight) + } else if appBlockHeight == int64(stateBlockHeight) { + // We're good! + assertAppHashEqualsOneFromState(appHash, state) + return appHash, nil + } + + return nil, fmt.Errorf("app height (%d) higher than state height (%d), possible app rollback needed", appBlockHeight, stateBlockHeight) +} + +func (m *Manager) replayBlocks( + ctx context.Context, + state types.State, + appBlockHeight, + stateBlockHeight uint64, +) ([]byte, error) { + var appHash []byte + finalBlock := stateBlockHeight + firstBlock := appBlockHeight + 1 + if firstBlock == 1 { + firstBlock = state.InitialHeight + } + for i := firstBlock; i <= finalBlock; i++ { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + m.logger.Info("Applying block", "height", i) + header, data, err := m.store.GetBlockData(ctx, i) + if err != nil { + return nil, fmt.Errorf("failed to get block data for height %d: %w", i, err) + } + // Extra check to ensure the app was not changed in a way it shouldn't have. + if len(appHash) > 0 { + assertAppHashEqualsOneFromBlock(appHash, header) + } + appHash, err = m.executor.ExecCommitBlock(header, data, m.logger, state) + if err != nil { + return nil, err + } + } + + assertAppHashEqualsOneFromState(appHash, state) + return appHash, nil +} + +func assertAppHashEqualsOneFromBlock(appHash []byte, header *types.SignedHeader) { + if !bytes.Equal(appHash, header.AppHash) { + panic(fmt.Sprintf(`block.AppHash does not match AppHash after replay. Got %X, expected %X. + +Block: %v +`, + appHash, header.AppHash, header)) + } +} + +func assertAppHashEqualsOneFromState(appHash []byte, state types.State) { + if !bytes.Equal(appHash, state.AppHash) { + panic(fmt.Sprintf(`state.AppHash does not match AppHash after replay. Got +%X, expected %X. + +State: %v + +Did you reset CometBFT without resetting your application's data?`, + appHash, state.AppHash, state)) + } +} diff --git a/cmd/rollkit/docs/rollkit_start.md b/cmd/rollkit/docs/rollkit_start.md index 110d5a851d..749b71239d 100644 --- a/cmd/rollkit/docs/rollkit_start.md +++ b/cmd/rollkit/docs/rollkit_start.md @@ -44,6 +44,7 @@ rollkit start [flags] --rollkit.lazy_block_time duration block time (for lazy mode) (default 1m0s) --rollkit.light run light client --rollkit.max_pending_blocks uint limit of blocks pending DA submission (0 for no limit) + --rollkit.replay enable block replay to synchronize application and rollup states --rollkit.sequencer_address string sequencer middleware address (host:port) (default "localhost:50051") --rollkit.sequencer_rollup_id string sequencer middleware rollup ID (default: mock-rollup) (default "mock-rollup") --rollkit.trusted_hash string initial trusted hash to start the header exchange service diff --git a/config/config.go b/config/config.go index 070495b0b1..467b766391 100644 --- a/config/config.go +++ b/config/config.go @@ -46,6 +46,8 @@ const ( FlagSequencerAddress = "rollkit.sequencer_address" // FlagSequencerRollupID is a flag for specifying the sequencer middleware rollup ID FlagSequencerRollupID = "rollkit.sequencer_rollup_id" + // FlagReplay is a flag for replaying blocks + FlagReplay = "rollkit.replay" ) // NodeConfig stores Rollkit node configuration. @@ -96,6 +98,8 @@ type BlockManagerConfig struct { // LazyBlockTime defines how often new blocks are produced in lazy mode // even if there are no transactions LazyBlockTime time.Duration `mapstructure:"lazy_block_time"` + // Replay defines whether to replay blocks + Replay bool `mapstructure:"replay"` } // GetNodeConfig translates Tendermint's configuration into Rollkit configuration. @@ -147,6 +151,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error { nc.LazyBlockTime = v.GetDuration(FlagLazyBlockTime) nc.SequencerAddress = v.GetString(FlagSequencerAddress) nc.SequencerRollupID = v.GetString(FlagSequencerRollupID) + nc.Replay = v.GetBool(FlagReplay) return nil } @@ -175,4 +180,5 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Duration(FlagLazyBlockTime, def.LazyBlockTime, "block time (for lazy mode)") cmd.Flags().String(FlagSequencerAddress, def.SequencerAddress, "sequencer middleware address (host:port)") cmd.Flags().String(FlagSequencerRollupID, def.SequencerRollupID, "sequencer middleware rollup ID (default: mock-rollup)") + cmd.Flags().Bool(FlagReplay, def.Replay, "enable block replay to synchronize application and rollup states") } diff --git a/config/defaults.go b/config/defaults.go index 20aee801d3..34b118b37d 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -32,6 +32,7 @@ var DefaultNodeConfig = NodeConfig{ DABlockTime: 15 * time.Second, LazyAggregator: false, LazyBlockTime: 60 * time.Second, + Replay: false, }, DAAddress: DefaultDAAddress, DAGasPrice: -1, diff --git a/node/full.go b/node/full.go index 458c9b919b..10b926154b 100644 --- a/node/full.go +++ b/node/full.go @@ -280,7 +280,7 @@ func initDataSyncService(mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, g } func initBlockManager(signingKey crypto.PrivKey, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, store store.Store, mempool mempool.Mempool, mempoolReaper *mempool.CListMempoolReaper, seqClient *seqGRPC.Client, proxyApp proxy.AppConns, dalc *da.DAClient, eventBus *cmtypes.EventBus, logger log.Logger, headerSyncService *block.HeaderSyncService, dataSyncService *block.DataSyncService, seqMetrics *block.Metrics, execMetrics *state.Metrics) (*block.Manager, error) { - blockManager, err := block.NewManager(signingKey, nodeConfig.BlockManagerConfig, genesis, store, mempool, mempoolReaper, seqClient, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), headerSyncService.Store(), dataSyncService.Store(), seqMetrics, execMetrics) + blockManager, err := block.NewManager(signingKey, nodeConfig.BlockManagerConfig, genesis, store, mempool, mempoolReaper, seqClient, proxyApp, dalc, eventBus, logger.With("module", "BlockManager"), headerSyncService.Store(), dataSyncService.Store(), seqMetrics, execMetrics) if err != nil { return nil, fmt.Errorf("error while initializing BlockManager: %w", err) } diff --git a/state/executor.go b/state/executor.go index d5ce1df0db..0f88302173 100644 --- a/state/executor.go +++ b/state/executor.go @@ -289,6 +289,62 @@ func (e *BlockExecutor) Commit(ctx context.Context, state types.State, header *t return appHash, retainHeight, nil } +// ---------------------------------------------------------------------------------------------------- +// Execute block without state. TODO: eliminate + +// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. +// It returns the application root hash (result of abci.Commit). +func (e *BlockExecutor) ExecCommitBlock( + header *types.SignedHeader, + data *types.Data, + logger log.Logger, + state types.State, +) ([]byte, error) { + abciHeader, err := abciconv.ToABCIHeaderPB(&header.Header) + if err != nil { + return nil, err + } + abciHeader.ChainID = e.chainID + abciBlock, err := abciconv.ToABCIBlock(header, data) + if err != nil { + return nil, err + } + resp, err := e.proxyApp.FinalizeBlock(context.TODO(), &abci.RequestFinalizeBlock{ + Hash: header.Hash(), + NextValidatorsHash: state.Validators.Hash(), + ProposerAddress: abciHeader.ProposerAddress, + Height: abciHeader.Height, + Time: abciHeader.Time, + DecidedLastCommit: abci.CommitInfo{ + Round: 0, + Votes: nil, + }, + Misbehavior: abciBlock.Evidence.Evidence.ToABCI(), + Txs: abciBlock.Txs.ToSliceOfBytes(), + }) + if err != nil { + logger.Error("Error in proxyAppConn.FinalizeBlock", "err", err) + return nil, err + } + + // Assert that the application correctly returned tx results for each of the transactions provided in the block + if len(data.Txs) != len(resp.TxResults) { + return nil, fmt.Errorf("expected tx results length to match size of transactions in block. Expected %d, got %d", len(data.Txs), len(resp.TxResults)) + } + + logger.Info("Executed block", "height", header.Height(), "app_hash", fmt.Sprintf("%X", resp.AppHash)) + + // Commit block + _, err = e.proxyApp.Commit(context.TODO()) + if err != nil { + logger.Error("Client error during proxyAppConn.Commit", "err", err) + return nil, err + } + + // ResponseCommit has no error or log + return resp.AppHash, nil +} + // updateConsensusParams updates the consensus parameters based on the provided updates. func (e *BlockExecutor) updateConsensusParams(height uint64, params cmtypes.ConsensusParams, consensusParamUpdates *cmproto.ConsensusParams) (cmproto.ConsensusParams, uint64, error) { nextParams := params.Update(consensusParamUpdates)