Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 93 additions & 58 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ type Manager struct {
// daHeight is the height of the latest processed DA block
daHeight uint64

HeaderOutCh chan *types.Header
HeaderInCh chan *types.Header
HeaderOutCh chan *types.SignedHeader
HeaderInCh chan *types.SignedHeader

CommitInCh chan *types.Commit
CommitOutCh chan *types.Commit
lastCommit *types.Commit
CommitInCh chan *types.Commit
lastCommit atomic.Value

syncTarget uint64
blockInCh chan newBlockEvent
Expand Down Expand Up @@ -138,10 +137,9 @@ func NewManager(
retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP)
daHeight: s.DAHeight,
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderOutCh: make(chan *types.Header, 100),
HeaderInCh: make(chan *types.Header, 100),
HeaderOutCh: make(chan *types.SignedHeader, 100),
HeaderInCh: make(chan *types.SignedHeader, 100),
CommitInCh: make(chan *types.Commit, 100),
CommitOutCh: make(chan *types.Commit, 100),
blockInCh: make(chan newBlockEvent, 100),
retrieveMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
Expand Down Expand Up @@ -195,8 +193,8 @@ func (m *Manager) SyncLoop(ctx context.Context) {
case <-daTicker.C:
m.retrieveCond.Signal()
case header := <-m.HeaderInCh:
m.logger.Debug("block header received", "height", header.Height, "hash", header.Hash())
newHeight := header.Height
m.logger.Debug("block header received", "height", header.Header.Height, "hash", header.Header.Hash())
newHeight := header.Header.Height
currentHeight := m.store.Height()
// in case of client reconnecting after being offline
// newHeight may be significantly larger than currentHeight
Expand All @@ -205,9 +203,16 @@ func (m *Manager) SyncLoop(ctx context.Context) {
atomic.StoreUint64(&m.syncTarget, newHeight)
m.retrieveCond.Signal()
}
m.CommitInCh <- &header.Commit
case commit := <-m.CommitInCh:
// TODO(tzdybal): check if it's from right aggregator
m.lastCommit = commit
m.lastCommit.Store(commit)
err := m.trySyncNextBlock(ctx, 0)
Comment thread
Wondertan marked this conversation as resolved.
if err != nil {
m.logger.Info("failed to sync next block", "error", err)
} else {
m.logger.Debug("synced using gossiped commit", "height", commit.Height)
}
case blockEvent := <-m.blockInCh:
block := blockEvent.block
daHeight := blockEvent.daHeight
Expand All @@ -218,49 +223,86 @@ func (m *Manager) SyncLoop(ctx context.Context) {
)
m.syncCache[block.Header.Height] = block
m.retrieveCond.Signal()
currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory
b1, ok1 := m.syncCache[currentHeight+1]
b2, ok2 := m.syncCache[currentHeight+2]
if ok1 && ok2 {
m.logger.Info("Syncing block", "height", b1.Header.Height)
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b1)
if err != nil {
m.logger.Error("failed to ApplyBlock", "error", err)
continue
}
err = m.store.SaveBlock(b1, &b2.LastCommit)
if err != nil {
m.logger.Error("failed to save block", "error", err)
continue
}
_, _, err = m.executor.Commit(ctx, newState, b1, responses)
if err != nil {
m.logger.Error("failed to Commit", "error", err)
continue
}
m.store.SetHeight(b1.Header.Height)

err = m.store.SaveBlockResponses(b1.Header.Height, responses)
if err != nil {
m.logger.Error("failed to save block responses", "error", err)
continue
}

newState.DAHeight = daHeight
m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
m.logger.Error("failed to save updated state", "error", err)
continue
}
delete(m.syncCache, currentHeight+1)
err := m.trySyncNextBlock(ctx, daHeight)
if err != nil {
m.logger.Info("failed to sync next block", "error", err)
}
case <-ctx.Done():
return
}
}
}

// trySyncNextBlock tries to progress one step (one block) in sync process.
//
// To be able to apply block and height h, we need to have its Commit. It is contained in block at height h+1.
// If block at height h+1 is not available, value of last gossiped commit is checked.
// If commit for block h is available, we proceed with sync process, and remove synced block from sync cache.
Comment on lines +237 to +241
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏🏻

