Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 19 additions & 66 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,12 @@ type Manager struct {
// daHeight is the height of the latest processed DA block
daHeight uint64

HeaderOutCh chan *types.SignedHeader
HeaderInCh chan *types.SignedHeader
SyncedHeadersCh chan *types.SignedHeader

lastCommit atomic.Value
HeaderCh chan *types.SignedHeader

FraudProofInCh chan *abci.FraudProof

syncTarget uint64
blockInCh chan newBlockEvent
syncCache map[uint64]*types.Block
blockInCh chan newBlockEvent
syncCache map[uint64]*types.Block

// retrieveMtx is used by retrieveCond
retrieveMtx *sync.Mutex
Expand Down Expand Up @@ -152,9 +147,7 @@ func NewManager(
retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP)
daHeight: s.DAHeight,
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderOutCh: make(chan *types.SignedHeader, 100),
HeaderInCh: make(chan *types.SignedHeader, 100),
SyncedHeadersCh: make(chan *types.SignedHeader, 100),
HeaderCh: make(chan *types.SignedHeader, 100),
blockInCh: make(chan newBlockEvent, 100),
FraudProofInCh: make(chan *abci.FraudProof, 100),
retrieveMtx: new(sync.Mutex),
Expand Down Expand Up @@ -262,27 +255,6 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
select {
case <-daTicker.C:
m.retrieveCond.Signal()
case header := <-m.HeaderInCh:
m.logger.Debug("block header received", "height", header.Header.Height(), "hash", header.Header.Hash())
m.SyncedHeadersCh <- header
newHeight := header.Header.BaseHeader.Height
currentHeight := m.store.Height()
// in case of client reconnecting after being offline
// newHeight may be significantly larger than currentHeight
// it's handled gently in RetrieveLoop
if newHeight > currentHeight {
atomic.StoreUint64(&m.syncTarget, newHeight)
m.retrieveCond.Signal()
}
commit := &header.Commit
// TODO(tzdybal): check if it's from right aggregator
m.lastCommit.Store(commit)
err := m.trySyncNextBlock(ctx, 0)
if err != nil {
m.logger.Info("failed to sync next block", "error", err)
} else {
m.logger.Debug("synced using signed header", "height", commit.Height)
}
case blockEvent := <-m.blockInCh:
block := blockEvent.block
daHeight := blockEvent.daHeight
Expand Down Expand Up @@ -335,43 +307,36 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
// If block at height h+1 is not available, value of last gossiped commit is checked.
// If commit for block h is available, we proceed with sync process, and remove synced block from sync cache.
func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
var b1 *types.Block
var commit *types.Commit
currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory

b1, ok1 := m.syncCache[currentHeight+1]
if !ok1 {
b, ok := m.syncCache[currentHeight+1]
if !ok {
return nil
}
b2, ok2 := m.syncCache[currentHeight+2]
if ok2 {
m.logger.Debug("using last commit from next block")
commit = &b2.SignedHeader.Commit
} else {
lastCommit := m.getLastCommit()
if lastCommit != nil && lastCommit.Height == currentHeight+1 {
m.logger.Debug("using gossiped commit")
commit = lastCommit
}

signedHeader := &b.SignedHeader
if signedHeader != nil {
commit = &b.SignedHeader.Commit
}

if b1 != nil && commit != nil {
m.logger.Info("Syncing block", "height", b1.SignedHeader.Header.Height())
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b1)
if b != nil && commit != nil {
m.logger.Info("Syncing block", "height", b.SignedHeader.Header.Height())
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b)
if err != nil {
return fmt.Errorf("failed to ApplyBlock: %w", err)
}
err = m.store.SaveBlock(b1, commit)
err = m.store.SaveBlock(b, commit)
if err != nil {
return fmt.Errorf("failed to save block: %w", err)
}
_, _, err = m.executor.Commit(ctx, newState, b1, responses)
_, _, err = m.executor.Commit(ctx, newState, b, responses)
if err != nil {
return fmt.Errorf("failed to Commit: %w", err)
}
m.store.SetHeight(uint64(b1.SignedHeader.Header.Height()))
m.store.SetHeight(uint64(b.SignedHeader.Header.Height()))

err = m.store.SaveBlockResponses(uint64(b1.SignedHeader.Header.Height()), responses)
err = m.store.SaveBlockResponses(uint64(b.SignedHeader.Header.Height()), responses)
if err != nil {
return fmt.Errorf("failed to save block responses: %w", err)
}
Expand All @@ -390,14 +355,6 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
return nil
}

func (m *Manager) getLastCommit() *types.Commit {
ptr := m.lastCommit.Load()
if ptr == nil {
return nil
}
return ptr.(*types.Commit)
}

// RetrieveLoop is responsible for interacting with DA layer.
func (m *Manager) RetrieveLoop(ctx context.Context) {
// waitCh is used to signal the retrieve loop, that it should process next blocks
Expand Down Expand Up @@ -602,7 +559,8 @@ func (m *Manager) publishBlock(ctx context.Context) error {
// Only update the stored height after successfully submitting to DA layer and committing to the DB
m.store.SetHeight(uint64(block.SignedHeader.Header.Height()))

m.publishSignedHeader(block, commit)
// Publish header to channel so that header exchange service can broadcast
m.HeaderCh <- &block.SignedHeader

return nil
}
Expand Down Expand Up @@ -639,11 +597,6 @@ func (m *Manager) exponentialBackoff(backoff time.Duration) time.Duration {
return backoff
}

// TODO(tzdybal): consider inlining
func (m *Manager) publishSignedHeader(block *types.Block, commit *types.Commit) {
m.HeaderOutCh <- &types.SignedHeader{Header: block.SignedHeader.Header, Commit: *commit}
}

func updateState(s *types.State, res *abci.ResponseInitChain) {
// If the app did not return an app hash, we keep the one set from the genesis doc in
// the state. We don't set appHash since we don't want the genesis doc app hash
Expand Down
35 changes: 2 additions & 33 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/rollkit/rollkit/state/txindex"
"github.com/rollkit/rollkit/state/txindex/kv"
"github.com/rollkit/rollkit/store"
"github.com/rollkit/rollkit/types"
)

// prefixes used in KV store to separate main node data from DALC data
Expand Down Expand Up @@ -151,7 +150,7 @@ func newFullNode(
return nil, fmt.Errorf("BlockManager initialization error: %w", err)
}

headerExchangeService, err := NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, blockManager.SyncedHeadersCh, logger.With("module", "HeaderExchangeService"))
headerExchangeService, err := NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "HeaderExchangeService"))
if err != nil {
return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err)
}
Expand Down Expand Up @@ -182,7 +181,6 @@ func newFullNode(
node.BaseService = *service.NewBaseService(logger, "Node", node)

node.P2P.SetTxValidator(node.newTxValidator())
node.P2P.SetHeaderValidator(node.newHeaderValidator())
node.P2P.SetFraudProofValidator(node.newFraudProofValidator())

return node, nil
Expand Down Expand Up @@ -220,16 +218,8 @@ func (n *FullNode) initGenesisChunks() error {
func (n *FullNode) headerPublishLoop(ctx context.Context) {
for {
select {
case signedHeader := <-n.blockManager.HeaderOutCh:
case signedHeader := <-n.blockManager.HeaderCh:
n.hExService.writeToHeaderStoreAndBroadcast(ctx, signedHeader)
headerBytes, err := signedHeader.MarshalBinary()
if err != nil {
n.Logger.Error("failed to serialize signed block header", "error", err)
}
err = n.P2P.GossipSignedHeader(ctx, headerBytes)
if err != nil {
n.Logger.Error("failed to gossip signed block header", "error", err)
}
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -363,27 +353,6 @@ func (n *FullNode) newTxValidator() p2p.GossipValidator {
}
}

// newHeaderValidator returns a pubsub validator that runs basic checks and forwards
// the deserialized header for further processing
func (n *FullNode) newHeaderValidator() p2p.GossipValidator {
return func(headerMsg *p2p.GossipMessage) bool {
n.Logger.Debug("header received", "from", headerMsg.From, "bytes", len(headerMsg.Data))
var header types.SignedHeader
err := header.UnmarshalBinary(headerMsg.Data)
if err != nil {
n.Logger.Error("failed to deserialize header", "error", err)
return false
}
err = header.ValidateBasic()
if err != nil {
n.Logger.Error("failed to validate header", "error", err)
return false
}
n.blockManager.HeaderInCh <- &header
return true
}
}

// newFraudProofValidator returns a pubsub validator that validates a fraud proof and forwards
// it to be verified
func (n *FullNode) newFraudProofValidator() p2p.GossipValidator {
Expand Down
1 change: 1 addition & 0 deletions node/full_node_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, d
ListenAddress: "/ip4/127.0.0.1/tcp/" + strconv.Itoa(startPort+n),
}
bmConfig := config.BlockManagerConfig{
DABlockTime: 100 * time.Millisecond,
BlockTime: 1 * time.Second, // blocks must be at least 1 sec apart for adjacent headers to get verified correctly
NamespaceID: types.NamespaceID{8, 7, 6, 5, 4, 3, 2, 1},
FraudProofs: true,
Expand Down
41 changes: 16 additions & 25 deletions node/header_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,21 @@ import (
)

type HeaderExchangeService struct {
conf config.NodeConfig
genesis *tmtypes.GenesisDoc
p2p *p2p.Client
ex *goheaderp2p.Exchange[*types.SignedHeader]
syncer *sync.Syncer[*types.SignedHeader]
sub *goheaderp2p.Subscriber[*types.SignedHeader]
p2pServer *goheaderp2p.ExchangeServer[*types.SignedHeader]
headerStore *goheaderstore.Store[*types.SignedHeader]
syncerStarted bool
syncedHeadersCh chan *types.SignedHeader
conf config.NodeConfig
genesis *tmtypes.GenesisDoc
p2p *p2p.Client
ex *goheaderp2p.Exchange[*types.SignedHeader]
syncer *sync.Syncer[*types.SignedHeader]
sub *goheaderp2p.Subscriber[*types.SignedHeader]
p2pServer *goheaderp2p.ExchangeServer[*types.SignedHeader]
headerStore *goheaderstore.Store[*types.SignedHeader]
syncerStarted bool

logger log.Logger
ctx context.Context
}

func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *tmtypes.GenesisDoc, p2p *p2p.Client, syncedHeadersCh chan *types.SignedHeader, logger log.Logger) (*HeaderExchangeService, error) {
func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *tmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderExchangeService, error) {
// store is TxnDatastore, but we require Batching, hence the type assertion
// note, the badger datastore impl that is used in the background implements both
storeBatch, ok := store.(ds.Batching)
Expand All @@ -54,13 +53,12 @@ func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf c
}

return &HeaderExchangeService{
conf: conf,
genesis: genesis,
p2p: p2p,
ctx: ctx,
headerStore: ss,
syncedHeadersCh: syncedHeadersCh,
logger: logger,
conf: conf,
genesis: genesis,
p2p: p2p,
ctx: ctx,
headerStore: ss,
logger: logger,
}, nil
}

Expand Down Expand Up @@ -92,13 +90,6 @@ func (hExService *HeaderExchangeService) tryInitHeaderStoreAndStartSyncer(ctx co
if err := hExService.initHeaderStoreAndStartSyncer(ctx, trustedHeader); err != nil {
hExService.logger.Error("failed to initialize the headerstore and start syncer", "error", err)
}
} else {
signedHeader := <-hExService.syncedHeadersCh
if signedHeader.Header.Height() == hExService.genesis.InitialHeight {
if err := hExService.initHeaderStoreAndStartSyncer(ctx, signedHeader); err != nil {
hExService.logger.Error("failed to initialize the headerstore and start syncer", "error", err)
}
}
}
}

Expand Down
3 changes: 1 addition & 2 deletions node/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/rollkit/rollkit/config"
"github.com/rollkit/rollkit/p2p"
"github.com/rollkit/rollkit/store"
"github.com/rollkit/rollkit/types"
)

var _ Node = &LightNode{}
Expand Down Expand Up @@ -56,7 +55,7 @@ func newLightNode(
return nil, err
}

headerExchangeService, err := NewHeaderExchangeService(ctx, datastore, conf, genesis, client, make(chan *types.SignedHeader), logger.With("module", "HeaderExchangeService"))
headerExchangeService, err := NewHeaderExchangeService(ctx, datastore, conf, genesis, client, logger.With("module", "HeaderExchangeService"))
if err != nil {
return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err)
}
Expand Down
27 changes: 26 additions & 1 deletion types/signed_header.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package types

import "github.com/celestiaorg/go-header"
import (
"fmt"

"github.com/celestiaorg/go-header"
)

func (sH *SignedHeader) New() header.Header {
return new(SignedHeader)
Expand All @@ -9,3 +13,24 @@ func (sH *SignedHeader) New() header.Header {
func (sH *SignedHeader) IsZero() bool {
return sH == nil
}

func (sH *SignedHeader) VerifyAdjacent(untrst header.Header) error {
// Explicit type checks are required due to embedded Header which also does the explicit type check
untrstH, ok := untrst.(*SignedHeader)
if !ok {
return &header.VerifyError{
Reason: fmt.Errorf("%T is not of type %T", untrst, sH),
}
}
return sH.Header.VerifyAdjacent(&untrstH.Header)
}

func (sH *SignedHeader) VerifyNonAdjacent(untrst header.Header) error {
untrstH, ok := untrst.(*SignedHeader)
Comment thread
gupadhyaya marked this conversation as resolved.
if !ok {
return &header.VerifyError{
Reason: fmt.Errorf("%T is not of type %T", untrst, sH),
}
}
return sH.Header.VerifyNonAdjacent(&untrstH.Header)
}