Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 79 additions & 25 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package block
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/proxy"
tmtypes "github.com/tendermint/tendermint/types"
"go.uber.org/multierr"

"github.com/celestiaorg/optimint/config"
"github.com/celestiaorg/optimint/da"
Expand All @@ -22,6 +24,9 @@ import (
"github.com/celestiaorg/optimint/types"
)

// defaultDABlockTime is used only if DABlockTime is not configured for manager
const defaultDABlockTime = 30 * time.Second
Comment thread
tzdybal marked this conversation as resolved.

// Manager is responsible for aggregating transactions into blocks.
type Manager struct {
lastState state.State
Expand All @@ -36,15 +41,21 @@ type Manager struct {

dalc da.DataAvailabilityLayerClient
retriever da.BlockRetriever
// daHeight is the DA layer height that will be queried next; it is incremented after each successfully processed DA block
daHeight uint64

HeaderOutCh chan *types.Header
HeaderInCh chan *types.Header

syncTarget uint64
blockInCh chan *types.Block
retrieveCh chan uint64
syncCache map[uint64]*types.Block

// retrieveMtx is used by retrieveCond
retrieveMtx *sync.Mutex
// retrieveCond is signalled by SyncLoop to notify the retrieve goroutine (RetrieveLoop) that it needs to retrieve data
retrieveCond *sync.Cond

logger log.Logger
}

Expand Down Expand Up @@ -72,12 +83,20 @@ func NewManager(
if err != nil {
return nil, err
}
if s.DAHeight < conf.DAStartHeight {
s.DAHeight = conf.DAStartHeight
}

proposerAddress, err := getAddress(proposerKey)
if err != nil {
return nil, err
}

if conf.DABlockTime == 0 {
logger.Info("WARNING: using default DA block time", "DABlockTime", defaultDABlockTime)
conf.DABlockTime = defaultDABlockTime
}
Comment thread
adlerjohn marked this conversation as resolved.

exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger)
if s.LastBlockHeight+1 == genesis.InitialHeight {
res, err := exec.InitChain(genesis)
Expand All @@ -100,13 +119,16 @@ func NewManager(
executor: exec,
dalc: dalc,
retriever: dalc.(da.BlockRetriever), // TODO(tzdybal): do it in more gentle way (after MVP)
HeaderOutCh: make(chan *types.Header),
HeaderInCh: make(chan *types.Header),
blockInCh: make(chan *types.Block),
retrieveCh: make(chan uint64),
daHeight: s.DAHeight,
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderOutCh: make(chan *types.Header, 100),
HeaderInCh: make(chan *types.Header, 100),
blockInCh: make(chan *types.Block, 100),
retrieveMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
logger: logger,
}
agg.retrieveCond = sync.NewCond(agg.retrieveMtx)

return agg, nil
}
Expand Down Expand Up @@ -142,8 +164,11 @@ func (m *Manager) AggregationLoop(ctx context.Context) {
}

func (m *Manager) SyncLoop(ctx context.Context) {
daTicker := time.NewTicker(m.conf.DABlockTime)
for {
select {
case <-daTicker.C:
m.retrieveCond.Signal()
case header := <-m.HeaderInCh:
m.logger.Debug("block header received", "height", header.Height, "hash", header.Hash())
newHeight := header.Height
Expand All @@ -153,14 +178,15 @@ func (m *Manager) SyncLoop(ctx context.Context) {
// it's handled gently in RetrieveLoop
if newHeight > currentHeight {
atomic.StoreUint64(&m.syncTarget, newHeight)
m.retrieveCh <- newHeight
m.retrieveCond.Signal()
}
case block := <-m.blockInCh:
m.logger.Debug("block body retrieved from DALC",
"height", block.Header.Height,
"hash", block.Hash(),
)
m.syncCache[block.Header.Height] = block
m.retrieveCond.Signal()
currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory
b1, ok1 := m.syncCache[currentHeight+1]
b2, ok2 := m.syncCache[currentHeight+2]
Expand All @@ -181,6 +207,7 @@ func (m *Manager) SyncLoop(ctx context.Context) {
continue
}

newState.DAHeight = atomic.LoadUint64(&m.daHeight)
m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
Expand All @@ -195,50 +222,75 @@ func (m *Manager) SyncLoop(ctx context.Context) {
}
}

// RetrieveLoop is responsible for interacting with DA layer.
func (m *Manager) RetrieveLoop(ctx context.Context) {
// waitCh is used to signal the retrieve loop that it should process the next blocks.
// retrieveCond can be signalled in a completely asynchronous manner, and the goroutine below
// works as a kind of "buffer" for those signals.
waitCh := make(chan interface{})
go func() {
for {
m.retrieveMtx.Lock()
m.retrieveCond.Wait()
waitCh <- nil
m.retrieveMtx.Unlock()
if ctx.Err() != nil {
return
}
}
}()

for {
select {
case <-m.retrieveCh:
target := atomic.LoadUint64(&m.syncTarget)
for h := m.store.Height() + 1; h <= target; h++ {
m.logger.Debug("trying to retrieve block from DALC", "height", h)
m.mustRetrieveBlock(ctx, h)
case <-waitCh:
for {
daHeight := atomic.LoadUint64(&m.daHeight)
m.logger.Debug("retrieve", "daHeight", daHeight)
err := m.processNextDABlock()
if err != nil {
m.logger.Error("failed to retrieve block from DALC", "daHeight", daHeight, "errors", err.Error())
break
}
atomic.AddUint64(&m.daHeight, 1)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, are you sure this is the right use of atomics?

What if &m.daHeight changes in between loading it and adding 1 to it here? Then you'd be adding 2 to it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only place where m.daHeight is changed. It's an atomic change because we're reading this variable in several other places (in other goroutines). TBH, the LoadUint64 at the beginning of the loop is not necessary (a normal read should be fine), but I decided to handle this variable "atomically" everywhere for consistency.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that if, e.g., &m.daHeight is N, then it could be read as N in two separate threads and incremented twice to N+2, completely bypassing N+1. Is that ever possible? If so, is it a problem?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not possible right now, as there is only one thread incrementing this variable, and I don't see a reason to add more threads/goroutines.

}
case <-ctx.Done():
return
}
}
}

func (m *Manager) mustRetrieveBlock(ctx context.Context, height uint64) {
func (m *Manager) processNextDABlock() error {
// TODO(tzdybal): extract configuration option
maxRetries := 10
daHeight := atomic.LoadUint64(&m.daHeight)

var err error
m.logger.Debug("trying to retrieve block from DA", "daHeight", daHeight)
for r := 0; r < maxRetries; r++ {
err := m.fetchBlock(ctx, height)
if err == nil {
return
blockResp, fetchErr := m.fetchBlock(daHeight)
if fetchErr != nil {
err = multierr.Append(err, fetchErr)
time.Sleep(100 * time.Millisecond)
Comment thread
adlerjohn marked this conversation as resolved.
} else {
for _, block := range blockResp.Blocks {
m.blockInCh <- block
}
return nil
}
// TODO(tzdybal): configuration option
// TODO(tzdybal): exponential backoff
time.Sleep(100 * time.Millisecond)
}
// TODO(tzdybal): this is only temporary solution, for MVP
panic("failed to retrieve block with DALC")
return err
}

func (m *Manager) fetchBlock(ctx context.Context, height uint64) error {
func (m *Manager) fetchBlock(daHeight uint64) (da.ResultRetrieveBlocks, error) {
var err error
blockRes := m.retriever.RetrieveBlock(height)
blockRes := m.retriever.RetrieveBlocks(daHeight)
switch blockRes.Code {
case da.StatusSuccess:
m.blockInCh <- blockRes.Block
case da.StatusError:
err = fmt.Errorf("failed to retrieve block: %s", blockRes.Message)
case da.StatusTimeout:
err = fmt.Errorf("timeout during retrieve block: %s", blockRes.Message)
}
return err
return blockRes, err
}

func (m *Manager) getRemainingSleep(start time.Time) time.Duration {
Expand Down Expand Up @@ -305,6 +357,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
return err
}

newState.DAHeight = atomic.LoadUint64(&m.daHeight)
m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
Expand All @@ -320,6 +373,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

func (m *Manager) broadcastBlock(ctx context.Context, block *types.Block) error {
m.logger.Debug("submitting block to DA layer", "height", block.Header.Height)
res := m.dalc.SubmitBlock(block)
if res.Code != da.StatusSuccess {
return fmt.Errorf("DA layer submission failed: %s", res.Message)
Expand Down
9 changes: 7 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ type NodeConfig struct {

// BlockManagerConfig consists of all parameters required by the block manager
type BlockManagerConfig struct {
BlockTime time.Duration `mapstructure:"block_time"`
NamespaceID [8]byte `mapstructure:"namespace_id"`
// BlockTime defines how often new blocks are produced
BlockTime time.Duration `mapstructure:"block_time"`
// DABlockTime informs about block time of underlying data availability layer
DABlockTime time.Duration `mapstructure:"da_block_time"`
// DAStartHeight allows skipping the first DAStartHeight-1 DA layer blocks when querying for blocks.
DAStartHeight uint64 `mapstructure:"da_start_height"`
NamespaceID [8]byte `mapstructure:"namespace_id"`
}

func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error {
Expand Down
12 changes: 7 additions & 5 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type DAResult struct {
Code StatusCode
// Message may contain DA layer specific information (like DA block height/hash, detailed error message, etc)
Message string
// DAHeight informs about a height on Data Availability Layer for given result.
DAHeight uint64
}

// ResultSubmitBlock contains information returned from DA layer after block submission.
Expand All @@ -43,11 +45,11 @@ type ResultCheckBlock struct {
DataAvailable bool
}

type ResultRetrieveBlock struct {
type ResultRetrieveBlocks struct {
DAResult
// Blocks are the full blocks retrieved from Data Availability Layer.
// If Code is not equal to StatusSuccess, it has to be nil.
Block *types.Block
Blocks []*types.Block
}

// DataAvailabilityLayerClient defines generic interface for DA layer block submission.
Expand All @@ -65,12 +67,12 @@ type DataAvailabilityLayerClient interface {
SubmitBlock(block *types.Block) ResultSubmitBlock

// CheckBlockAvailability queries DA layer to check data availability of blocks at the given DA layer height.
CheckBlockAvailability(header *types.Header) ResultCheckBlock
CheckBlockAvailability(dataLayerHeight uint64) ResultCheckBlock
}

// BlockRetriever is additional interface that can be implemented by Data Availability Layer Client that is able to retrieve
// block data from DA layer. This gives the ability to use it for block synchronization.
type BlockRetriever interface {
// RetrieveBlock returns block at given height from data availability layer.
RetrieveBlock(height uint64) ResultRetrieveBlock
// RetrieveBlocks returns blocks at given data layer height from data availability layer.
RetrieveBlocks(dataLayerHeight uint64) ResultRetrieveBlocks
}
38 changes: 25 additions & 13 deletions da/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ func (d *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultS
}
}
return da.ResultSubmitBlock{
DAResult: da.DAResult{Code: da.StatusCode(resp.Result.Code), Message: resp.Result.Message},
DAResult: da.DAResult{
Code: da.StatusCode(resp.Result.Code),
Message: resp.Result.Message,
DAHeight: resp.Result.DataLayerHeight,
},
}
}

func (d *DataAvailabilityLayerClient) CheckBlockAvailability(header *types.Header) da.ResultCheckBlock {
resp, err := d.client.CheckBlockAvailability(context.TODO(), &dalc.CheckBlockAvailabilityRequest{Header: header.ToProto()})
func (d *DataAvailabilityLayerClient) CheckBlockAvailability(dataLayerHeight uint64) da.ResultCheckBlock {
resp, err := d.client.CheckBlockAvailability(context.TODO(), &dalc.CheckBlockAvailabilityRequest{DataLayerHeight: dataLayerHeight})
if err != nil {
return da.ResultCheckBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
}
Expand All @@ -90,19 +94,27 @@ func (d *DataAvailabilityLayerClient) CheckBlockAvailability(header *types.Heade
}
}

func (d *DataAvailabilityLayerClient) RetrieveBlock(height uint64) da.ResultRetrieveBlock {
resp, err := d.client.RetrieveBlock(context.TODO(), &dalc.RetrieveBlockRequest{Height: height})
func (d *DataAvailabilityLayerClient) RetrieveBlocks(dataLayerHeight uint64) da.ResultRetrieveBlocks {
resp, err := d.client.RetrieveBlocks(context.TODO(), &dalc.RetrieveBlocksRequest{DataLayerHeight: dataLayerHeight})
if err != nil {
return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
return da.ResultRetrieveBlocks{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
}

var b types.Block
err = b.FromProto(resp.Block)
if err != nil {
return da.ResultRetrieveBlock{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
blocks := make([]*types.Block, len(resp.Blocks))
for i, block := range resp.Blocks {
var b types.Block
err = b.FromProto(block)
if err != nil {
return da.ResultRetrieveBlocks{DAResult: da.DAResult{Code: da.StatusError, Message: err.Error()}}
}
blocks[i] = &b
}
return da.ResultRetrieveBlock{
DAResult: da.DAResult{Code: da.StatusCode(resp.Result.Code), Message: resp.Result.Message},
Block: &b,
return da.ResultRetrieveBlocks{
DAResult: da.DAResult{
Code: da.StatusCode(resp.Result.Code),
Message: resp.Result.Message,
DAHeight: dataLayerHeight,
},
Blocks: blocks,
}
}
2 changes: 1 addition & 1 deletion da/grpc/mockserv/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func main() {
log.Panic(err)
}
log.Println("Listening on:", lis.Addr())
srv := mockserv.GetServer(kv, conf)
srv := mockserv.GetServer(kv, conf, nil)
if err := srv.Serve(lis); err != nil {
log.Println("error while serving:", err)
}
Expand Down
Loading