func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
Comment thread
Wondertan marked this conversation as resolved.
var b1 *types.Block
var commit *types.Commit
currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory

b1, ok1 := m.syncCache[currentHeight+1]
if !ok1 {
return nil
}
b2, ok2 := m.syncCache[currentHeight+2]
if ok2 {
m.logger.Debug("using last commit from next block")
commit = &b2.LastCommit
} else {
lastCommit := m.getLastCommit()
if lastCommit != nil && lastCommit.Height == currentHeight+1 {
m.logger.Debug("using gossiped commit")
commit = lastCommit
}
}

if b1 != nil && commit != nil {
m.logger.Info("Syncing block", "height", b1.Header.Height)
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b1)
if err != nil {
return fmt.Errorf("failed to ApplyBlock: %w", err)
}
err = m.store.SaveBlock(b1, commit)
if err != nil {
return fmt.Errorf("failed to save block: %w", err)
}
_, _, err = m.executor.Commit(ctx, newState, b1, responses)
if err != nil {
return fmt.Errorf("failed to Commit: %w", err)
}
m.store.SetHeight(b1.Header.Height)

err = m.store.SaveBlockResponses(b1.Header.Height, responses)
if err != nil {
return fmt.Errorf("failed to save block responses: %w", err)
}

if daHeight > newState.DAHeight {
newState.DAHeight = daHeight
}
m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
m.logger.Error("failed to save updated state", "error", err)
}
delete(m.syncCache, currentHeight+1)
}

return nil
}

func (m *Manager) getLastCommit() *types.Commit {
ptr := m.lastCommit.Load()
if ptr == nil {
return nil
}
return ptr.(*types.Commit)
}

// RetrieveLoop is responsible for interacting with DA layer.
func (m *Manager) RetrieveLoop(ctx context.Context) {
// waitCh is used to signal the retrieve loop, that it should process next blocks
Expand Down Expand Up @@ -365,6 +407,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

var block *types.Block
var commit *types.Commit

// Check if there's an already stored block at a newer height
// If there is use that instead of creating a new block
Expand All @@ -385,7 +428,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
if err != nil {
return err
}
commit := &types.Commit{
commit = &types.Commit{
Height: block.Header.Height,
HeaderHash: block.Header.Hash(),
Signatures: []types.Signature{sign},
Expand Down Expand Up @@ -441,8 +484,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
// Only update the stored height after successfully submitting to DA layer and committing to the DB
m.store.SetHeight(block.Header.Height)

m.publishHeader(block)
m.publishCommit(lastCommit)
m.publishSignedHeader(block, commit)

return nil
}
Expand All @@ -468,8 +510,6 @@ func (m *Manager) submitBlockToDA(ctx context.Context, block *types.Block) error
return fmt.Errorf("Failed to submit block to DA layer after %d attempts", maxSubmitAttempts)
}

m.HeaderOutCh <- &block.Header

return nil
}

Expand All @@ -482,13 +522,8 @@ func (m *Manager) exponentialBackoff(backoff time.Duration) time.Duration {
}

// TODO(tzdybal): consider inlining
func (m *Manager) publishHeader(block *types.Block) {
m.HeaderOutCh <- &block.Header
}

