Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 65 additions & 19 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ type Manager struct {
retrieveCond *sync.Cond

logger log.Logger

// For usage by Lazy Aggregator mode
buildingBlock bool
txsAvailable <-chan struct{}
doneBuildingBlock chan struct{}
}

// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
Expand All @@ -96,6 +101,7 @@ func NewManager(
dalc da.DataAvailabilityLayerClient,
eventBus *tmtypes.EventBus,
logger log.Logger,
doneBuildingCh chan struct{},
) (*Manager, error) {
s, err := getInitialState(store, genesis)
if err != nil {
Expand Down Expand Up @@ -128,6 +134,13 @@ func NewManager(
}
}

var txsAvailableCh <-chan struct{}
if mempool != nil {
txsAvailableCh = mempool.TxsAvailable()
} else {
txsAvailableCh = nil
}

agg := &Manager{
proposerKey: proposerKey,
conf: conf,
Expand All @@ -139,14 +152,17 @@ func NewManager(
retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP)
daHeight: s.DAHeight,
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderOutCh: make(chan *types.SignedHeader, 100),
HeaderInCh: make(chan *types.SignedHeader, 100),
SyncedHeadersCh: make(chan *types.SignedHeader, 100),
blockInCh: make(chan newBlockEvent, 100),
FraudProofInCh: make(chan *abci.FraudProof, 100),
retrieveMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
logger: logger,
HeaderOutCh: make(chan *types.SignedHeader, 100),
HeaderInCh: make(chan *types.SignedHeader, 100),
SyncedHeadersCh: make(chan *types.SignedHeader, 100),
blockInCh: make(chan newBlockEvent, 100),
FraudProofInCh: make(chan *abci.FraudProof, 100),
retrieveMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
logger: logger,
txsAvailable: txsAvailableCh,
doneBuildingBlock: doneBuildingCh,
buildingBlock: false,
}
agg.retrieveCond = sync.NewCond(agg.retrieveMtx)

Expand All @@ -172,7 +188,7 @@ func (m *Manager) GetFraudProofOutChan() chan *abci.FraudProof {
}

// AggregationLoop is responsible for aggregating transactions into rollup-blocks.
func (m *Manager) AggregationLoop(ctx context.Context) {
func (m *Manager) AggregationLoop(ctx context.Context, lazy bool) {
Comment thread
S1nus marked this conversation as resolved.
initialHeight := uint64(m.genesis.InitialHeight)
height := m.store.Height()
var delay time.Duration
Expand All @@ -189,19 +205,49 @@ func (m *Manager) AggregationLoop(ctx context.Context) {
time.Sleep(delay)
}

//var timer *time.Timer
timer := time.NewTimer(0)

for {
select {
case <-ctx.Done():
return
case <-timer.C:
start := time.Now()
err := m.publishBlock(ctx)
if err != nil {
m.logger.Error("error while publishing block", "error", err)
if !lazy {
for {
select {
case <-ctx.Done():
return
case <-timer.C:
start := time.Now()
err := m.publishBlock(ctx)
if err != nil {
m.logger.Error("error while publishing block", "error", err)
}
timer.Reset(m.getRemainingSleep(start))
}
}
} else {
<-timer.C
for {
select {
case <-ctx.Done():
return
// the buildBlock channel is signalled when Txns become available
// in the mempool, or after transactions remain in the mempool after
// building a block.
case <-m.txsAvailable:
if !m.buildingBlock {
m.buildingBlock = true
timer.Reset(1 * time.Second)
}
case <-timer.C:
// build a block with all the transactions received in the last 1 second
err := m.publishBlock(ctx)
if err != nil {
m.logger.Error("error while publishing block", "error", err)
}
// this can be used to notify multiple subscribers when a block has been built
// intended to help improve the UX of lightclient frontends and wallets.
close(m.doneBuildingBlock)
m.doneBuildingBlock = make(chan struct{})
m.buildingBlock = false
}
timer.Reset(m.getRemainingSleep(start))
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func TestInitialState(t *testing.T) {
assert := assert.New(t)
logger := log.TestingLogger()
dalc := getMockDALC(logger)
agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger)
dumbChan := make(chan struct{})
Comment thread
S1nus marked this conversation as resolved.
agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger, dumbChan)
assert.NoError(err)
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.lastState.ChainID)
Expand Down
24 changes: 14 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ import (
)

const (
flagAggregator = "rollkit.aggregator"
flagDALayer = "rollkit.da_layer"
flagDAConfig = "rollkit.da_config"
flagBlockTime = "rollkit.block_time"
flagDABlockTime = "rollkit.da_block_time"
flagDAStartHeight = "rollkit.da_start_height"
flagNamespaceID = "rollkit.namespace_id"
flagFraudProofs = "rollkit.experimental_insecure_fraud_proofs"
flagLight = "rollkit.light"
flagTrustedHash = "rollkit.trusted_hash"
flagAggregator = "rollkit.aggregator"
flagDALayer = "rollkit.da_layer"
flagDAConfig = "rollkit.da_config"
flagBlockTime = "rollkit.block_time"
flagDABlockTime = "rollkit.da_block_time"
flagDAStartHeight = "rollkit.da_start_height"
flagNamespaceID = "rollkit.namespace_id"
flagFraudProofs = "rollkit.experimental_insecure_fraud_proofs"
flagLight = "rollkit.light"
flagTrustedHash = "rollkit.trusted_hash"
flagLazyAggregator = "rollkit.lazy_aggregator"
)

// NodeConfig stores Rollkit node configuration.
Expand All @@ -37,6 +38,7 @@ type NodeConfig struct {
DAConfig string `mapstructure:"da_config"`
Light bool `mapstructure:"light"`
HeaderConfig `mapstructure:",squash"`
LazyAggregator bool `mapstructure:"lazy_aggregator"`
}

// HeaderConfig allows node to pass the initial trusted header hash to start the header exchange service
Expand Down Expand Up @@ -66,6 +68,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error {
nc.DAStartHeight = v.GetUint64(flagDAStartHeight)
nc.DABlockTime = v.GetDuration(flagDABlockTime)
nc.BlockTime = v.GetDuration(flagBlockTime)
nc.LazyAggregator = v.GetBool(flagLazyAggregator)
nsID := v.GetString(flagNamespaceID)
nc.FraudProofs = v.GetBool(flagFraudProofs)
nc.Light = v.GetBool(flagLight)
Expand All @@ -84,6 +87,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error {
func AddFlags(cmd *cobra.Command) {
def := DefaultNodeConfig
cmd.Flags().Bool(flagAggregator, def.Aggregator, "run node in aggregator mode")
cmd.Flags().Bool(flagLazyAggregator, def.LazyAggregator, "wait for transactions, don't build empty blocks")
cmd.Flags().String(flagDALayer, def.DALayer, "Data Availability Layer Client name (mock or grpc")
cmd.Flags().String(flagDAConfig, def.DAConfig, "Data Availability Layer Client config")
cmd.Flags().Duration(flagBlockTime, def.BlockTime, "block time (for aggregator mode)")
Expand Down
3 changes: 2 additions & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ var DefaultNodeConfig = NodeConfig{
ListenAddress: DefaultListenAddress,
Seeds: "",
},
Aggregator: false,
Aggregator: false,
LazyAggregator: false,
Comment thread
S1nus marked this conversation as resolved.
BlockManagerConfig: BlockManagerConfig{
BlockTime: 30 * time.Second,
NamespaceID: types.NamespaceID{},
Expand Down
44 changes: 25 additions & 19 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type FullNode struct {
ctx context.Context

cancel context.CancelFunc

// For use in Lazy Aggregator
DoneBuildingBlock chan struct{}
}

// newFullNode creates a new Rollkit full node.
Expand Down Expand Up @@ -140,8 +143,10 @@ func newFullNode(

mp := mempoolv1.NewTxMempool(logger, llcfg.DefaultMempoolConfig(), appClient, 0)
mpIDs := newMempoolIDs()
mp.EnableTxsAvailable()

blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, appClient, dalc, eventBus, logger.With("module", "BlockManager"))
doneBuildingChannel := make(chan struct{})
blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, appClient, dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel)
if err != nil {
return nil, fmt.Errorf("BlockManager initialization error: %w", err)
}
Expand All @@ -154,23 +159,24 @@ func newFullNode(
ctx, cancel := context.WithCancel(ctx)

node := &FullNode{
appClient: appClient,
eventBus: eventBus,
genesis: genesis,
conf: conf,
P2P: client,
blockManager: blockManager,
dalc: dalc,
Mempool: mp,
mempoolIDs: mpIDs,
incomingTxCh: make(chan *p2p.GossipMessage),
Store: s,
TxIndexer: txIndexer,
IndexerService: indexerService,
BlockIndexer: blockIndexer,
hExService: headerExchangeService,
ctx: ctx,
cancel: cancel,
appClient: appClient,
eventBus: eventBus,
genesis: genesis,
conf: conf,
P2P: client,
blockManager: blockManager,
dalc: dalc,
Mempool: mp,
mempoolIDs: mpIDs,
incomingTxCh: make(chan *p2p.GossipMessage),
Store: s,
TxIndexer: txIndexer,
IndexerService: indexerService,
BlockIndexer: blockIndexer,
hExService: headerExchangeService,
ctx: ctx,
cancel: cancel,
DoneBuildingBlock: doneBuildingChannel,
}

node.BaseService = *service.NewBaseService(logger, "Node", node)
Expand Down Expand Up @@ -268,7 +274,7 @@ func (n *FullNode) OnStart() error {
}
if n.conf.Aggregator {
n.Logger.Info("working in aggregator mode", "block time", n.conf.BlockTime)
go n.blockManager.AggregationLoop(n.ctx)
go n.blockManager.AggregationLoop(n.ctx, n.conf.LazyAggregator)
go n.headerPublishLoop(n.ctx)
}
go n.blockManager.RetrieveLoop(n.ctx)
Expand Down
58 changes: 58 additions & 0 deletions node/full_node_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,64 @@ func TestTxGossipingAndAggregation(t *testing.T) {
}
}

func TestLazyAggregator(t *testing.T) {
Comment thread
S1nus marked this conversation as resolved.
assert := assert.New(t)
require := require.New(t)

app := &mocks.Application{}
app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{})
app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})
app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{})
app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{})
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{})
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{})
app.On("GetAppHash", mock.Anything).Return(abci.ResponseGetAppHash{})

key, _, _ := crypto.GenerateEd25519Key(rand.Reader)
signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)

blockManagerConfig := config.BlockManagerConfig{
BlockTime: 1 * time.Second,
NamespaceID: types.NamespaceID{1, 2, 3, 4, 5, 6, 7, 8},
}

node, err := NewNode(context.Background(), config.NodeConfig{
DALayer: "mock",
Aggregator: true,
BlockManagerConfig: blockManagerConfig,
LazyAggregator: true,
}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
assert.False(node.IsRunning())
assert.NoError(err)
err = node.Start()
assert.NoError(err)
defer func() {
err := node.Stop()
assert.NoError(err)
}()
assert.True(node.IsRunning())

require.NoError(err)

client := node.GetClient()

_, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 1})
assert.NoError(err)
time.Sleep(2 * time.Second)
assert.Equal(node.(*FullNode).Store.Height(), uint64(1))

_, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 2})
assert.NoError(err)
time.Sleep(2 * time.Second)
assert.Equal(node.(*FullNode).Store.Height(), uint64(2))

_, err = client.BroadcastTxCommit(context.Background(), []byte{0, 0, 0, 3})
assert.NoError(err)
time.Sleep(2 * time.Second)
assert.Equal(node.(*FullNode).Store.Height(), uint64(3))

}

func TestHeaderExchange(t *testing.T) {
testSingleAggreatorSingleFullNode(t)
testSingleAggreatorTwoFullNode(t)
Expand Down
1 change: 1 addition & 0 deletions state/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (e *BlockExecutor) Commit(ctx context.Context, state types.State, block *ty
if err != nil {
e.logger.Error("failed to fire block events", "error", err)
}

return appHash, retainHeight, nil
}

Expand Down