diff --git a/block/manager.go b/block/manager.go index eae533986b..990b8634b3 100644 --- a/block/manager.go +++ b/block/manager.go @@ -74,6 +74,11 @@ type Manager struct { retrieveCond *sync.Cond logger log.Logger + + // For usage by Lazy Aggregator mode + buildingBlock bool + txsAvailable <-chan struct{} + doneBuildingBlock chan struct{} } // getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc. @@ -96,6 +101,7 @@ func NewManager( dalc da.DataAvailabilityLayerClient, eventBus *tmtypes.EventBus, logger log.Logger, + doneBuildingCh chan struct{}, ) (*Manager, error) { s, err := getInitialState(store, genesis) if err != nil { @@ -128,6 +134,13 @@ func NewManager( } } + var txsAvailableCh <-chan struct{} + if mempool != nil { + txsAvailableCh = mempool.TxsAvailable() + } else { + txsAvailableCh = nil + } + agg := &Manager{ proposerKey: proposerKey, conf: conf, @@ -139,14 +152,17 @@ func NewManager( retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP) daHeight: s.DAHeight, // channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary - HeaderOutCh: make(chan *types.SignedHeader, 100), - HeaderInCh: make(chan *types.SignedHeader, 100), - SyncedHeadersCh: make(chan *types.SignedHeader, 100), - blockInCh: make(chan newBlockEvent, 100), - FraudProofInCh: make(chan *abci.FraudProof, 100), - retrieveMtx: new(sync.Mutex), - syncCache: make(map[uint64]*types.Block), - logger: logger, + HeaderOutCh: make(chan *types.SignedHeader, 100), + HeaderInCh: make(chan *types.SignedHeader, 100), + SyncedHeadersCh: make(chan *types.SignedHeader, 100), + blockInCh: make(chan newBlockEvent, 100), + FraudProofInCh: make(chan *abci.FraudProof, 100), + retrieveMtx: new(sync.Mutex), + syncCache: make(map[uint64]*types.Block), + logger: logger, + txsAvailable: txsAvailableCh, + doneBuildingBlock: doneBuildingCh, + buildingBlock: false, } agg.retrieveCond = sync.NewCond(agg.retrieveMtx) @@ -172,7 +188,7 @@ func (m *Manager) GetFraudProofOutChan() chan *abci.FraudProof { } // AggregationLoop is responsible for aggregating transactions into rollup-blocks. -func (m *Manager) AggregationLoop(ctx context.Context) { +func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) { initialHeight := uint64(m.genesis.InitialHeight) height := m.store.Height() var delay time.Duration @@ -189,19 +205,49 @@ func (m *Manager) AggregationLoop(ctx context.Context) { time.Sleep(delay) } + //var timer *time.Timer timer := time.NewTimer(0) - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - start := time.Now() - err := m.publishBlock(ctx) - if err != nil { - m.logger.Error("error while publishing block", "error", err) + if !lazy { + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + start := time.Now() + err := m.publishBlock(ctx) + if err != nil { + m.logger.Error("error while publishing block", "error", err) + } + timer.Reset(m.getRemainingSleep(start)) + } + } + } else { + <-timer.C + for { + select { + case <-ctx.Done(): + return + // the buildBlock channel is signalled when Txns become available + // in the mempool, or after transactions remain in the mempool after + // building a block. + case <-m.txsAvailable: + if !m.buildingBlock { + m.buildingBlock = true + timer.Reset(1 * time.Second) + } + case <-timer.C: + // build a block with all the transactions received in the last 1 second + err := m.publishBlock(ctx) + if err != nil { + m.logger.Error("error while publishing block", "error", err) + } + // this can be used to notify multiple subscribers when a block has been built + // intended to help improve the UX of lightclient frontends and wallets. + close(m.doneBuildingBlock) + m.doneBuildingBlock = make(chan struct{}) + m.buildingBlock = false } - timer.Reset(m.getRemainingSleep(start)) } } } diff --git a/block/manager_test.go b/block/manager_test.go index beab374597..4eec43f867 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -80,7 +80,8 @@ func TestInitialState(t *testing.T) { assert := assert.New(t) logger := log.TestingLogger() dalc := getMockDALC(logger) - agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger) + dumbChan := make(chan struct{}) + agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger, dumbChan) assert.NoError(err) assert.NotNil(agg) assert.Equal(c.expectedChainID, agg.lastState.ChainID) diff --git a/config/config.go b/config/config.go index 07388e78ec..7ca286a203 100644 --- a/config/config.go +++ b/config/config.go @@ -11,16 +11,17 @@ import ( ) const ( - flagAggregator = "rollkit.aggregator" - flagDALayer = "rollkit.da_layer" - flagDAConfig = "rollkit.da_config" - flagBlockTime = "rollkit.block_time" - flagDABlockTime = "rollkit.da_block_time" - flagDAStartHeight = "rollkit.da_start_height" - flagNamespaceID = "rollkit.namespace_id" - flagFraudProofs = "rollkit.experimental_insecure_fraud_proofs" - flagLight = "rollkit.light" - flagTrustedHash = "rollkit.trusted_hash" + flagAggregator = "rollkit.aggregator" + flagDALayer = "rollkit.da_layer" + flagDAConfig = "rollkit.da_config" + flagBlockTime = "rollkit.block_time" + flagDABlockTime = "rollkit.da_block_time" + flagDAStartHeight = "rollkit.da_start_height" + flagNamespaceID = "rollkit.namespace_id" + flagFraudProofs = "rollkit.experimental_insecure_fraud_proofs" + flagLight = "rollkit.light" + flagTrustedHash = "rollkit.trusted_hash" + flagLazyAggregator = "rollkit.lazy_aggregator" ) // NodeConfig stores Rollkit node configuration. @@ -37,6 +38,7 @@ type NodeConfig struct { DAConfig string `mapstructure:"da_config"` Light bool `mapstructure:"light"` HeaderConfig `mapstructure:",squash"` + LazyAggregator bool `mapstructure:"lazy_aggregator"` } // HeaderConfig allows node to pass the initial trusted header hash to start the header exchange service @@ -66,6 +68,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error { nc.DAStartHeight = v.GetUint64(flagDAStartHeight) nc.DABlockTime = v.GetDuration(flagDABlockTime) nc.BlockTime = v.GetDuration(flagBlockTime) + nc.LazyAggregator = v.GetBool(flagLazyAggregator) nsID := v.GetString(flagNamespaceID) nc.FraudProofs = v.GetBool(flagFraudProofs) nc.Light = v.GetBool(flagLight) @@ -84,6 +87,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error { func AddFlags(cmd *cobra.Command) { def := DefaultNodeConfig cmd.Flags().Bool(flagAggregator, def.Aggregator, "run node in aggregator mode") + cmd.Flags().Bool(flagLazyAggregator, def.LazyAggregator, "wait for transactions, don't build empty blocks") cmd.Flags().String(flagDALayer, def.DALayer, "Data Availability Layer Client name (mock or grpc") cmd.Flags().String(flagDAConfig, def.DAConfig, "Data Availability Layer Client config") cmd.Flags().Duration(flagBlockTime, def.BlockTime, "block time (for aggregator mode)") diff --git a/config/defaults.go b/config/defaults.go index 3a733ea422..d3daedc363 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -18,7 +18,8 @@ var DefaultNodeConfig = NodeConfig{ ListenAddress: DefaultListenAddress, Seeds: "", }, - Aggregator: false, + Aggregator: false, + LazyAggregator: false, BlockManagerConfig: BlockManagerConfig{ BlockTime: 30 * time.Second, NamespaceID: types.NamespaceID{}, diff --git a/node/full.go b/node/full.go index 1770229b06..ef2a7165a3 100644 --- a/node/full.go +++ b/node/full.go @@ -84,6 +84,9 @@ type FullNode struct { ctx context.Context cancel context.CancelFunc + + // For use in Lazy Aggregator + DoneBuildingBlock chan struct{} } // newFullNode creates a new Rollkit full node. @@ -140,8 +143,10 @@ func newFullNode( mp := mempoolv1.NewTxMempool(logger, llcfg.DefaultMempoolConfig(), appClient, 0) mpIDs := newMempoolIDs() + mp.EnableTxsAvailable() - blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, appClient, dalc, eventBus, logger.With("module", "BlockManager")) + doneBuildingChannel := make(chan struct{}) + blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, appClient, dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel) if err != nil { return nil, fmt.Errorf("BlockManager initialization error: %w", err) } @@ -154,23 +159,24 @@ func newFullNode( ctx, cancel := context.WithCancel(ctx) node := &FullNode{ - appClient: appClient, - eventBus: eventBus, - genesis: genesis, - conf: conf, - P2P: client, - blockManager: blockManager, - dalc: dalc, - Mempool: mp, - mempoolIDs: mpIDs, - incomingTxCh: make(chan *p2p.GossipMessage), - Store: s, - TxIndexer: txIndexer, - IndexerService: indexerService, - BlockIndexer: blockIndexer, - hExService: headerExchangeService, - ctx: ctx, - cancel: cancel, + appClient: appClient, + eventBus: eventBus, + genesis: genesis, + conf: conf, + P2P: client, + blockManager: blockManager, + dalc: dalc, + Mempool: mp, + mempoolIDs: mpIDs, + incomingTxCh: make(chan *p2p.GossipMessage), + Store: s, + TxIndexer: txIndexer, + IndexerService: indexerService, + BlockIndexer: blockIndexer, + hExService: headerExchangeService, + ctx: ctx, + cancel: cancel, + DoneBuildingBlock: doneBuildingChannel, } node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -268,7 +274,7 @@ func (n *FullNode) OnStart() error { } if n.conf.Aggregator { n.Logger.Info("working in aggregator mode", "block time", n.conf.BlockTime) - go n.blockManager.AggregationLoop(n.ctx) + go n.blockManager.AggregationLoop(n.ctx, n.conf.LazyAggregator) go n.headerPublishLoop(n.ctx) } go n.blockManager.RetrieveLoop(n.ctx) diff --git a/node/full_node_integration_test.go b/node/full_node_integration_test.go index 43a7c6a93a..babd579150 100644 --- a/node/full_node_integration_test.go +++ b/node/full_node_integration_test.go @@ -133,6 +133,64 @@ func TestTxGossipingAndAggregation(t *testing.T) { } } +func TestLazyAggregator(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + app := &mocks.Application{} + app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) + app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{}) + app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) + app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{}) + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) + app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) + app.On("GetAppHash", mock.Anything).Return(abci.ResponseGetAppHash{}) + + key, _, _ := crypto.GenerateEd25519Key(rand.Reader) + signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) + + blockManagerConfig := config.BlockManagerConfig{ + BlockTime: 1 * time.Second, + NamespaceID: types.NamespaceID{1, 2, 3, 4, 5, 6, 7, 8}, + } + + node, err := NewNode(context.Background(), config.NodeConfig{ + DALayer: "mock", + Aggregator: true, + BlockManagerConfig: blockManagerConfig, + LazyAggregator: true, + }, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + assert.False(node.IsRunning()) + assert.NoError(err) + err = node.Start() + assert.NoError(err) + defer func() { + err := node.Stop() + assert.NoError(err) + }() + assert.True(node.IsRunning()) + + require.NoError(err) + + client := node.GetClient() + + _, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 1}) + assert.NoError(err) + time.Sleep(2 * time.Second) + assert.Equal(node.(*FullNode).Store.Height(), uint64(1)) + + _, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 2}) + assert.NoError(err) + time.Sleep(2 * time.Second) + assert.Equal(node.(*FullNode).Store.Height(), uint64(2)) + + _, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 3}) + assert.NoError(err) + time.Sleep(2 * time.Second) + assert.Equal(node.(*FullNode).Store.Height(), uint64(3)) + +} + func TestHeaderExchange(t *testing.T) { testSingleAggreatorSingleFullNode(t) testSingleAggreatorTwoFullNode(t) diff --git a/state/executor.go b/state/executor.go index c65f891d32..fe466cda44 100644 --- a/state/executor.go +++ b/state/executor.go @@ -182,6 +182,7 @@ func (e *BlockExecutor) Commit(ctx context.Context, state types.State, block *ty if err != nil { e.logger.Error("failed to fire block events", "error", err) } + return appHash, retainHeight, nil }