Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
338d8ae
separate server for centralized sequencer user facing inputs
Jan 25, 2023
806639e
handler
Jan 26, 2023
fc6bba4
directTx now works with byte arrays instead of p2p.GossipMessage. ne…
Jan 29, 2023
d7b6090
test for direct sequencer server
Jan 29, 2023
98a6d61
progressive block build
Jan 30, 2023
675e547
test almost done
Jan 30, 2023
12f01ad
test for multiple transactions sent directly
Jan 30, 2023
1dfb882
fixed bug in test
Jan 31, 2023
0623f6e
ProgressiveAggregation test works.
Feb 2, 2023
539cf74
clean up
Feb 2, 2023
b3a5ba2
merged main
Feb 2, 2023
3c752f8
began adding receive_direct_tx to the existing server, to phase out t…
Feb 6, 2023
fa6c6e4
made modifications necessary to reuse existing server, test in the w…
Feb 7, 2023
2d4d7ec
working on test for using existing server
Feb 8, 2023
beee58c
wsConn was the issue
Feb 9, 2023
43eabae
cleaned up
Feb 9, 2023
d2da042
changed bool to ResultDirectTx
Feb 10, 2023
315e7ae
working test
Feb 10, 2023
7b34fdb
consolidate progressive and regular aggregation loops
Feb 10, 2023
791c633
changed var to const
Feb 10, 2023
3043a70
merge main
Feb 10, 2023
87c0bbd
goimports
Feb 10, 2023
9763b8b
removed some unnecessary calls
Feb 11, 2023
0038a55
simplified err handling
Feb 11, 2023
886a514
fixed comments
Feb 11, 2023
3ef0521
added option to reap all transactions from mempool
Feb 13, 2023
2a90334
removed TestProgressiveAggregation, as the Sequencer Server test shoul…
Feb 14, 2023
b4598ef
signal for when mempool still has transactions after clearing it
Feb 16, 2023
b536949
moved moreTxsAvailable
Feb 16, 2023
05eaf2c
removed unnecessary logs
Feb 16, 2023
ced5da3
removed unnecessary channel argument
Feb 16, 2023
5399282
removed the timer field, replaced with a local timer
Feb 21, 2023
37a6fc3
remove some direct tx things
Feb 21, 2023
af1ff44
removed more direct tx things
Feb 21, 2023
2448735
test for lazy sequencer
Feb 21, 2023
15becf5
renamed Progressive to Lazy
Feb 21, 2023
241b457
removed more direct things
Feb 21, 2023
7cd1a0b
removed more direct
Feb 21, 2023
cd95c42
removed more direct tx stuff so it can be a separate pr
Feb 21, 2023
7a5efa2
fix lints
Feb 21, 2023
b855ced
Merge branch 'main' into connor/centralized-sequencer
Feb 21, 2023
bd4def1
fixed test
Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 85 additions & 36 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type Manager struct {
// retrieveCond is used to notify sync goroutine (SyncLoop) that it needs to retrieve data
retrieveCond *sync.Cond

// whether or not the centralized sequencer is currently building a block
buildingBlock bool
txsAvailable <-chan struct{}
moreTxsAvailable chan struct{}
doneBuildingBlock chan struct{}

logger log.Logger
}