// TODO(tzdybal): consider inlining
func (m *Manager) publishCommit(commit *types.Commit) {
m.CommitOutCh <- commit
func (m *Manager) publishSignedHeader(block *types.Block, commit *types.Commit) {
m.HeaderOutCh <- &types.SignedHeader{Header: block.Header, Commit: *commit}
}
Comment thread
Wondertan marked this conversation as resolved.

func updateState(s *types.State, res *abci.ResponseInitChain) {
Expand Down
7 changes: 4 additions & 3 deletions node/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func TestTxGossipingAndAggregation(t *testing.T) {
for _, n := range nodes {
require.NoError(n.Stop())
}
time.Sleep(100 * time.Millisecond)
aggApp := apps[0]
apps = apps[1:]

Expand Down Expand Up @@ -150,10 +151,10 @@ func TestTxGossipingAndAggregation(t *testing.T) {

// assert that all blocks known to node are same as produced by aggregator
for h := uint64(1); h <= nodes[i].Store.Height(); h++ {
nodeBlock, err := nodes[i].Store.LoadBlock(h)
require.NoError(err)
aggBlock, err := nodes[0].Store.LoadBlock(h)
require.NoError(err)
nodeBlock, err := nodes[i].Store.LoadBlock(h)
require.NoError(err)
assert.Equal(aggBlock, nodeBlock)
}
}
Expand Down Expand Up @@ -192,7 +193,7 @@ func createNode(n int, aggregator bool, dalc da.DataAvailabilityLayerClient, key
ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n),
}
bmConfig := config.BlockManagerConfig{
BlockTime: 1 * time.Second,
BlockTime: 300 * time.Millisecond,
NamespaceID: [8]byte{8, 7, 6, 5, 4, 3, 2, 1},
}
for i := 0; i < len(keys); i++ {
Expand Down
13 changes: 7 additions & 6 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ func (n *Node) initGenesisChunks() error {
func (n *Node) headerPublishLoop(ctx context.Context) {
for {
select {
case header := <-n.blockManager.HeaderOutCh:
headerBytes, err := header.MarshalBinary()
case signedHeader := <-n.blockManager.HeaderOutCh:
headerBytes, err := signedHeader.MarshalBinary()
if err != nil {
n.Logger.Error("failed to serialize block header", "error", err)
n.Logger.Error("failed to serialize signed block header", "error", err)
}
err = n.P2P.GossipHeader(ctx, headerBytes)
err = n.P2P.GossipSignedHeader(ctx, headerBytes)
if err != nil {
n.Logger.Error("failed to gossip block header", "error", err)
n.Logger.Error("failed to gossip signed block header", "error", err)
}
case <-ctx.Done():
return
Expand Down Expand Up @@ -309,7 +309,7 @@ func (n *Node) newTxValidator() p2p.GossipValidator {
func (n *Node) newHeaderValidator() p2p.GossipValidator {
return func(headerMsg *p2p.GossipMessage) bool {
n.Logger.Debug("header received", "from", headerMsg.From, "bytes", len(headerMsg.Data))
var header types.Header
var header types.SignedHeader
err := header.UnmarshalBinary(headerMsg.Data)
if err != nil {
n.Logger.Error("failed to deserialize header", "error", err)
Expand Down Expand Up @@ -341,6 +341,7 @@ func (n *Node) newCommitValidator() p2p.GossipValidator {
n.Logger.Error("failed to validate commit", "error", err)
return false
}
n.Logger.Debug("commit received", "height", commit.Height)
n.blockManager.CommitInCh <- &commit
return true
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ func (c *Client) SetTxValidator(val GossipValidator) {
c.txValidator = val
}

// GossipHeader sends the block header to the P2P network.
func (c *Client) GossipHeader(ctx context.Context, headerBytes []byte) error {
// GossipSignedHeader sends the block header to the P2P network.
func (c *Client) GossipSignedHeader(ctx context.Context, headerBytes []byte) error {
c.logger.Debug("Gossiping block header", "len", len(headerBytes))
return c.headerGossiper.Publish(ctx, headerBytes)
}
Expand Down
5 changes: 5 additions & 0 deletions proto/optimint/optimint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ message Commit {
repeated bytes signatures = 3;
}

message SignedHeader {
Header header = 1;
Commit commit = 2;
}

message Data {
repeated bytes txs = 1;
repeated bytes intermediate_state_roots = 2;
Expand Down
10 changes: 9 additions & 1 deletion types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,21 @@ type EvidenceData struct {
Evidence []Evidence
}

// Commit cointains evidence of block creation.
// Commit contains evidence of block creation.
type Commit struct {
Height uint64
HeaderHash [32]byte
Signatures []Signature // most of the time this is a single signature
}

// SignedHeader combines Header and its Commit.
//
// Used mostly for gossiping.
type SignedHeader struct {
Header Header
Commit Commit
}

// Signature represents signature of block creator.
type Signature []byte

Expand Down
6 changes: 3 additions & 3 deletions types/pb/dalc/dalc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading