Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions node/header_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ type HeaderExchangeService struct {
conf config.NodeConfig
genesis *tmtypes.GenesisDoc
p2p *p2p.Client
ex *goheaderp2p.Exchange[*types.Header]
syncer *sync.Syncer[*types.Header]
sub *goheaderp2p.Subscriber[*types.Header]
p2pServer *goheaderp2p.ExchangeServer[*types.Header]
headerStore *goheaderstore.Store[*types.Header]
ex *goheaderp2p.Exchange[*types.SignedHeader]
syncer *sync.Syncer[*types.SignedHeader]
sub *goheaderp2p.Subscriber[*types.SignedHeader]
p2pServer *goheaderp2p.ExchangeServer[*types.SignedHeader]
headerStore *goheaderstore.Store[*types.SignedHeader]
syncerStarted bool
syncedHeadersCh chan *types.SignedHeader

Expand All @@ -48,7 +48,7 @@ func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf c
if !ok {
return nil, errors.New("failed to access the datastore")
}
ss, err := goheaderstore.NewStore[*types.Header](storeBatch)
ss, err := goheaderstore.NewStore[*types.SignedHeader](storeBatch)
if err != nil {
return nil, fmt.Errorf("failed to initialize the header store: %w", err)
}
Expand All @@ -64,7 +64,7 @@ func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf c
}, nil
}

func (hExService *HeaderExchangeService) initOrAppendHeaderStore(ctx context.Context, header *types.Header) error {
func (hExService *HeaderExchangeService) initOrAppendHeaderStore(ctx context.Context, header *types.SignedHeader) error {
var err error

// Init the header store if first block, else append to store
Expand All @@ -76,7 +76,7 @@ func (hExService *HeaderExchangeService) initOrAppendHeaderStore(ctx context.Con
return err
}

func (hExService *HeaderExchangeService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.Header) error {
func (hExService *HeaderExchangeService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.SignedHeader) error {
if err := hExService.headerStore.Init(ctx, initial); err != nil {
return err
}
Expand All @@ -87,15 +87,15 @@ func (hExService *HeaderExchangeService) initHeaderStoreAndStartSyncer(ctx conte
return nil
}

func (hExService *HeaderExchangeService) tryInitHeaderStoreAndStartSyncer(ctx context.Context, trustedHeader *types.Header) {
func (hExService *HeaderExchangeService) tryInitHeaderStoreAndStartSyncer(ctx context.Context, trustedHeader *types.SignedHeader) {
if trustedHeader != nil {
if err := hExService.initHeaderStoreAndStartSyncer(ctx, trustedHeader); err != nil {
hExService.logger.Error("failed to initialize the headerstore and start syncer", "error", err)
}
} else {
signedHeader := <-hExService.syncedHeadersCh
if signedHeader.Header.Height() == hExService.genesis.InitialHeight {
if err := hExService.initHeaderStoreAndStartSyncer(ctx, &signedHeader.Header); err != nil {
if err := hExService.initHeaderStoreAndStartSyncer(ctx, signedHeader); err != nil {
hExService.logger.Error("failed to initialize the headerstore and start syncer", "error", err)
}
}
Expand All @@ -104,12 +104,12 @@ func (hExService *HeaderExchangeService) tryInitHeaderStoreAndStartSyncer(ctx co

func (hExService *HeaderExchangeService) writeToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) {
// Init the header store if first block, else append to store
if err := hExService.initOrAppendHeaderStore(ctx, &signedHeader.Header); err != nil {
if err := hExService.initOrAppendHeaderStore(ctx, signedHeader); err != nil {
hExService.logger.Error("failed to write block header to header store", "error", err)
}

// Broadcast for subscribers
if err := hExService.sub.Broadcast(ctx, &signedHeader.Header); err != nil {
if err := hExService.sub.Broadcast(ctx, signedHeader); err != nil {
hExService.logger.Error("failed to broadcast block header", "error", err)
}
}
Expand All @@ -119,7 +119,7 @@ func (hExService *HeaderExchangeService) Start() error {
var err error
// have to do the initializations here to utilize the p2p node which is created on start
ps := hExService.p2p.PubSub()
hExService.sub = goheaderp2p.NewSubscriber[*types.Header](ps, pubsub.DefaultMsgIdFn)
hExService.sub = goheaderp2p.NewSubscriber[*types.SignedHeader](ps, pubsub.DefaultMsgIdFn)
if err = hExService.sub.Start(hExService.ctx); err != nil {
return fmt.Errorf("error while starting subscriber: %w", err)
}
Expand Down Expand Up @@ -167,7 +167,7 @@ func (hExService *HeaderExchangeService) Start() error {
}

// Look to see if trusted hash is passed, if not get the genesis header
var trustedHeader *types.Header
var trustedHeader *types.SignedHeader
// Try fetching the trusted header from peers if exists
if len(peerIDs) > 0 {
if hExService.conf.TrustedHash != "" {
Expand Down Expand Up @@ -208,11 +208,11 @@ func (hExService *HeaderExchangeService) Stop() error {
// newP2PServer constructs a new ExchangeServer using the given Network as a protocolID suffix.
func newP2PServer(
host host.Host,
store *goheaderstore.Store[*types.Header],
store *goheaderstore.Store[*types.SignedHeader],
network string,
opts ...goheaderp2p.Option[goheaderp2p.ServerParameters],
) (*goheaderp2p.ExchangeServer[*types.Header], error) {
return goheaderp2p.NewExchangeServer[*types.Header](host, store, network, opts...)
) (*goheaderp2p.ExchangeServer[*types.SignedHeader], error) {
return goheaderp2p.NewExchangeServer[*types.SignedHeader](host, store, network, opts...)
}

func newP2PExchange(
Expand All @@ -221,16 +221,16 @@ func newP2PExchange(
network string,
conngater *conngater.BasicConnectionGater,
opts ...goheaderp2p.Option[goheaderp2p.ClientParameters],
) (*goheaderp2p.Exchange[*types.Header], error) {
return goheaderp2p.NewExchange[*types.Header](host, peers, network, conngater, opts...)
) (*goheaderp2p.Exchange[*types.SignedHeader], error) {
return goheaderp2p.NewExchange[*types.SignedHeader](host, peers, network, conngater, opts...)
}

// newSyncer constructs new Syncer for headers.
func newSyncer(
ex header.Exchange[*types.Header],
store header.Store[*types.Header],
sub header.Subscriber[*types.Header],
ex header.Exchange[*types.SignedHeader],
store header.Store[*types.SignedHeader],
sub header.Subscriber[*types.SignedHeader],
opt goheadersync.Options,
) (*sync.Syncer[*types.Header], error) {
) (*sync.Syncer[*types.SignedHeader], error) {
return sync.NewSyncer(ex, store, sub, opt)
}
2 changes: 1 addition & 1 deletion types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Commit struct {
//
// Used mostly for gossiping.
type SignedHeader struct {
Header Header
Header
Commit Commit
}

Expand Down
11 changes: 11 additions & 0 deletions types/signed_header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package types

import "github.com/celestiaorg/go-header"

func (sH *SignedHeader) New() header.Header {
return new(SignedHeader)
}

func (sH *SignedHeader) IsZero() bool {
return sH == nil
}