Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,11 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
block := blockEvent.block
daHeight := blockEvent.daHeight
m.logger.Debug("block body retrieved from DALC",
"height", block.Header.Height(),
"height", block.SignedHeader.Header.Height(),
"daHeight", daHeight,
"hash", block.Hash(),
)
m.syncCache[block.Header.BaseHeader.Height] = block
m.syncCache[block.SignedHeader.Header.BaseHeader.Height] = block
m.retrieveCond.Signal()

err := m.trySyncNextBlock(ctx, daHeight)
Expand Down Expand Up @@ -346,7 +346,7 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
b2, ok2 := m.syncCache[currentHeight+2]
if ok2 {
m.logger.Debug("using last commit from next block")
commit = &b2.LastCommit
commit = &b2.SignedHeader.Commit
} else {
lastCommit := m.getLastCommit()
if lastCommit != nil && lastCommit.Height == currentHeight+1 {
Expand All @@ -356,7 +356,7 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
}

if b1 != nil && commit != nil {
m.logger.Info("Syncing block", "height", b1.Header.Height())
m.logger.Info("Syncing block", "height", b1.SignedHeader.Header.Height())
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b1)
if err != nil {
return fmt.Errorf("failed to ApplyBlock: %w", err)
Expand All @@ -369,9 +369,9 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
if err != nil {
return fmt.Errorf("failed to Commit: %w", err)
}
m.store.SetHeight(uint64(b1.Header.Height()))
m.store.SetHeight(uint64(b1.SignedHeader.Header.Height()))

err = m.store.SaveBlockResponses(uint64(b1.Header.Height()), responses)
err = m.store.SaveBlockResponses(uint64(b1.SignedHeader.Header.Height()), responses)
if err != nil {
return fmt.Errorf("failed to save block responses: %w", err)
}
Expand Down Expand Up @@ -514,7 +514,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
if err != nil {
return fmt.Errorf("error while loading last block: %w", err)
}
lastHeaderHash = lastBlock.Header.Hash()
lastHeaderHash = lastBlock.SignedHeader.Header.Hash()
}

var block *types.Block
Expand All @@ -531,11 +531,14 @@ func (m *Manager) publishBlock(ctx context.Context) error {
block = m.executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, m.lastState)
m.logger.Debug("block info", "num_tx", len(block.Data.Txs))

commit, err = m.getCommit(block.Header)
commit, err = m.getCommit(block.SignedHeader.Header)
if err != nil {
return err
}

// set the commit to current block's signed header
block.SignedHeader.Commit = *commit

// SaveBlock commits the DB tx
err = m.store.SaveBlock(block, commit)
if err != nil {
Expand All @@ -550,7 +553,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

if commit == nil {
commit, err = m.getCommit(block.Header)
commit, err = m.getCommit(block.SignedHeader.Header)
if err != nil {
return err
}
Expand All @@ -575,7 +578,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

// SaveBlockResponses commits the DB tx
err = m.store.SaveBlockResponses(uint64(block.Header.Height()), responses)
err = m.store.SaveBlockResponses(uint64(block.SignedHeader.Header.Height()), responses)
if err != nil {
return err
}
Expand All @@ -591,28 +594,28 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

// SaveValidators commits the DB tx
err = m.store.SaveValidators(uint64(block.Header.Height()), m.lastState.Validators)
err = m.store.SaveValidators(uint64(block.SignedHeader.Header.Height()), m.lastState.Validators)
if err != nil {
return err
}

// Only update the stored height after successfully submitting to DA layer and committing to the DB
m.store.SetHeight(uint64(block.Header.Height()))
m.store.SetHeight(uint64(block.SignedHeader.Header.Height()))

m.publishSignedHeader(block, commit)

return nil
}

func (m *Manager) submitBlockToDA(ctx context.Context, block *types.Block) error {
m.logger.Info("submitting block to DA layer", "height", block.Header.Height())
m.logger.Info("submitting block to DA layer", "height", block.SignedHeader.Header.Height())

submitted := false
backoff := initialBackoff
for attempt := 1; ctx.Err() == nil && !submitted && attempt <= maxSubmitAttempts; attempt++ {
res := m.dalc.SubmitBlock(ctx, block)
if res.Code == da.StatusSuccess {
m.logger.Info("successfully submitted Rollkit block to DA layer", "rollkitHeight", block.Header.Height(), "daHeight", res.DAHeight)
m.logger.Info("successfully submitted Rollkit block to DA layer", "rollkitHeight", block.SignedHeader.Header.Height(), "daHeight", res.DAHeight)
submitted = true
} else {
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
Expand All @@ -638,7 +641,7 @@ func (m *Manager) exponentialBackoff(backoff time.Duration) time.Duration {

// TODO(tzdybal): consider inlining
func (m *Manager) publishSignedHeader(block *types.Block, commit *types.Commit) {
m.HeaderOutCh <- &types.SignedHeader{Header: block.Header, Commit: *commit}
m.HeaderOutCh <- &types.SignedHeader{Header: block.SignedHeader.Header, Commit: *commit}
}

func updateState(s *types.State, res *abci.ResponseInitChain) {
Expand Down
8 changes: 4 additions & 4 deletions conv/abci/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ func ToABCIHeader(header *types.Header) (tmtypes.Header, error) {
// ToABCIBlock converts Rolkit block into block format defined by ABCI.
// Returned block should pass `ValidateBasic`.
func ToABCIBlock(block *types.Block) (*tmtypes.Block, error) {
abciHeader, err := ToABCIHeader(&block.Header)
abciHeader, err := ToABCIHeader(&block.SignedHeader.Header)
if err != nil {
return nil, err
}
abciCommit := ToABCICommit(&block.LastCommit)
abciCommit := ToABCICommit(&block.SignedHeader.Commit)
// This assumes that we have only one signature
if len(abciCommit.Signatures) == 1 {
abciCommit.Signatures[0].ValidatorAddress = block.Header.ProposerAddress
abciCommit.Signatures[0].ValidatorAddress = block.SignedHeader.Header.ProposerAddress
}
abciBlock := tmtypes.Block{
Header: abciHeader,
Expand All @@ -92,7 +92,7 @@ func ToABCIBlock(block *types.Block) (*tmtypes.Block, error) {
for i := range block.Data.Txs {
abciBlock.Data.Txs[i] = tmtypes.Tx(block.Data.Txs[i])
}
abciBlock.Header.DataHash = tmbytes.HexBytes(block.Header.DataHash)
abciBlock.Header.DataHash = tmbytes.HexBytes(block.SignedHeader.Header.DataHash)

return &abciBlock, nil
}
Expand Down
6 changes: 3 additions & 3 deletions da/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ func (m *DataAvailabilityLayerClient) Stop() error {
// triggers a state transition in the DA layer.
func (m *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *types.Block) da.ResultSubmitBlock {
daHeight := atomic.LoadUint64(&m.daHeight)
m.logger.Debug("Submitting block to DA layer!", "height", block.Header.Height(), "dataLayerHeight", daHeight)
m.logger.Debug("Submitting block to DA layer!", "height", block.SignedHeader.Header.Height(), "dataLayerHeight", daHeight)

hash := block.Header.Hash()
hash := block.SignedHeader.Header.Hash()
blob, err := block.MarshalBinary()
if err != nil {
return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}

err = m.dalcKV.Put(ctx, getKey(daHeight, uint64(block.Header.Height())), hash[:])
err = m.dalcKV.Put(ctx, getKey(daHeight, uint64(block.SignedHeader.Header.Height())), hash[:])
if err != nil {
return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
Expand Down
15 changes: 8 additions & 7 deletions da/test/da_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,21 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) {
// copy-pasted from store/store_test.go
func getRandomBlock(height uint64, nTxs int) *types.Block {
block := &types.Block{
Header: types.Header{
BaseHeader: types.BaseHeader{
Height: height,
},
AggregatorsHash: make([]byte, 32),
},
SignedHeader: types.SignedHeader{
Header: types.Header{
BaseHeader: types.BaseHeader{
Height: height,
},
AggregatorsHash: make([]byte, 32),
}},
Data: types.Data{
Txs: make(types.Txs, nTxs),
IntermediateStateRoots: types.IntermediateStateRoots{
RawRootsList: make([][]byte, nTxs),
},
},
}
block.Header.AppHash = getRandomBytes(32)
block.SignedHeader.Header.AppHash = getRandomBytes(32)

for i := 0; i < nTxs; i++ {
block.Data.Txs[i] = getRandomTx()
Expand Down
20 changes: 10 additions & 10 deletions node/full_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,11 +703,11 @@ func (c *FullClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
return nil, fmt.Errorf("failed to find earliest block: %w", err)
}

validators, err := c.node.Store.LoadValidators(uint64(latest.Header.Height()))
validators, err := c.node.Store.LoadValidators(uint64(latest.SignedHeader.Header.Height()))
if err != nil {
return nil, fmt.Errorf("failed to fetch the validator info at latest block: %w", err)
}
_, validator := validators.GetByAddress(latest.Header.ProposerAddress)
_, validator := validators.GetByAddress(latest.SignedHeader.Header.ProposerAddress)

state, err := c.node.Store.LoadState()
if err != nil {
Expand Down Expand Up @@ -735,14 +735,14 @@ func (c *FullClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
},
},
SyncInfo: ctypes.SyncInfo{
LatestBlockHash: tmbytes.HexBytes(latest.Header.DataHash),
LatestAppHash: tmbytes.HexBytes(latest.Header.AppHash),
LatestBlockHeight: latest.Header.Height(),
LatestBlockTime: latest.Header.Time(),
EarliestBlockHash: tmbytes.HexBytes(initial.Header.DataHash),
EarliestAppHash: tmbytes.HexBytes(initial.Header.AppHash),
EarliestBlockHeight: initial.Header.Height(),
EarliestBlockTime: initial.Header.Time(),
LatestBlockHash: tmbytes.HexBytes(latest.SignedHeader.Header.DataHash),
LatestAppHash: tmbytes.HexBytes(latest.SignedHeader.Header.AppHash),
LatestBlockHeight: latest.SignedHeader.Header.Height(),
LatestBlockTime: latest.SignedHeader.Header.Time(),
EarliestBlockHash: tmbytes.HexBytes(initial.SignedHeader.Header.DataHash),
EarliestAppHash: tmbytes.HexBytes(initial.SignedHeader.Header.AppHash),
EarliestBlockHeight: initial.SignedHeader.Header.Height(),
EarliestBlockTime: initial.SignedHeader.Header.Time(),
CatchingUp: true, // the client is always syncing in the background to the latest height
},
ValidatorInfo: ctypes.ValidatorInfo{
Expand Down
51 changes: 26 additions & 25 deletions node/full_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func TestGetBlock(t *testing.T) {

block := getRandomBlock(1, 10)
err = rpc.node.Store.SaveBlock(block, &types.Commit{})
rpc.node.Store.SetHeight(uint64(block.Header.Height()))
rpc.node.Store.SetHeight(uint64(block.SignedHeader.Header.Height()))
require.NoError(err)

blockResp, err := rpc.Block(context.Background(), nil)
Expand All @@ -273,25 +273,25 @@ func TestGetCommit(t *testing.T) {
require.NoError(err)

for _, b := range blocks {
err = rpc.node.Store.SaveBlock(b, &types.Commit{Height: uint64(b.Header.Height())})
rpc.node.Store.SetHeight(uint64(b.Header.Height()))
err = rpc.node.Store.SaveBlock(b, &types.Commit{Height: uint64(b.SignedHeader.Header.Height())})
rpc.node.Store.SetHeight(uint64(b.SignedHeader.Header.Height()))
require.NoError(err)
}
t.Run("Fetch all commits", func(t *testing.T) {
for _, b := range blocks {
h := b.Header.Height()
h := b.SignedHeader.Header.Height()
commit, err := rpc.Commit(context.Background(), &h)
require.NoError(err)
require.NotNil(commit)
assert.Equal(b.Header.Height(), commit.Height)
assert.Equal(b.SignedHeader.Header.Height(), commit.Height)
}
})

t.Run("Fetch commit for nil height", func(t *testing.T) {
commit, err := rpc.Commit(context.Background(), nil)
require.NoError(err)
require.NotNil(commit)
assert.Equal(blocks[3].Header.Height(), commit.Height)
assert.Equal(blocks[3].SignedHeader.Header.Height(), commit.Height)
})

err = rpc.node.Stop()
Expand All @@ -310,7 +310,7 @@ func TestBlockSearch(t *testing.T) {
block := getRandomBlock(uint64(h), 5)
err := rpc.node.Store.SaveBlock(block, &types.Commit{
Height: uint64(h),
HeaderHash: block.Header.Hash(),
HeaderHash: block.SignedHeader.Header.Hash(),
})
require.NoError(err)
}
Expand Down Expand Up @@ -377,14 +377,14 @@ func TestGetBlockByHash(t *testing.T) {
abciBlock, err := abciconv.ToABCIBlock(block)
require.NoError(err)

height := block.Header.Height()
height := block.SignedHeader.Header.Height()
retrievedBlock, err := rpc.Block(context.Background(), &height)
require.NoError(err)
require.NotNil(retrievedBlock)
assert.Equal(abciBlock, retrievedBlock.Block)
assert.Equal(abciBlock.Hash(), retrievedBlock.Block.Hash())

blockHash := block.Header.Hash()
blockHash := block.SignedHeader.Header.Hash()
blockResp, err := rpc.BlockByHash(context.Background(), blockHash[:])
require.NoError(err)
require.NotNil(blockResp)
Expand Down Expand Up @@ -564,9 +564,9 @@ func TestBlockchainInfo(t *testing.T) {
block := getRandomBlock(uint64(h), 5)
err := rpc.node.Store.SaveBlock(block, &types.Commit{
Height: uint64(h),
HeaderHash: block.Header.Hash(),
HeaderHash: block.SignedHeader.Header.Hash(),
})
rpc.node.Store.SetHeight(uint64(block.Header.Height()))
rpc.node.Store.SetHeight(uint64(block.SignedHeader.Header.Height()))
require.NoError(err)
}

Expand Down Expand Up @@ -719,22 +719,23 @@ func getRandomBlock(height uint64, nTxs int) *types.Block {

func getRandomBlockWithProposer(height uint64, nTxs int, proposerAddr []byte) *types.Block {
block := &types.Block{
Header: types.Header{
BaseHeader: types.BaseHeader{
Height: height,
},
Version: types.Version{Block: types.InitStateVersion.Consensus.Block},
ProposerAddress: proposerAddr,
AggregatorsHash: make([]byte, 32),
},
SignedHeader: types.SignedHeader{
Header: types.Header{
BaseHeader: types.BaseHeader{
Height: height,
},
Version: types.Version{Block: types.InitStateVersion.Consensus.Block},
ProposerAddress: proposerAddr,
AggregatorsHash: make([]byte, 32),
}},
Data: types.Data{
Txs: make(types.Txs, nTxs),
IntermediateStateRoots: types.IntermediateStateRoots{
RawRootsList: make([][]byte, nTxs),
},
},
}
block.Header.AppHash = getRandomBytes(32)
block.SignedHeader.Header.AppHash = getRandomBytes(32)

for i := 0; i < nTxs; i++ {
block.Data.Txs[i] = getRandomTx()
Expand All @@ -753,7 +754,7 @@ func getRandomBlockWithProposer(height uint64, nTxs int, proposerAddr []byte) *t
}
lastCommitHash := make(types.Hash, 32)
copy(lastCommitHash, tmprotoLC.Hash().Bytes())
block.Header.LastCommitHash = lastCommitHash
block.SignedHeader.Header.LastCommitHash = lastCommitHash

return block
}
Expand Down Expand Up @@ -987,13 +988,13 @@ func TestStatus(t *testing.T) {
assert.NotNil(rpc)

earliestBlock := getRandomBlockWithProposer(1, 1, validators[0].Address.Bytes())
err = rpc.node.Store.SaveBlock(earliestBlock, &types.Commit{Height: uint64(earliestBlock.Header.Height())})
rpc.node.Store.SetHeight(uint64(earliestBlock.Header.Height()))
err = rpc.node.Store.SaveBlock(earliestBlock, &types.Commit{Height: uint64(earliestBlock.SignedHeader.Header.Height())})
rpc.node.Store.SetHeight(uint64(earliestBlock.SignedHeader.Header.Height()))
require.NoError(err)

latestBlock := getRandomBlockWithProposer(2, 1, validators[1].Address.Bytes())
err = rpc.node.Store.SaveBlock(latestBlock, &types.Commit{Height: uint64(latestBlock.Header.Height())})
rpc.node.Store.SetHeight(uint64(latestBlock.Header.Height()))
err = rpc.node.Store.SaveBlock(latestBlock, &types.Commit{Height: uint64(latestBlock.SignedHeader.Header.Height())})
rpc.node.Store.SetHeight(uint64(latestBlock.SignedHeader.Header.Height()))
require.NoError(err)

err = node.Start()
Expand Down
Loading