Expand All @@ -95,6 +101,7 @@ func NewManager(
dalc da.DataAvailabilityLayerClient,
eventBus *tmtypes.EventBus,
logger log.Logger,
doneBuildingCh chan struct{},
) (*Manager, error) {
s, err := getInitialState(store, genesis)
if err != nil {
Expand Down Expand Up @@ -127,6 +134,13 @@ func NewManager(
}
}

var txsAvailableChan <-chan struct{}
if mempool != nil {
txsAvailableChan = mempool.TxsAvailable()
} else {
txsAvailableChan = nil
}

agg := &Manager{
proposerKey: proposerKey,
conf: conf,
Expand All @@ -138,13 +152,17 @@ func NewManager(
retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP)
daHeight: s.DAHeight,
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderOutCh: make(chan *types.SignedHeader, 100),
HeaderInCh: make(chan *types.SignedHeader, 100),
blockInCh: make(chan newBlockEvent, 100),
FraudProofInCh: make(chan *abci.FraudProof, 100),
retrieveMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
logger: logger,
HeaderOutCh: make(chan *types.SignedHeader, 100),
HeaderInCh: make(chan *types.SignedHeader, 100),
blockInCh: make(chan newBlockEvent, 100),
FraudProofInCh: make(chan *abci.FraudProof, 100),
retrieveMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
buildingBlock: false,
txsAvailable: txsAvailableChan,
moreTxsAvailable: make(chan struct{}),
doneBuildingBlock: doneBuildingCh,
logger: logger,
}
agg.retrieveCond = sync.NewCond(agg.retrieveMtx)

Expand All @@ -170,36 +188,67 @@ func (m *Manager) GetFraudProofOutChan() chan *abci.FraudProof {
}

// AggregationLoop is responsible for aggregating transactions into rollup-blocks.
func (m *Manager) AggregationLoop(ctx context.Context) {
initialHeight := uint64(m.genesis.InitialHeight)
height := m.store.Height()
var delay time.Duration

// TODO(tzdybal): double-check when https://github.com/celestiaorg/rollmint/issues/699 is resolved
if height < initialHeight {
delay = time.Until(m.genesis.GenesisTime)
} else {
delay = time.Until(m.lastState.LastBlockTime.Add(m.conf.BlockTime))
}
func (m *Manager) AggregationLoop(ctx context.Context, ingress chan []byte, lazy bool) {
var timer time.Timer
if !lazy {
initialHeight := uint64(m.genesis.InitialHeight)
height := m.store.Height()
var delay time.Duration

// TODO(tzdybal): double-check when https://github.com/celestiaorg/rollmint/issues/699 is resolved
if height < initialHeight {
delay = time.Until(m.genesis.GenesisTime)
} else {
delay = time.Until(m.lastState.LastBlockTime.Add(m.conf.BlockTime))
}

if delay > 0 {
m.logger.Info("Waiting to produce block", "delay", delay)
time.Sleep(delay)
}
if delay > 0 {
m.logger.Info("Waiting to produce block", "delay", delay)
time.Sleep(delay)
}

timer := time.NewTimer(0)
timer = *time.NewTimer(0)

for {
select {
case <-ctx.Done():
return
case <-timer.C:
start := time.Now()
err := m.publishBlock(ctx)
if err != nil {
m.logger.Error("error while publishing block", "error", err)
for {
select {
case <-ctx.Done():
return
case <-timer.C:
start := time.Now()
err := m.publishBlock(ctx, false, m.moreTxsAvailable)
if err != nil {
m.logger.Error("error while publishing block", "error", err)
}
timer.Reset(m.getRemainingSleep(start))
}
}
} else {
for {
select {
case <-ctx.Done():
return
case <-m.txsAvailable:
m.logger.Debug("Txs available! Starting block building...")
if !m.buildingBlock {
Comment thread
gupadhyaya marked this conversation as resolved.
m.buildingBlock = true
timer = *time.NewTimer(1 * time.Second)
}
case <-m.moreTxsAvailable:
m.logger.Debug("Txs remained in mempool after building last block. Building another block.")
if !m.buildingBlock {
m.buildingBlock = true
timer = *time.NewTimer(1 * time.Second)
}
case <-timer.C:
m.logger.Debug("Block building time elapsed.")
m.buildingBlock = false
err := m.publishBlock(ctx, true, m.moreTxsAvailable)
if err != nil {
m.logger.Error("error while publishing block", "error", err)
}
close(m.doneBuildingBlock)
Comment thread
S1nus marked this conversation as resolved.
m.doneBuildingBlock = make(chan struct{})
}
timer.Reset(m.getRemainingSleep(start))
}
}
}
Expand Down Expand Up @@ -316,7 +365,7 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
if err != nil {
return fmt.Errorf("failed to save block: %w", err)
}
_, _, err = m.executor.Commit(ctx, newState, b1, responses)
_, _, err = m.executor.Commit(ctx, newState, b1, responses, nil)
if err != nil {
return fmt.Errorf("failed to Commit: %w", err)
}
Expand Down Expand Up @@ -446,7 +495,7 @@ func (m *Manager) getCommit(header types.Header) (*types.Commit, error) {
}, nil
}

func (m *Manager) publishBlock(ctx context.Context) error {
func (m *Manager) publishBlock(ctx context.Context, maxTx bool, moreTxsAvailable chan struct{}) error {
var lastCommit *types.Commit
var lastHeaderHash types.Hash
var err error
Expand Down Expand Up @@ -521,7 +570,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

// Commit the new state and block which writes to disk on the proxy app
_, _, err = m.executor.Commit(ctx, newState, block, responses)
_, _, err = m.executor.Commit(ctx, newState, block, responses, moreTxsAvailable)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func TestInitialState(t *testing.T) {
assert := assert.New(t)
logger := log.TestingLogger()
dalc := getMockDALC(logger)
agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger)
dumbChan := make(chan struct{})
agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger, dumbChan)
assert.NoError(err)
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.lastState.ChainID)
Expand Down
9 changes: 8 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
flagNamespaceID = "rollkit.namespace_id"
flagFraudProofs = "rollkit.experimental_insecure_fraud_proofs"
flagLight = "rollkit.light"

flagLazyAggregator = "rollkit.lazy_aggregator"
)

// NodeConfig stores Rollkit node configuration.
Expand All @@ -30,11 +32,14 @@ type NodeConfig struct {
P2P P2PConfig
RPC RPCConfig
// parameters below are Rollkit specific and read from config
Aggregator bool `mapstructure:"aggregator"`
BlockManagerConfig `mapstructure:",squash"`
DALayer string `mapstructure:"da_layer"`
DAConfig string `mapstructure:"da_config"`
Light bool `mapstructure:"light"`

// Sequencer-specific settings
Aggregator bool `mapstructure:"aggregator"`
LazyAggregator bool `mapstructure:"lazy_aggregator"`
}

// BlockManagerConfig consists of all parameters required by BlockManagerConfig
Expand Down Expand Up @@ -62,6 +67,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error {
nsID := v.GetString(flagNamespaceID)
nc.FraudProofs = v.GetBool(flagFraudProofs)
nc.Light = v.GetBool(flagLight)
nc.LazyAggregator = v.GetBool(flagLazyAggregator)
bytes, err := hex.DecodeString(nsID)
if err != nil {
return err
Expand All @@ -84,4 +90,5 @@ func AddFlags(cmd *cobra.Command) {
cmd.Flags().BytesHex(flagNamespaceID, def.NamespaceID[:], "namespace identifies (8 bytes in hex)")
cmd.Flags().Bool(flagFraudProofs, def.FraudProofs, "enable fraud proofs (experimental & insecure)")
cmd.Flags().Bool(flagLight, def.Light, "run light client")
cmd.Flags().Bool(flagLazyAggregator, def.LazyAggregator, "progressive sequencer")
}
7 changes: 4 additions & 3 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ var DefaultNodeConfig = NodeConfig{
NamespaceID: types.NamespaceID{},
FraudProofs: false,
},
DALayer: "mock",
DAConfig: "",
Light: false,
DALayer: "mock",
DAConfig: "",
Light: false,
LazyAggregator: false,
}
52 changes: 31 additions & 21 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ type FullNode struct {
P2P *p2p.Client

// TODO(tzdybal): consider extracting "mempool reactor"
Mempool mempool.Mempool
mempoolIDs *mempoolIDs
incomingTxCh chan *p2p.GossipMessage
Mempool mempool.Mempool
mempoolIDs *mempoolIDs
incomingTxCh chan []byte
DoneBuildingBlock chan struct{}

Store store.Store
blockManager *block.Manager
Expand Down Expand Up @@ -138,31 +139,35 @@ func newFullNode(

mp := mempoolv1.NewTxMempool(logger, llcfg.DefaultMempoolConfig(), appClient, 0)
mpIDs := newMempoolIDs()
mp.EnableTxsAvailable()
Comment thread
S1nus marked this conversation as resolved.

blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, appClient, dalc, eventBus, logger.With("module", "BlockManager"))
doneBuildingChannel := make(chan struct{})

blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, appClient, dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel)
if err != nil {
return nil, fmt.Errorf("BlockManager initialization error: %w", err)
}

ctx, cancel := context.WithCancel(ctx)

node := &FullNode{
appClient: appClient,
eventBus: eventBus,
genesis: genesis,
conf: conf,
P2P: client,
blockManager: blockManager,
dalc: dalc,
Mempool: mp,
mempoolIDs: mpIDs,
incomingTxCh: make(chan *p2p.GossipMessage),
Store: s,
TxIndexer: txIndexer,
IndexerService: indexerService,
BlockIndexer: blockIndexer,
ctx: ctx,
cancel: cancel,
appClient: appClient,
eventBus: eventBus,
genesis: genesis,
conf: conf,
P2P: client,
blockManager: blockManager,
dalc: dalc,
Mempool: mp,
mempoolIDs: mpIDs,
incomingTxCh: make(chan []byte),
Store: s,
TxIndexer: txIndexer,
IndexerService: indexerService,
BlockIndexer: blockIndexer,
DoneBuildingBlock: doneBuildingChannel,
ctx: ctx,
cancel: cancel,
}

node.BaseService = *service.NewBaseService(logger, "Node", node)
Expand Down Expand Up @@ -255,7 +260,7 @@ func (n *FullNode) OnStart() error {
}
if n.conf.Aggregator {
n.Logger.Info("working in aggregator mode", "block time", n.conf.BlockTime)
go n.blockManager.AggregationLoop(n.ctx)
go n.blockManager.AggregationLoop(n.ctx, n.incomingTxCh, n.conf.LazyAggregator)
go n.headerPublishLoop(n.ctx)
}
go n.blockManager.RetrieveLoop(n.ctx)
Expand Down Expand Up @@ -313,6 +318,11 @@ func (n *FullNode) AppClient() abciclient.Client {
return n.appClient
}

// ResultDirectTx is the JSON-serializable response returned to a client that
// submitted a transaction directly to the sequencer. It reports whether the
// transaction was included in a block and, if so, at which block height.
type ResultDirectTx struct {
	// Included is true when the transaction was included in a block.
	Included bool `json:"included"`
	// Height is the height of the block containing the transaction.
	Height uint64 `json:"height"`
}

// newTxValidator creates a pubsub validator that uses the node's mempool to check the
// transaction. If the transaction is valid, then it is added to the mempool
func (n *FullNode) newTxValidator() p2p.GossipValidator {
Expand Down
Loading