From 205dcc84bfbd6b7a546fe71bc426826d3401fe7f Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Mon, 9 Jan 2023 20:52:46 -0500 Subject: [PATCH 01/22] refactored service.Subscribe to wrap client.Subscribe --- rpc/json/service.go | 44 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/rpc/json/service.go b/rpc/json/service.go index af27c367a6..a8486a718a 100644 --- a/rpc/json/service.go +++ b/rpc/json/service.go @@ -9,8 +9,8 @@ import ( "time" "github.com/gorilla/rpc/v2/json2" - "github.com/tendermint/tendermint/libs/pubsub" - tmquery "github.com/tendermint/tendermint/libs/pubsub/query" + /*"github.com/tendermint/tendermint/libs/pubsub" + tmquery "github.com/tendermint/tendermint/libs/pubsub/query"*/ rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -91,10 +91,10 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo // TODO(tzdybal): pass config and check subscriptions limits - q, err := tmquery.New(args.Query) + /*q, err := tmquery.New(args.Query) if err != nil { return nil, fmt.Errorf("failed to parse query: %w", err) - } + }*/ s.logger.Debug("subscribe to query", "remote", addr, "query", args.Query) @@ -104,7 +104,39 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo ctx, cancel := context.WithTimeout(req.Context(), SubscribeTimeout) defer cancel() - sub, err := s.client.EventBus.Subscribe(ctx, addr, q, subBufferSize) + eventCh, err := s.client.Subscribe(ctx, addr, args.Query, subBufferSize) + if err != nil { + return nil, fmt.Errorf("failed to subscribe: %w", err) + } + go func() { + for { + select { + case msg := <-eventCh: + data, err := json.Marshal(msg.Data) + if err != nil { + s.logger.Error("failed to marshal response data", "error", err) + continue + } + if wsConn != nil { + wsConn.queue <- data + } + case <-ctx.Done(): + /*if sub.Err() != pubsub.ErrUnsubscribed { + var reason string + if sub.Err() == nil { + reason = "unknown failure" + } else { + reason = sub.Err().Error() + } + s.logger.Error("subscription was cancelled", "reason", reason) + }*/ + return + } + + } + }() + + /*sub, err := s.client.EventBus.Subscribe(ctx, addr, q, subBufferSize) if err != nil { return nil, fmt.Errorf("failed to subscribe: %w", err) } @@ -134,7 +166,7 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo return } } - }() + }()*/ return &ctypes.ResultSubscribe{}, nil } From eb1f2066419de0f53ce13951909424d4d66eee82 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Mon, 9 Jan 2023 23:21:07 -0500 Subject: [PATCH 02/22] wrap the client.Subscribe method --- rpc/json/service.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/rpc/json/service.go b/rpc/json/service.go index a8486a718a..1728fb0ea5 100644 --- a/rpc/json/service.go +++ b/rpc/json/service.go @@ -42,7 +42,8 @@ func newMethod(m interface{}) *method { } type service struct { - client *client.Client + //client *client.Client + client rpcclient.Client methods map[string]*method logger log.Logger } @@ -101,10 +102,10 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo // TODO(tzdybal): extract consts or configs const SubscribeTimeout = 5 * time.Second const subBufferSize = 100 - ctx, cancel := context.WithTimeout(req.Context(), SubscribeTimeout) - defer cancel() + /*ctx, cancel := context.WithTimeout(req.Context(), SubscribeTimeout) + defer cancel()*/ - eventCh, err := s.client.Subscribe(ctx, addr, args.Query, subBufferSize) + eventCh, err := s.client.Subscribe(req.Context(), addr, args.Query, subBufferSize) if err != nil { return nil, fmt.Errorf("failed to subscribe: %w", err) } @@ -120,17 +121,6 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo if wsConn != nil { wsConn.queue <- data } - case <-ctx.Done(): - /*if sub.Err() != pubsub.ErrUnsubscribed { - var reason string - if sub.Err() == nil { - reason = "unknown failure" - } else { - reason = sub.Err().Error() - } - s.logger.Error("subscription was cancelled", "reason", reason) - }*/ - return } } From 12e1c1f8df97c4352134be3e4f3ed2053d2846aa Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Mon, 9 Jan 2023 23:23:20 -0500 Subject: [PATCH 03/22] cleaned up --- rpc/json/service.go | 42 ------------------------------------------ 1 file changed, 42 deletions(-) diff --git a/rpc/json/service.go b/rpc/json/service.go index 1728fb0ea5..155c1d729c 100644 --- a/rpc/json/service.go +++ b/rpc/json/service.go @@ -9,8 +9,6 @@ import ( "time" "github.com/gorilla/rpc/v2/json2" - /*"github.com/tendermint/tendermint/libs/pubsub" - tmquery "github.com/tendermint/tendermint/libs/pubsub/query"*/ rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -42,7 +40,6 @@ func newMethod(m interface{}) *method { } type service struct { - //client *client.Client client rpcclient.Client methods map[string]*method logger log.Logger @@ -92,18 +89,11 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo // TODO(tzdybal): pass config and check subscriptions limits - /*q, err := tmquery.New(args.Query) - if err != nil { - return nil, fmt.Errorf("failed to parse query: %w", err) - }*/ - s.logger.Debug("subscribe to query", "remote", addr, "query", args.Query) // TODO(tzdybal): extract consts or configs const SubscribeTimeout = 5 * time.Second const subBufferSize = 100 - /*ctx, cancel := context.WithTimeout(req.Context(), SubscribeTimeout) - defer cancel()*/ eventCh, err := s.client.Subscribe(req.Context(), addr, args.Query, subBufferSize) if err != nil { @@ -126,38 +116,6 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo } }() - /*sub, err := s.client.EventBus.Subscribe(ctx, addr, q, subBufferSize) - if err != nil { - return nil, fmt.Errorf("failed to subscribe: %w", err) - } - - go func() { - for { - select { - case msg := <-sub.Out(): - data, err := json.Marshal(msg.Data()) - if err != nil { - s.logger.Error("failed to marshal response data", "error", err) - continue - } - if wsConn != nil { - wsConn.queue <- data - } - case <-sub.Cancelled(): - if sub.Err() != pubsub.ErrUnsubscribed { - var reason string - if sub.Err() == nil { - reason = "unknown failure" - } else { - reason = sub.Err().Error() - } - s.logger.Error("subscription was cancelled", "reason", reason) - } - return - } - } - }()*/ - return &ctypes.ResultSubscribe{}, nil } From 86b0ba669015a9805fd79eefb1aada0a04eb0268 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Mon, 9 Jan 2023 23:32:59 -0500 Subject: [PATCH 04/22] change functions to accept rpcclient.Client interface --- rpc/json/service.go | 5 ++--- rpc/server.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rpc/json/service.go b/rpc/json/service.go index 155c1d729c..75bee5b581 100644 --- a/rpc/json/service.go +++ b/rpc/json/service.go @@ -13,11 +13,10 @@ import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/celestiaorg/rollmint/log" - "github.com/celestiaorg/rollmint/rpc/client" ) // GetHTTPHandler returns handler configured to serve Tendermint-compatible RPC. -func GetHTTPHandler(l *client.Client, logger log.Logger) (http.Handler, error) { +func GetHTTPHandler(l rpcclient.Client, logger log.Logger) (http.Handler, error) { return newHandler(newService(l, logger), json2.NewCodec(), logger), nil } @@ -45,7 +44,7 @@ type service struct { logger log.Logger } -func newService(c *client.Client, l log.Logger) *service { +func newService(c rpcclient.Client, l log.Logger) *service { s := service{ client: c, logger: l, diff --git a/rpc/server.go b/rpc/server.go index 3aa1b78381..517f027e69 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -25,7 +25,7 @@ type Server struct { *service.BaseService config *config.RPCConfig - client *client.Client + client rpcclient.Client server http.Server } From bda998b5da1217c881d3fe32c91e2c5598b1d0b1 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Tue, 10 Jan 2023 00:49:06 -0500 Subject: [PATCH 05/22] moved client to node package, renamed Node to FullNode --- {rpc => node}/client/client_test.go | 0 rpc/client/client.go => node/client/full_client.go | 0 node/{node.go => full.go} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename {rpc => node}/client/client_test.go (100%) rename rpc/client/client.go => node/client/full_client.go (100%) rename node/{node.go => full.go} (100%) diff --git a/rpc/client/client_test.go b/node/client/client_test.go similarity index 100% rename from rpc/client/client_test.go rename to node/client/client_test.go diff --git a/rpc/client/client.go b/node/client/full_client.go similarity index 100% rename from rpc/client/client.go rename to node/client/full_client.go diff --git a/node/node.go b/node/full.go similarity index 100% rename from node/node.go rename to node/full.go From b3cafe0feac4db07e4aabbee5b401397c3f3b941 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Tue, 10 Jan 2023 01:17:34 -0500 Subject: [PATCH 06/22] created and implemented note interface, and GetClient for FullNode --- node/full.go | 38 ++++----- node/{client => }/full_client.go | 85 ++++++++++--------- .../client_test.go => full_client_test.go} | 23 +++-- node/integration_test.go | 10 +-- node/node.go | 10 +++ rpc/json/service_test.go | 11 ++- rpc/server.go | 5 +- 7 files changed, 96 insertions(+), 86 deletions(-) rename node/{client => }/full_client.go (86%) rename node/{client/client_test.go => full_client_test.go} (96%) create mode 100644 node/node.go diff --git a/node/full.go b/node/full.go index 52f434042d..540df00943 100644 --- a/node/full.go +++ b/node/full.go @@ -46,9 +46,9 @@ const ( genesisChunkSize = 16 * 1024 * 1024 // 16 MiB ) -// Node represents a client node in rollmint network. +// FullNode represents a client node in rollmint network. // It connects all the components and orchestrates their work. -type Node struct { +type FullNode struct { service.BaseService eventBus *tmtypes.EventBus appClient abciclient.Client @@ -87,7 +87,7 @@ func NewNode( appClient abciclient.Client, genesis *tmtypes.GenesisDoc, logger log.Logger, -) (*Node, error) { +) (*FullNode, error) { eventBus := tmtypes.NewEventBus() eventBus.SetLogger(logger.With("module", "events")) if err := eventBus.Start(); err != nil { @@ -134,7 +134,7 @@ func NewNode( return nil, fmt.Errorf("BlockManager initialization error: %w", err) } - node := &Node{ + node := &FullNode{ appClient: appClient, eventBus: eventBus, genesis: genesis, @@ -164,7 +164,7 @@ func NewNode( // initGenesisChunks creates a chunked format of the genesis document to make it easier to // iterate through larger genesis structures. -func (n *Node) initGenesisChunks() error { +func (n *FullNode) initGenesisChunks() error { if n.genChunks != nil { return nil } @@ -191,7 +191,7 @@ func (n *Node) initGenesisChunks() error { return nil } -func (n *Node) headerPublishLoop(ctx context.Context) { +func (n *FullNode) headerPublishLoop(ctx context.Context) { for { select { case signedHeader := <-n.blockManager.HeaderOutCh: @@ -210,7 +210,7 @@ func (n *Node) headerPublishLoop(ctx context.Context) { } // OnStart is a part of Service interface. -func (n *Node) OnStart() error { +func (n *FullNode) OnStart() error { n.Logger.Info("starting P2P client") err := n.P2P.Start(n.ctx) if err != nil { @@ -232,12 +232,12 @@ func (n *Node) OnStart() error { } // GetGenesis returns entire genesis doc. -func (n *Node) GetGenesis() *tmtypes.GenesisDoc { +func (n *FullNode) GetGenesis() *tmtypes.GenesisDoc { return n.genesis } // GetGenesisChunks returns chunked version of genesis. -func (n *Node) GetGenesisChunks() ([]string, error) { +func (n *FullNode) GetGenesisChunks() ([]string, error) { err := n.initGenesisChunks() if err != nil { return nil, err @@ -246,40 +246,40 @@ func (n *Node) GetGenesisChunks() ([]string, error) { } // OnStop is a part of Service interface. -func (n *Node) OnStop() { +func (n *FullNode) OnStop() { err := n.dalc.Stop() err = multierr.Append(err, n.P2P.Close()) n.Logger.Error("errors while stopping node:", "errors", err) } // OnReset is a part of Service interface. -func (n *Node) OnReset() error { +func (n *FullNode) OnReset() error { panic("OnReset - not implemented!") } // SetLogger sets the logger used by node. -func (n *Node) SetLogger(logger log.Logger) { +func (n *FullNode) SetLogger(logger log.Logger) { n.Logger = logger } // GetLogger returns logger. -func (n *Node) GetLogger() log.Logger { +func (n *FullNode) GetLogger() log.Logger { return n.Logger } // EventBus gives access to Node's event bus. -func (n *Node) EventBus() *tmtypes.EventBus { +func (n *FullNode) EventBus() *tmtypes.EventBus { return n.eventBus } // AppClient returns ABCI proxy connections to communicate with application. -func (n *Node) AppClient() abciclient.Client { +func (n *FullNode) AppClient() abciclient.Client { return n.appClient } // newTxValidator creates a pubsub validator that uses the node's mempool to check the // transaction. If the transaction is valid, then it is added to the mempool -func (n *Node) newTxValidator() p2p.GossipValidator { +func (n *FullNode) newTxValidator() p2p.GossipValidator { return func(m *p2p.GossipMessage) bool { n.Logger.Debug("transaction received", "bytes", len(m.Data)) checkTxResCh := make(chan *abci.Response, 1) @@ -309,7 +309,7 @@ func (n *Node) newTxValidator() p2p.GossipValidator { // newHeaderValidator returns a pubsub validator that runs basic checks and forwards // the deserialized header for further processing -func (n *Node) newHeaderValidator() p2p.GossipValidator { +func (n *FullNode) newHeaderValidator() p2p.GossipValidator { return func(headerMsg *p2p.GossipMessage) bool { n.Logger.Debug("header received", "from", headerMsg.From, "bytes", len(headerMsg.Data)) var header types.SignedHeader @@ -330,7 +330,7 @@ func (n *Node) newHeaderValidator() p2p.GossipValidator { // newCommitValidator returns a pubsub validator that runs basic checks and forwards // the deserialized commit for further processing -func (n *Node) newCommitValidator() p2p.GossipValidator { +func (n *FullNode) newCommitValidator() p2p.GossipValidator { return func(commitMsg *p2p.GossipMessage) bool { n.Logger.Debug("commit received", "from", commitMsg.From, "bytes", len(commitMsg.Data)) var commit types.Commit @@ -352,7 +352,7 @@ func (n *Node) newCommitValidator() p2p.GossipValidator { // newFraudProofValidator returns a pubsub validator that validates a fraud proof and forwards // it to be verified -func (n *Node) newFraudProofValidator() p2p.GossipValidator { +func (n *FullNode) newFraudProofValidator() p2p.GossipValidator { return func(fraudProofMsg *p2p.GossipMessage) bool { n.Logger.Debug("fraud proof received", "from", fraudProofMsg.From, "bytes", len(fraudProofMsg.Data)) var fraudProof types.FraudProof diff --git a/node/client/full_client.go b/node/full_client.go similarity index 86% rename from node/client/full_client.go rename to node/full_client.go index c51c127c72..45e3b07ec9 100644 --- a/node/client/full_client.go +++ b/node/full_client.go @@ -1,4 +1,4 @@ -package client +package node import ( "context" @@ -25,7 +25,6 @@ import ( rconfig "github.com/celestiaorg/rollmint/config" abciconv "github.com/celestiaorg/rollmint/conv/abci" "github.com/celestiaorg/rollmint/mempool" - "github.com/celestiaorg/rollmint/node" ) const ( @@ -41,29 +40,33 @@ var ( ErrConsensusStateNotAvailable = errors.New("consensus state not available in rollmint") ) -var _ rpcclient.Client = &Client{} +var _ rpcclient.Client = &FullClient{} -// Client implements tendermint RPC client interface. +// FullClient implements tendermint RPC client interface. // // This is the type that is used in communication between cosmos-sdk app and rollmint. -type Client struct { +type FullClient struct { *types.EventBus config *config.RPCConfig - node *node.Node + node *FullNode } // NewClient returns Client working with given node. -func NewClient(node *node.Node) *Client { - return &Client{ +func NewClient(node *FullNode) *FullClient { + return &FullClient{ EventBus: node.EventBus(), config: config.DefaultRPCConfig(), node: node, } } +func (n *FullNode) GetClient() rpcclient.Client { + return NewClient(n) +} + // ABCIInfo returns basic information about application state. -func (c *Client) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { +func (c *FullClient) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { resInfo, err := c.appClient().InfoSync(proxy.RequestInfo) if err != nil { return nil, err @@ -72,12 +75,12 @@ func (c *Client) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { } // ABCIQuery queries for data from application. -func (c *Client) ABCIQuery(ctx context.Context, path string, data tmbytes.HexBytes) (*ctypes.ResultABCIQuery, error) { +func (c *FullClient) ABCIQuery(ctx context.Context, path string, data tmbytes.HexBytes) (*ctypes.ResultABCIQuery, error) { return c.ABCIQueryWithOptions(ctx, path, data, rpcclient.DefaultABCIQueryOptions) } // ABCIQueryWithOptions queries for data from application. -func (c *Client) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { +func (c *FullClient) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { resQuery, err := c.appClient().QuerySync(abci.RequestQuery{ Path: path, Data: data, @@ -93,7 +96,7 @@ func (c *Client) ABCIQueryWithOptions(ctx context.Context, path string, data tmb // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit -func (c *Client) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +func (c *FullClient) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // This implementation corresponds to Tendermints implementation from rpc/core/mempool.go. // ctx.RemoteAddr godoc: If neither HTTPReq nor WSConn is set, an empty string is returned. // This code is a local client, so we can assume that subscriber is "" @@ -184,7 +187,7 @@ func (c *Client) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.Re // BroadcastTxAsync returns right away, with no response. Does not wait for // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async -func (c *Client) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func (c *FullClient) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := c.node.Mempool.CheckTx(tx, nil, mempool.TxInfo{}) if err != nil { return nil, err @@ -200,7 +203,7 @@ func (c *Client) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.Res // BroadcastTxSync returns with the response from CheckTx. Does not wait for // DeliverTx result. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync -func (c *Client) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func (c *FullClient) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) err := c.node.Mempool.CheckTx(tx, func(res *abci.Response) { resCh <- res @@ -236,7 +239,7 @@ func (c *Client) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.Resu } // Subscribe subscribe given subscriber to a query. -func (c *Client) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { +func (c *FullClient) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { q, err := tmquery.New(query) if err != nil { return nil, fmt.Errorf("failed to parse query: %w", err) @@ -264,7 +267,7 @@ func (c *Client) Subscribe(ctx context.Context, subscriber, query string, outCap } // Unsubscribe unsubscribes given subscriber from a query. -func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) error { +func (c *FullClient) Unsubscribe(ctx context.Context, subscriber, query string) error { q, err := tmquery.New(query) if err != nil { return fmt.Errorf("failed to parse query: %w", err) @@ -273,12 +276,12 @@ func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) erro } // Genesis returns entire genesis. -func (c *Client) Genesis(_ context.Context) (*ctypes.ResultGenesis, error) { +func (c *FullClient) Genesis(_ context.Context) (*ctypes.ResultGenesis, error) { return &ctypes.ResultGenesis{Genesis: c.node.GetGenesis()}, nil } // GenesisChunked returns given chunk of genesis. -func (c *Client) GenesisChunked(context context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { +func (c *FullClient) GenesisChunked(context context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { genChunks, err := c.node.GetGenesisChunks() if err != nil { return nil, fmt.Errorf("error while creating chunks of the genesis document: %w", err) @@ -304,7 +307,7 @@ func (c *Client) GenesisChunked(context context.Context, id uint) (*ctypes.Resul } // BlockchainInfo returns ABCI block meta information for given height range. -func (c *Client) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { +func (c *FullClient) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { const limit int64 = 20 // Currently blocks are not pruned and are synced linearly so the base height is 0 @@ -342,7 +345,7 @@ func (c *Client) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) } // NetInfo returns basic information about client P2P connections. -func (c *Client) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { +func (c *FullClient) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { res := ctypes.ResultNetInfo{ Listening: true, } @@ -364,19 +367,19 @@ func (c *Client) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { } // DumpConsensusState always returns error as there is no consensus state in rollmint. -func (c *Client) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { +func (c *FullClient) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { return nil, ErrConsensusStateNotAvailable } // ConsensusState always returns error as there is no consensus state in rollmint. -func (c *Client) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { +func (c *FullClient) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { return nil, ErrConsensusStateNotAvailable } // ConsensusParams returns consensus params at given height. // // Currently, consensus params changes are not supported and this method returns params as defined in genesis. -func (c *Client) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) { +func (c *FullClient) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) { // TODO(tzdybal): implement consensus params handling: https://github.com/celestiaorg/rollmint/issues/291 params := c.node.GetGenesis().ConsensusParams return &ctypes.ResultConsensusParams{ @@ -403,14 +406,14 @@ func (c *Client) ConsensusParams(ctx context.Context, height *int64) (*ctypes.Re } // Health endpoint returns empty value. It can be used to monitor service availability. -func (c *Client) Health(ctx context.Context) (*ctypes.ResultHealth, error) { +func (c *FullClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) { return &ctypes.ResultHealth{}, nil } // Block method returns BlockID and block itself for given height. // // If height is nil, it returns information about last known block. -func (c *Client) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { +func (c *FullClient) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { heightValue := c.normalizeHeight(height) block, err := c.node.Store.LoadBlock(heightValue) if err != nil { @@ -434,7 +437,7 @@ func (c *Client) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, } // BlockByHash returns BlockID and block itself for given hash. -func (c *Client) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { +func (c *FullClient) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { var h [32]byte copy(h[:], hash) @@ -460,7 +463,7 @@ func (c *Client) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBl } // BlockResults returns information about transactions, events and updates of validator set and consensus params. -func (c *Client) BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { +func (c *FullClient) BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { var h uint64 if height == nil { h = c.node.Store.Height() @@ -483,7 +486,7 @@ func (c *Client) BlockResults(ctx context.Context, height *int64) (*ctypes.Resul } // Commit returns signed header (aka commit) at given height. -func (c *Client) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { +func (c *FullClient) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { heightValue := c.normalizeHeight(height) com, err := c.node.Store.LoadCommit(heightValue) if err != nil { @@ -503,7 +506,7 @@ func (c *Client) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommi } // Validators returns paginated list of validators at given height. -func (c *Client) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { +func (c *FullClient) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { height := c.normalizeHeight(heightPtr) validators, err := c.node.Store.LoadValidators(height) if err != nil { @@ -528,7 +531,7 @@ func (c *Client) Validators(ctx context.Context, heightPtr *int64, pagePtr, perP } // Tx returns detailed information about transaction identified by its hash. -func (c *Client) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { +func (c *FullClient) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { res, err := c.node.TxIndexer.Get(hash) if err != nil { return nil, err @@ -563,7 +566,7 @@ func (c *Client) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.Resul } // TxSearch returns detailed information about transactions matching query. -func (c *Client) TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*ctypes.ResultTxSearch, error) { +func (c *FullClient) TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*ctypes.ResultTxSearch, error) { q, err := tmquery.New(query) if err != nil { return nil, err @@ -631,7 +634,7 @@ func (c *Client) TxSearch(ctx context.Context, query string, prove bool, pagePtr // BlockSearch defines a method to search for a paginated set of blocks by // BeginBlock and EndBlock event search criteria. -func (c *Client) BlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) { +func (c *FullClient) BlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) { q, err := tmquery.New(query) if err != nil { return nil, err @@ -692,7 +695,7 @@ func (c *Client) BlockSearch(ctx context.Context, query string, page, perPage *i } // Status returns detailed information about current status of the node. -func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { +func (c *FullClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) { latest, err := c.node.Store.LoadBlock(c.node.Store.Height()) if err != nil { return nil, fmt.Errorf("failed to find latest block: %w", err) @@ -755,14 +758,14 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { } // BroadcastEvidence is not yet implemented. -func (c *Client) BroadcastEvidence(ctx context.Context, evidence types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { +func (c *FullClient) BroadcastEvidence(ctx context.Context, evidence types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { return &ctypes.ResultBroadcastEvidence{ Hash: evidence.Hash(), }, nil } // NumUnconfirmedTxs returns information about transactions in mempool. -func (c *Client) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) { +func (c *FullClient) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) { return &ctypes.ResultUnconfirmedTxs{ Count: c.node.Mempool.Size(), Total: c.node.Mempool.Size(), @@ -772,7 +775,7 @@ func (c *Client) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirm } // UnconfirmedTxs returns transactions in mempool. -func (c *Client) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { +func (c *FullClient) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { // reuse per_page validator limit := validatePerPage(limitPtr) @@ -787,7 +790,7 @@ func (c *Client) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctypes.Res // CheckTx executes a new transaction against the application to determine its validity. // // If valid, the tx is automatically added to the mempool. -func (c *Client) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { +func (c *FullClient) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { res, err := c.appClient().CheckTxSync(abci.RequestCheckTx{Tx: tx}) if err != nil { return nil, err @@ -795,7 +798,7 @@ func (c *Client) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckT return &ctypes.ResultCheckTx{ResponseCheckTx: *res}, nil } -func (c *Client) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { +func (c *FullClient) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { for { select { case msg := <-sub.Out(): @@ -826,7 +829,7 @@ func (c *Client) eventsRoutine(sub types.Subscription, subscriber string, q tmpu } // Try to resubscribe with exponential backoff. -func (c *Client) resubscribe(subscriber string, q tmpubsub.Query) types.Subscription { +func (c *FullClient) resubscribe(subscriber string, q tmpubsub.Query) types.Subscription { attempts := 0 for { if !c.IsRunning() { @@ -843,11 +846,11 @@ func (c *Client) resubscribe(subscriber string, q tmpubsub.Query) types.Subscrip } } -func (c *Client) appClient() abcicli.Client { +func (c *FullClient) appClient() abcicli.Client { return c.node.AppClient() } -func (c *Client) normalizeHeight(height *int64) uint64 { +func (c *FullClient) normalizeHeight(height *int64) uint64 { var heightValue uint64 if height == nil { heightValue = c.node.Store.Height() diff --git a/node/client/client_test.go b/node/full_client_test.go similarity index 96% rename from node/client/client_test.go rename to node/full_client_test.go index 5667e4d417..06ae2f81f4 100644 --- a/node/client/client_test.go +++ b/node/full_client_test.go @@ -1,4 +1,4 @@ -package client +package node import ( "context" @@ -31,7 +31,6 @@ import ( "github.com/celestiaorg/rollmint/config" abciconv "github.com/celestiaorg/rollmint/conv/abci" "github.com/celestiaorg/rollmint/mocks" - "github.com/celestiaorg/rollmint/node" "github.com/celestiaorg/rollmint/types" ) @@ -91,7 +90,7 @@ func TestGenesisChunked(t *testing.T) { mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) privKey, _, _ := crypto.GenerateEd25519Key(cryptorand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(cryptorand.Reader) - n, _ := node.NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) + n, _ := NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) rpc := NewClient(n) @@ -404,7 +403,7 @@ func TestTx(t *testing.T) { mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := node.NewNode(context.Background(), config.NodeConfig{ + node, err := NewNode(context.Background(), config.NodeConfig{ DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{ @@ -661,7 +660,7 @@ func TestValidatorSetHandling(t *testing.T) { waitCh <- nil }) - node, err := node.NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) + node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -765,7 +764,7 @@ func getRandomBytes(n int) []byte { return data } -func getBlockMeta(rpc *Client, n int64) *tmtypes.BlockMeta { +func getBlockMeta(rpc *FullClient, n int64) *tmtypes.BlockMeta { b, err := rpc.node.Store.LoadBlock(uint64(n)) if err != nil { return nil @@ -778,14 +777,14 @@ func getBlockMeta(rpc *Client, n int64) *tmtypes.BlockMeta { return bmeta } -func getRPC(t *testing.T) (*mocks.Application, *Client) { +func getRPC(t *testing.T) (*mocks.Application, *FullClient) { t.Helper() require := require.New(t) app := &mocks.Application{} app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := node.NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -796,7 +795,7 @@ func getRPC(t *testing.T) (*mocks.Application, *Client) { } // From state/indexer/block/kv/kv_test -func indexBlocks(t *testing.T, rpc *Client, heights []int64) { +func indexBlocks(t *testing.T, rpc *FullClient, heights []int64) { t.Helper() for _, h := range heights { @@ -850,7 +849,7 @@ func TestMempool2Nodes(t *testing.T) { id1, err := peer.IDFromPrivateKey(key1) require.NoError(err) - node1, err := node.NewNode(context.Background(), config.NodeConfig{ + node1, err := NewNode(context.Background(), config.NodeConfig{ DALayer: "mock", P2P: config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/9001", @@ -859,7 +858,7 @@ func TestMempool2Nodes(t *testing.T) { require.NoError(err) require.NotNil(node1) - node2, err := node.NewNode(context.Background(), config.NodeConfig{ + node2, err := NewNode(context.Background(), config.NodeConfig{ DALayer: "mock", P2P: config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/9002", @@ -934,7 +933,7 @@ func TestStatus(t *testing.T) { } } - node, err := node.NewNode( + node, err := NewNode( context.Background(), config.NodeConfig{ DALayer: "mock", diff --git a/node/integration_test.go b/node/integration_test.go index 34fa583047..da734e2f2f 100644 --- a/node/integration_test.go +++ b/node/integration_test.go @@ -191,7 +191,7 @@ func TestFraudProofTrigger(t *testing.T) { } // Creates a starts the given number of client nodes along with an aggregator node. Uses the given flag to decide whether to have the aggregator produce malicious blocks. -func createAndStartNodes(clientNodes int, isMalicious bool, t *testing.T) ([]*Node, []*mocks.Application) { +func createAndStartNodes(clientNodes int, isMalicious bool, t *testing.T) ([]*FullNode, []*mocks.Application) { var wg sync.WaitGroup aggCtx, aggCancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background()) @@ -209,7 +209,7 @@ func createAndStartNodes(clientNodes int, isMalicious bool, t *testing.T) ([]*No // Starts the given nodes using the given wait group to synchronize them // and wait for them to gossip transactions -func startNodes(nodes []*Node, wg *sync.WaitGroup, t *testing.T) { +func startNodes(nodes []*FullNode, wg *sync.WaitGroup, t *testing.T) { numNodes := len(nodes) wg.Add((numNodes) * (numNodes - 1)) for _, n := range nodes { @@ -238,7 +238,7 @@ func startNodes(nodes []*Node, wg *sync.WaitGroup, t *testing.T) { } // Creates the given number of nodes the given nodes using the given wait group to synchornize them -func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *sync.WaitGroup, t *testing.T) ([]*Node, []*mocks.Application) { +func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *sync.WaitGroup, t *testing.T) ([]*FullNode, []*mocks.Application) { t.Helper() if aggCtx == nil { @@ -254,7 +254,7 @@ func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *syn keys[i], _, _ = crypto.GenerateEd25519Key(rand.Reader) } - nodes := make([]*Node, num) + nodes := make([]*FullNode, num) apps := make([]*mocks.Application, num) dalc := &mockda.DataAvailabilityLayerClient{} _ = dalc.Init([8]byte{}, nil, store.NewDefaultInMemoryKVStore(), log.TestingLogger()) @@ -267,7 +267,7 @@ func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *syn return nodes, apps } -func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, dalc da.DataAvailabilityLayerClient, keys []crypto.PrivKey, wg *sync.WaitGroup, t *testing.T) (*Node, *mocks.Application) { +func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, dalc da.DataAvailabilityLayerClient, keys []crypto.PrivKey, wg *sync.WaitGroup, t *testing.T) (*FullNode, *mocks.Application) { t.Helper() require := require.New(t) // nodes will listen on consecutive ports on local interface diff --git a/node/node.go b/node/node.go new file mode 100644 index 0000000000..f66fe8c7cf --- /dev/null +++ b/node/node.go @@ -0,0 +1,10 @@ +package node + +import ( + rpcclient "github.com/tendermint/tendermint/rpc/client" +) + +type Node interface { + Start() error + GetClient() rpcclient.Client +} diff --git a/rpc/json/service_test.go b/rpc/json/service_test.go index 2be2a3bf94..dd3a4211e8 100644 --- a/rpc/json/service_test.go +++ b/rpc/json/service_test.go @@ -27,7 +27,6 @@ import ( "github.com/celestiaorg/rollmint/config" "github.com/celestiaorg/rollmint/mocks" "github.com/celestiaorg/rollmint/node" - "github.com/celestiaorg/rollmint/rpc/client" ) func TestHandlerMapping(t *testing.T) { @@ -269,7 +268,7 @@ func TestSubscription(t *testing.T) { } // copied from rpc -func getRPC(t *testing.T) (*mocks.Application, *client.Client) { +func getRPC(t *testing.T) (*mocks.Application, *node.FullClient) { t.Helper() require := require.New(t) app := &mocks.Application{} @@ -292,14 +291,14 @@ func getRPC(t *testing.T) (*mocks.Application, *client.Client) { }) key, _, _ := crypto.GenerateEd25519Key(rand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := node.NewNode(context.Background(), config.NodeConfig{Aggregator: true, DALayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second}}, key, signingKey, abciclient.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + n, err := node.NewNode(context.Background(), config.NodeConfig{Aggregator: true, DALayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second}}, key, signingKey, abciclient.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) - require.NotNil(node) + require.NotNil(n) - err = node.Start() + err = n.Start() require.NoError(err) - local := client.NewClient(node) + local := node.NewClient(n) require.NotNil(local) return app, local diff --git a/rpc/server.go b/rpc/server.go index 517f027e69..b7d611b5e4 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -16,7 +16,6 @@ import ( "golang.org/x/net/netutil" "github.com/celestiaorg/rollmint/node" - "github.com/celestiaorg/rollmint/rpc/client" "github.com/celestiaorg/rollmint/rpc/json" ) @@ -31,10 +30,10 @@ type Server struct { } // NewServer creates new instance of Server with given configuration. -func NewServer(node *node.Node, config *config.RPCConfig, logger log.Logger) *Server { +func NewServer(node *node.FullNode, config *config.RPCConfig, logger log.Logger) *Server { srv := &Server{ config: config, - client: client.NewClient(node), + client: node.GetClient(), } srv.BaseService = service.NewBaseService(logger, "RPC", srv) return srv From f808f8443639724a83c84e98eef157aae4a3cca0 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Tue, 10 Jan 2023 10:49:48 -0500 Subject: [PATCH 07/22] simplified service Subscribe --- rpc/json/service.go | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/rpc/json/service.go b/rpc/json/service.go index 75bee5b581..a8840f57af 100644 --- a/rpc/json/service.go +++ b/rpc/json/service.go @@ -84,34 +84,32 @@ func newService(c rpcclient.Client, l log.Logger) *service { } func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsConn) (*ctypes.ResultSubscribe, error) { - addr := req.RemoteAddr - // TODO(tzdybal): pass config and check subscriptions limits - - s.logger.Debug("subscribe to query", "remote", addr, "query", args.Query) - // TODO(tzdybal): extract consts or configs const SubscribeTimeout = 5 * time.Second const subBufferSize = 100 - eventCh, err := s.client.Subscribe(req.Context(), addr, args.Query, subBufferSize) + addr := req.RemoteAddr + query := args.Query + + ctx, cancel := context.WithTimeout(req.Context(), SubscribeTimeout) + defer cancel() + + sub, err := s.client.Subscribe(ctx, addr, query, subBufferSize) if err != nil { return nil, fmt.Errorf("failed to subscribe: %w", err) } + go func() { - for { - select { - case msg := <-eventCh: - data, err := json.Marshal(msg.Data) - if err != nil { - s.logger.Error("failed to marshal response data", "error", err) - continue - } - if wsConn != nil { - wsConn.queue <- data - } + for msg := range sub { + data, err := json.Marshal(msg.Data) + if err != nil { + s.logger.Error("failed to marshal response data", "error", err) + continue + } + if wsConn != nil { + wsConn.queue <- data } - } }() From a511c26c89016611aad8ee68e45d2f9bdd177d5b Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Tue, 10 Jan 2023 11:25:09 -0500 Subject: [PATCH 08/22] moved full node files into node/full --- node/{ => full}/full.go | 2 +- node/{ => full}/full_client.go | 2 +- node/{ => full}/full_client_test.go | 2 +- node/{ => full}/integration_test.go | 2 +- node/{ => full}/mempool.go | 2 +- node/{ => full}/node_test.go | 2 +- rpc/json/service_test.go | 9 +++++---- rpc/server.go | 2 +- 8 files changed, 12 insertions(+), 11 deletions(-) rename node/{ => full}/full.go (99%) rename node/{ => full}/full_client.go (99%) rename node/{ => full}/full_client_test.go (99%) rename node/{ => full}/integration_test.go (99%) rename node/{ => full}/mempool.go (99%) rename node/{ => full}/node_test.go (99%) diff --git a/node/full.go b/node/full/full.go similarity index 99% rename from node/full.go rename to node/full/full.go index 540df00943..dd10006f4d 100644 --- a/node/full.go +++ b/node/full/full.go @@ -1,4 +1,4 @@ -package node +package full import ( "context" diff --git a/node/full_client.go b/node/full/full_client.go similarity index 99% rename from node/full_client.go rename to node/full/full_client.go index 45e3b07ec9..040ec88645 100644 --- a/node/full_client.go +++ b/node/full/full_client.go @@ -1,4 +1,4 @@ -package node +package full import ( "context" diff --git a/node/full_client_test.go b/node/full/full_client_test.go similarity index 99% rename from node/full_client_test.go rename to node/full/full_client_test.go index 06ae2f81f4..700c1e879d 100644 --- a/node/full_client_test.go +++ b/node/full/full_client_test.go @@ -1,4 +1,4 @@ -package node +package full import ( "context" diff --git a/node/integration_test.go b/node/full/integration_test.go similarity index 99% rename from node/integration_test.go rename to node/full/integration_test.go index da734e2f2f..4103324971 100644 --- a/node/integration_test.go +++ b/node/full/integration_test.go @@ -1,4 +1,4 @@ -package node +package full import ( "context" diff --git a/node/mempool.go b/node/full/mempool.go similarity index 99% rename from node/mempool.go rename to node/full/mempool.go index 9f4be446a2..a903a5dbdd 100644 --- a/node/mempool.go +++ b/node/full/mempool.go @@ -1,4 +1,4 @@ -package node +package full import ( "fmt" diff --git a/node/node_test.go b/node/full/node_test.go similarity index 99% rename from node/node_test.go rename to node/full/node_test.go index d6877945f6..9a0382e912 100644 --- a/node/node_test.go +++ b/node/full/node_test.go @@ -1,4 +1,4 @@ -package node +package full import ( "context" diff --git a/rpc/json/service_test.go b/rpc/json/service_test.go index dd3a4211e8..026b9c4fef 100644 --- a/rpc/json/service_test.go +++ b/rpc/json/service_test.go @@ -22,11 +22,12 @@ import ( abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" + rpcclient "github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/types" "github.com/celestiaorg/rollmint/config" "github.com/celestiaorg/rollmint/mocks" - "github.com/celestiaorg/rollmint/node" + "github.com/celestiaorg/rollmint/node/full" ) func TestHandlerMapping(t *testing.T) { @@ -268,7 +269,7 @@ func TestSubscription(t *testing.T) { } // copied from rpc -func getRPC(t *testing.T) (*mocks.Application, *node.FullClient) { +func getRPC(t *testing.T) (*mocks.Application, rpcclient.Client) { t.Helper() require := require.New(t) app := &mocks.Application{} @@ -291,14 +292,14 @@ func getRPC(t *testing.T) (*mocks.Application, *node.FullClient) { }) key, _, _ := crypto.GenerateEd25519Key(rand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - n, err := node.NewNode(context.Background(), config.NodeConfig{Aggregator: true, DALayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second}}, key, signingKey, abciclient.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + n, err := full.NewNode(context.Background(), config.NodeConfig{Aggregator: true, DALayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second}}, key, signingKey, abciclient.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(n) err = n.Start() require.NoError(err) - local := node.NewClient(n) + local := n.GetClient() require.NotNil(local) return app, local diff --git a/rpc/server.go b/rpc/server.go index b7d611b5e4..a74f12cbec 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -30,7 +30,7 @@ type Server struct { } // NewServer creates new instance of Server with given configuration. -func NewServer(node *node.FullNode, config *config.RPCConfig, logger log.Logger) *Server { +func NewServer(node node.Node, config *config.RPCConfig, logger log.Logger) *Server { srv := &Server{ config: config, client: node.GetClient(), From db722c65f7c9850936d2810c8185bb9f4f1c7781 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Tue, 10 Jan 2023 12:25:42 -0500 Subject: [PATCH 09/22] renamed full to node --- node/full/{full_client.go => client.go} | 0 node/full/{full_client_test.go => client_test.go} | 0 node/full/{full.go => node.go} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename node/full/{full_client.go => client.go} (100%) rename node/full/{full_client_test.go => client_test.go} (100%) rename node/full/{full.go => node.go} (100%) diff --git a/node/full/full_client.go b/node/full/client.go similarity index 100% rename from node/full/full_client.go rename to node/full/client.go diff --git a/node/full/full_client_test.go b/node/full/client_test.go similarity index 100% rename from node/full/full_client_test.go rename to node/full/client_test.go diff --git a/node/full/full.go b/node/full/node.go similarity index 100% rename from node/full/full.go rename to node/full/node.go From 69bef93ad205b503b5a067078c0a53bde0520196 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Tue, 10 Jan 2023 20:45:00 -0500 Subject: [PATCH 10/22] NewNode calls NewFullNode based on config --- config/config.go | 4 ++++ config/defaults.go | 1 + node/full/client_test.go | 14 +++++++------- node/full/integration_test.go | 4 ++-- node/full/node.go | 2 +- node/full/node_test.go | 4 ++-- node/node.go | 34 ++++++++++++++++++++++++++++++++++ rpc/json/service_test.go | 4 ++-- 8 files changed, 53 insertions(+), 14 deletions(-) diff --git a/config/config.go b/config/config.go index 8d1e43aac8..1b9b120996 100644 --- a/config/config.go +++ b/config/config.go @@ -19,6 +19,7 @@ const ( flagDAStartHeight = "rollmint.da_start_height" flagNamespaceID = "rollmint.namespace_id" flagFraudProofs = "rollmint.experimental_insecure_fraud_proofs" + flagLight = "rollmint.light" ) // NodeConfig stores rollmint node configuration. @@ -33,6 +34,7 @@ type NodeConfig struct { BlockManagerConfig `mapstructure:",squash"` DALayer string `mapstructure:"da_layer"` DAConfig string `mapstructure:"da_config"` + Light bool `mapstructure:"light"` } // BlockManagerConfig consists of all parameters required by BlockManagerConfig @@ -59,6 +61,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error { nc.BlockTime = v.GetDuration(flagBlockTime) nsID := v.GetString(flagNamespaceID) nc.FraudProofs = v.GetBool(flagFraudProofs) + nc.Light = v.GetBool(flagLight) bytes, err := hex.DecodeString(nsID) if err != nil { return err @@ -80,4 +83,5 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Uint64(flagDAStartHeight, def.DAStartHeight, "starting DA block height (for syncing)") cmd.Flags().BytesHex(flagNamespaceID, def.NamespaceID[:], "namespace identifies (8 bytes in hex)") cmd.Flags().Bool(flagFraudProofs, def.FraudProofs, "enable fraud proofs (experimental & insecure)") + cmd.Flags().Bool(flagLight, def.Light, "run light client") } diff --git a/config/defaults.go b/config/defaults.go index 6a12576498..37af3aac62 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -26,4 +26,5 @@ var DefaultNodeConfig = NodeConfig{ }, DALayer: "mock", DAConfig: "", + Light: false, } diff --git a/node/full/client_test.go b/node/full/client_test.go index 700c1e879d..8df04d72f5 100644 --- a/node/full/client_test.go +++ b/node/full/client_test.go @@ -90,7 +90,7 @@ func TestGenesisChunked(t *testing.T) { mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) privKey, _, _ := crypto.GenerateEd25519Key(cryptorand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(cryptorand.Reader) - n, _ := NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) + n, _ := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) rpc := NewClient(n) @@ -403,7 +403,7 @@ func TestTx(t *testing.T) { mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := NewNode(context.Background(), config.NodeConfig{ + node, err := NewFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{ @@ -660,7 +660,7 @@ func TestValidatorSetHandling(t *testing.T) { waitCh <- nil }) - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) + node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -784,7 +784,7 @@ func getRPC(t *testing.T) (*mocks.Application, *FullClient) { app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -849,7 +849,7 @@ func TestMempool2Nodes(t *testing.T) { id1, err := peer.IDFromPrivateKey(key1) require.NoError(err) - node1, err := NewNode(context.Background(), config.NodeConfig{ + node1, err := NewFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", P2P: config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/9001", @@ -858,7 +858,7 @@ func TestMempool2Nodes(t *testing.T) { require.NoError(err) require.NotNil(node1) - node2, err := NewNode(context.Background(), config.NodeConfig{ + node2, err := NewFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", P2P: config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/9002", @@ -933,7 +933,7 @@ func TestStatus(t *testing.T) { } } - node, err := NewNode( + node, err := NewFullNode( context.Background(), config.NodeConfig{ DALayer: "mock", diff --git a/node/full/integration_test.go b/node/full/integration_test.go index 4103324971..3c3750f8e2 100644 --- a/node/full/integration_test.go +++ b/node/full/integration_test.go @@ -52,7 +52,7 @@ func TestAggregatorMode(t *testing.T) { BlockTime: 1 * time.Second, NamespaceID: rmtypes.NamespaceID{1, 2, 3, 4, 5, 6, 7, 8}, } - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -317,7 +317,7 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, d } signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := NewNode( + node, err := NewFullNode( ctx, config.NodeConfig{ P2P: p2pConfig, diff --git a/node/full/node.go b/node/full/node.go index dd10006f4d..136412c7ec 100644 --- a/node/full/node.go +++ b/node/full/node.go @@ -79,7 +79,7 @@ type FullNode struct { } // NewNode creates new rollmint node. -func NewNode( +func NewFullNode( ctx context.Context, conf config.NodeConfig, p2pKey crypto.PrivKey, diff --git a/node/full/node_test.go b/node/full/node_test.go index 9a0382e912..8740dc4b9d 100644 --- a/node/full/node_test.go +++ b/node/full/node_test.go @@ -31,7 +31,7 @@ func TestStartup(t *testing.T) { app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(rand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -57,7 +57,7 @@ func TestMempoolDirectly(t *testing.T) { signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) diff --git a/node/node.go b/node/node.go index f66fe8c7cf..cf33ae01cf 100644 --- a/node/node.go +++ b/node/node.go @@ -1,10 +1,44 @@ package node import ( + "context" + + "github.com/libp2p/go-libp2p/core/crypto" + + abciclient "github.com/tendermint/tendermint/abci/client" + "github.com/tendermint/tendermint/libs/log" rpcclient "github.com/tendermint/tendermint/rpc/client" + tmtypes "github.com/tendermint/tendermint/types" + + "github.com/celestiaorg/rollmint/config" + "github.com/celestiaorg/rollmint/node/full" ) type Node interface { Start() error GetClient() rpcclient.Client } + +func NewNode( + ctx context.Context, + conf config.NodeConfig, + p2pKey crypto.PrivKey, + signingKey crypto.PrivKey, + appClient abciclient.Client, + genesis *tmtypes.GenesisDoc, + logger log.Logger, +) (Node, error) { + if conf.Light { + return full.NewFullNode( + ctx, + conf, + p2pKey, + signingKey, + appClient, + genesis, + logger, + ) + } else { + panic("Light node not implemented") + } +} diff --git a/rpc/json/service_test.go b/rpc/json/service_test.go index 026b9c4fef..44a093baa5 100644 --- a/rpc/json/service_test.go +++ b/rpc/json/service_test.go @@ -27,7 +27,7 @@ import ( "github.com/celestiaorg/rollmint/config" "github.com/celestiaorg/rollmint/mocks" - "github.com/celestiaorg/rollmint/node/full" + "github.com/celestiaorg/rollmint/node" ) func TestHandlerMapping(t *testing.T) { @@ -292,7 +292,7 @@ func getRPC(t *testing.T) (*mocks.Application, rpcclient.Client) { }) key, _, _ := crypto.GenerateEd25519Key(rand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - n, err := full.NewNode(context.Background(), config.NodeConfig{Aggregator: true, DALayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second}}, key, signingKey, abciclient.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + n, err := node.NewNode(context.Background(), config.NodeConfig{Aggregator: true, DALayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second}, Light: false}, key, signingKey, abciclient.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(n) From f96f4cbb5e2f100b84ef8a7ff91d56789767060b Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Wed, 11 Jan 2023 13:49:38 -0500 Subject: [PATCH 11/22] fix bug in NewNode and integrate changes to the data store --- node/full/client_test.go | 5 ++--- node/full/node.go | 40 ++++++++++++++++++++++++++-------------- node/light/node.go | 9 +++++++++ node/node.go | 4 +--- 4 files changed, 38 insertions(+), 20 deletions(-) create mode 100644 node/light/node.go diff --git a/node/full/client_test.go b/node/full/client_test.go index 8df04d72f5..debbf02bb6 100644 --- a/node/full/client_test.go +++ b/node/full/client_test.go @@ -3,7 +3,6 @@ package full import ( "context" crand "crypto/rand" - cryptorand "crypto/rand" "fmt" "math/rand" "testing" @@ -88,8 +87,8 @@ func TestGenesisChunked(t *testing.T) { mockApp := &mocks.Application{} mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) - privKey, _, _ := crypto.GenerateEd25519Key(cryptorand.Reader) - signingKey, _, _ := crypto.GenerateEd25519Key(cryptorand.Reader) + privKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) + signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) n, _ := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) rpc := NewClient(n) diff --git a/node/full/node.go b/node/full/node.go index 136412c7ec..aff53105b6 100644 --- a/node/full/node.go +++ b/node/full/node.go @@ -7,6 +7,8 @@ import ( "errors" "fmt" + ds "github.com/ipfs/go-datastore" + ktds "github.com/ipfs/go-datastore/keytransform" "github.com/libp2p/go-libp2p/core/crypto" "go.uber.org/multierr" @@ -35,9 +37,9 @@ import ( // prefixes used in KV store to separate main node data from DALC data var ( - mainPrefix = []byte{0} - dalcPrefix = []byte{1} - indexerPrefix = []byte{2} + mainPrefix = "0" + dalcPrefix = "1" + indexerPrefix = "2" // indexPrefix uses "i", so using "0-2" to avoid clash ) const ( @@ -99,18 +101,23 @@ func NewFullNode( return nil, err } - var baseKV store.KVStore + var baseKV ds.TxnDatastore + if conf.RootDir == "" && conf.DBPath == "" { // this is used for testing logger.Info("WARNING: working in in-memory mode") - baseKV = store.NewDefaultInMemoryKVStore() + baseKV, err = store.NewDefaultInMemoryKVStore() } else { - baseKV = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "rollmint") + baseKV, err = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "rollmint") + } + if err != nil { + return nil, err } - mainKV := store.NewPrefixKV(baseKV, mainPrefix) - dalcKV := store.NewPrefixKV(baseKV, dalcPrefix) - indexerKV := store.NewPrefixKV(baseKV, indexerPrefix) - s := store.New(mainKV) + mainKV := newPrefixKV(baseKV, mainPrefix) + dalcKV := newPrefixKV(baseKV, dalcPrefix) + indexerKV := newPrefixKV(baseKV, indexerPrefix) + + s := store.New(ctx, mainKV) dalc := registry.GetClient(conf.DALayer) if dalc == nil { @@ -121,7 +128,7 @@ func NewFullNode( return nil, fmt.Errorf("data availability layer client initialization error: %w", err) } - indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(conf, indexerKV, eventBus, logger) + indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(ctx, conf, indexerKV, eventBus, logger) if err != nil { return nil, err } @@ -367,9 +374,14 @@ func (n *FullNode) newFraudProofValidator() p2p.GossipValidator { } } +func newPrefixKV(kvStore ds.Datastore, prefix string) ds.TxnDatastore { + return (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}).Children()[0]).(ds.TxnDatastore) +} + func createAndStartIndexerService( + ctx context.Context, conf config.NodeConfig, - kvStore store.KVStore, + kvStore ds.TxnDatastore, eventBus *tmtypes.EventBus, logger log.Logger, ) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) { @@ -379,8 +391,8 @@ func createAndStartIndexerService( blockIndexer indexer.BlockIndexer ) - txIndexer = kv.NewTxIndex(kvStore) - blockIndexer = blockidxkv.New(store.NewPrefixKV(kvStore, []byte("block_events"))) + txIndexer = kv.NewTxIndex(ctx, kvStore) + blockIndexer = blockidxkv.New(ctx, newPrefixKV(kvStore, "block_events")) indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) indexerService.SetLogger(logger.With("module", "txindex")) diff --git a/node/light/node.go b/node/light/node.go new file mode 100644 index 0000000000..7c7c1e1c24 --- /dev/null +++ b/node/light/node.go @@ -0,0 +1,9 @@ +package light + +import ( + "github.com/tendermint/tendermint/libs/service" +) + +type LightNode struct { + service.BaseService +} diff --git a/node/node.go b/node/node.go index 0117c37540..20e1bf9c69 100644 --- a/node/node.go +++ b/node/node.go @@ -3,8 +3,6 @@ package node import ( "context" - ds "github.com/ipfs/go-datastore" - ktds "github.com/ipfs/go-datastore/keytransform" "github.com/libp2p/go-libp2p/core/crypto" abciclient "github.com/tendermint/tendermint/abci/client" @@ -30,7 +28,7 @@ func NewNode( genesis *tmtypes.GenesisDoc, logger log.Logger, ) (Node, error) { - if conf.Light { + if !conf.Light { return full.NewFullNode( ctx, conf, From fa5f62fff5a8db655ec4391a803ef58d3a83d6a3 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Thu, 12 Jan 2023 15:33:46 -0500 Subject: [PATCH 12/22] change structure of node to make newFullNode private --- node/{full => }/client_test.go | 16 ++++++++-------- node/{full/node.go => full.go} | 4 ++-- node/{full/client.go => full_client.go} | 2 +- ...ion_test.go => full_node_integration_test.go} | 6 +++--- node/{full/node_test.go => full_node_test.go} | 6 +++--- node/{light/node.go => light.go} | 2 +- node/{full => }/mempool.go | 2 +- node/node.go | 3 +-- 8 files changed, 20 insertions(+), 21 deletions(-) rename node/{full => }/client_test.go (98%) rename node/{full/node.go => full.go} (99%) rename node/{full/client.go => full_client.go} (99%) rename node/{full/integration_test.go => full_node_integration_test.go} (99%) rename node/{full/node_test.go => full_node_test.go} (95%) rename node/{light/node.go => light.go} (88%) rename node/{full => }/mempool.go (99%) diff --git a/node/full/client_test.go b/node/client_test.go similarity index 98% rename from node/full/client_test.go rename to node/client_test.go index debbf02bb6..a9f1ba59bd 100644 --- a/node/full/client_test.go +++ b/node/client_test.go @@ -1,4 +1,4 @@ -package full +package node import ( "context" @@ -89,7 +89,7 @@ func TestGenesisChunked(t *testing.T) { mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) privKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - n, _ := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) + n, _ := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) rpc := NewClient(n) @@ -402,7 +402,7 @@ func TestTx(t *testing.T) { mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := NewFullNode(context.Background(), config.NodeConfig{ + node, err := newFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{ @@ -659,7 +659,7 @@ func TestValidatorSetHandling(t *testing.T) { waitCh <- nil }) - node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -783,7 +783,7 @@ func getRPC(t *testing.T) (*mocks.Application, *FullClient) { app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -848,7 +848,7 @@ func TestMempool2Nodes(t *testing.T) { id1, err := peer.IDFromPrivateKey(key1) require.NoError(err) - node1, err := NewFullNode(context.Background(), config.NodeConfig{ + node1, err := newFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", P2P: config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/9001", @@ -857,7 +857,7 @@ func TestMempool2Nodes(t *testing.T) { require.NoError(err) require.NotNil(node1) - node2, err := NewFullNode(context.Background(), config.NodeConfig{ + node2, err := newFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", P2P: config.P2PConfig{ ListenAddress: "/ip4/127.0.0.1/tcp/9002", @@ -932,7 +932,7 @@ func TestStatus(t *testing.T) { } } - node, err := NewFullNode( + node, err := newFullNode( context.Background(), config.NodeConfig{ DALayer: "mock", diff --git a/node/full/node.go b/node/full.go similarity index 99% rename from node/full/node.go rename to node/full.go index aff53105b6..95439197af 100644 --- a/node/full/node.go +++ b/node/full.go @@ -1,4 +1,4 @@ -package full +package node import ( "context" @@ -81,7 +81,7 @@ type FullNode struct { } // NewNode creates new rollmint node. -func NewFullNode( +func newFullNode( ctx context.Context, conf config.NodeConfig, p2pKey crypto.PrivKey, diff --git a/node/full/client.go b/node/full_client.go similarity index 99% rename from node/full/client.go rename to node/full_client.go index 040ec88645..45e3b07ec9 100644 --- a/node/full/client.go +++ b/node/full_client.go @@ -1,4 +1,4 @@ -package full +package node import ( "context" diff --git a/node/full/integration_test.go b/node/full_node_integration_test.go similarity index 99% rename from node/full/integration_test.go rename to node/full_node_integration_test.go index b9a0a42b1e..883c3254c1 100644 --- a/node/full/integration_test.go +++ b/node/full_node_integration_test.go @@ -1,4 +1,4 @@ -package full +package node import ( "context" @@ -52,7 +52,7 @@ func TestAggregatorMode(t *testing.T) { BlockTime: 1 * time.Second, NamespaceID: rmtypes.NamespaceID{1, 2, 3, 4, 5, 6, 7, 8}, } - node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -318,7 +318,7 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, d } signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := NewFullNode( + node, err := newFullNode( ctx, config.NodeConfig{ P2P: p2pConfig, diff --git a/node/full/node_test.go b/node/full_node_test.go similarity index 95% rename from node/full/node_test.go rename to node/full_node_test.go index 8740dc4b9d..8121225579 100644 --- a/node/full/node_test.go +++ b/node/full_node_test.go @@ -1,4 +1,4 @@ -package full +package node import ( "context" @@ -31,7 +31,7 @@ func TestStartup(t *testing.T) { app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(rand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) @@ -57,7 +57,7 @@ func TestMempoolDirectly(t *testing.T) { signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := NewFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) diff --git a/node/light/node.go b/node/light.go similarity index 88% rename from node/light/node.go rename to node/light.go index 7c7c1e1c24..c1d911c8f1 100644 --- a/node/light/node.go +++ b/node/light.go @@ -1,4 +1,4 @@ -package light +package node import ( "github.com/tendermint/tendermint/libs/service" diff --git a/node/full/mempool.go b/node/mempool.go similarity index 99% rename from node/full/mempool.go rename to node/mempool.go index a903a5dbdd..9f4be446a2 100644 --- a/node/full/mempool.go +++ b/node/mempool.go @@ -1,4 +1,4 @@ -package full +package node import ( "fmt" diff --git a/node/node.go b/node/node.go index 20e1bf9c69..a840e29597 100644 --- a/node/node.go +++ b/node/node.go @@ -11,7 +11,6 @@ import ( tmtypes "github.com/tendermint/tendermint/types" "github.com/celestiaorg/rollmint/config" - "github.com/celestiaorg/rollmint/node/full" ) type Node interface { @@ -29,7 +28,7 @@ func NewNode( logger log.Logger, ) (Node, error) { if !conf.Light { - return full.NewFullNode( + return newFullNode( ctx, conf, p2pKey, From 735650e3ef3348c43191971881fc36d5a0ae4e72 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Thu, 12 Jan 2023 15:42:51 -0500 Subject: [PATCH 13/22] made client_test into full_client_test, but this should probably change --- node/{client_test.go => full_client_test.go} | 21 +++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) rename node/{client_test.go => full_client_test.go} (97%) diff --git a/node/client_test.go b/node/full_client_test.go similarity index 97% rename from node/client_test.go rename to node/full_client_test.go index a9f1ba59bd..291c1d4449 100644 --- a/node/client_test.go +++ b/node/full_client_test.go @@ -24,6 +24,7 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + rpcclient "github.com/tendermint/tendermint/rpc/client" tmtypes "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -402,19 +403,21 @@ func TestTx(t *testing.T) { mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := newFullNode(context.Background(), config.NodeConfig{ + node, err := NewNode(context.Background(), config.NodeConfig{ DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{ BlockTime: 200 * time.Millisecond, - }}, + }, + Light: false, + }, key, signingKey, abcicli.NewLocalClient(nil, mockApp), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) - rpc := NewClient(node) + rpc := node.GetClient() require.NotNil(rpc) mockApp.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) mockApp.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) @@ -424,7 +427,7 @@ func TestTx(t *testing.T) { mockApp.On("GetAppHash", mock.Anything).Return(abci.ResponseGetAppHash{}) mockApp.On("GenerateFraudProof", mock.Anything).Return(abci.ResponseGenerateFraudProof{}) - err = rpc.node.Start() + err = node.Start() require.NoError(err) tx1 := tmtypes.Tx("tx1") @@ -659,11 +662,11 @@ func TestValidatorSetHandling(t *testing.T) { waitCh <- nil }) - node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) + node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}, Light: false}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) require.NoError(err) require.NotNil(node) - rpc := NewClient(node) + rpc := node.GetClient() require.NotNil(rpc) err = node.Start() @@ -776,18 +779,18 @@ func getBlockMeta(rpc *FullClient, n int64) *tmtypes.BlockMeta { return bmeta } -func getRPC(t *testing.T) (*mocks.Application, *FullClient) { +func getRPC(t *testing.T) (*mocks.Application, rpcclient.Client) { t.Helper() require := require.New(t) app := &mocks.Application{} app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Light: false}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) - rpc := NewClient(node) + rpc := node.GetClient() require.NotNil(rpc) return app, rpc From 607c44a89e34de92abbfcc2a46d3a9032703b9b2 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Thu, 12 Jan 2023 16:39:02 -0500 Subject: [PATCH 14/22] fix --- node/full_client_test.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/node/full_client_test.go b/node/full_client_test.go index 291c1d4449..a9f1ba59bd 100644 --- a/node/full_client_test.go +++ b/node/full_client_test.go @@ -24,7 +24,6 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" - rpcclient "github.com/tendermint/tendermint/rpc/client" tmtypes "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -403,21 +402,19 @@ func TestTx(t *testing.T) { mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := NewNode(context.Background(), config.NodeConfig{ + node, err := newFullNode(context.Background(), config.NodeConfig{ DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{ BlockTime: 200 * time.Millisecond, - }, - Light: false, - }, + }}, key, signingKey, abcicli.NewLocalClient(nil, mockApp), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) - rpc := node.GetClient() + rpc := NewClient(node) require.NotNil(rpc) mockApp.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) mockApp.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) @@ -427,7 +424,7 @@ func TestTx(t *testing.T) { mockApp.On("GetAppHash", mock.Anything).Return(abci.ResponseGetAppHash{}) mockApp.On("GenerateFraudProof", mock.Anything).Return(abci.ResponseGenerateFraudProof{}) - err = node.Start() + err = rpc.node.Start() require.NoError(err) tx1 := tmtypes.Tx("tx1") @@ -662,11 +659,11 @@ func TestValidatorSetHandling(t *testing.T) { waitCh <- nil }) - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}, Light: false}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) require.NoError(err) require.NotNil(node) - rpc := node.GetClient() + rpc := NewClient(node) require.NotNil(rpc) err = node.Start() @@ -779,18 +776,18 @@ func getBlockMeta(rpc *FullClient, n int64) *tmtypes.BlockMeta { return bmeta } -func getRPC(t *testing.T) (*mocks.Application, rpcclient.Client) { +func getRPC(t *testing.T) (*mocks.Application, *FullClient) { t.Helper() require := require.New(t) app := &mocks.Application{} app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) key, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - node, err := NewNode(context.Background(), config.NodeConfig{DALayer: "mock", Light: false}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) - rpc := node.GetClient() + rpc := NewClient(node) require.NotNil(rpc) return app, rpc From 6a4d7b9edd635e11773c35137ace638b974aa438 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Thu, 12 Jan 2023 18:01:29 -0500 Subject: [PATCH 15/22] filled out the LightClient skeleton --- node/full_client.go | 6 +- node/full_client_test.go | 12 +-- node/light.go | 4 + node/light_client.go | 202 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 215 insertions(+), 9 deletions(-) create mode 100644 node/light_client.go diff --git a/node/full_client.go b/node/full_client.go index 45e3b07ec9..1ac6c96c13 100644 --- a/node/full_client.go +++ b/node/full_client.go @@ -52,8 +52,8 @@ type FullClient struct { node *FullNode } -// NewClient returns Client working with given node. -func NewClient(node *FullNode) *FullClient { +// NewFullClient returns Client working with given node. +func NewFullClient(node *FullNode) *FullClient { return &FullClient{ EventBus: node.EventBus(), config: config.DefaultRPCConfig(), @@ -62,7 +62,7 @@ func NewClient(node *FullNode) *FullClient { } func (n *FullNode) GetClient() rpcclient.Client { - return NewClient(n) + return NewFullClient(n) } // ABCIInfo returns basic information about application state. diff --git a/node/full_client_test.go b/node/full_client_test.go index a9f1ba59bd..6e46525589 100644 --- a/node/full_client_test.go +++ b/node/full_client_test.go @@ -91,7 +91,7 @@ func TestGenesisChunked(t *testing.T) { signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) n, _ := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger()) - rpc := NewClient(n) + rpc := NewFullClient(n) var expectedID uint = 2 gc, err := rpc.GenesisChunked(context.Background(), expectedID) @@ -414,7 +414,7 @@ func TestTx(t *testing.T) { require.NoError(err) require.NotNil(node) - rpc := NewClient(node) + rpc := NewFullClient(node) require.NotNil(rpc) mockApp.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) mockApp.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) @@ -663,7 +663,7 @@ func TestValidatorSetHandling(t *testing.T) { require.NoError(err) require.NotNil(node) - rpc := NewClient(node) + rpc := NewFullClient(node) require.NotNil(rpc) err = node.Start() @@ -787,7 +787,7 @@ func getRPC(t *testing.T) (*mocks.Application, *FullClient) { require.NoError(err) require.NotNil(node) - rpc := NewClient(node) + rpc := NewFullClient(node) require.NotNil(rpc) return app, rpc @@ -878,7 +878,7 @@ func TestMempool2Nodes(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - local := NewClient(node1) + local := NewFullClient(node1) require.NotNil(local) // broadcast the bad Tx, this should not be propogated or added to the local mempool @@ -964,7 +964,7 @@ func TestStatus(t *testing.T) { err = node.Store.UpdateState(types.State{LastValidators: validatorSet, NextValidators: validatorSet, Validators: validatorSet}) assert.NoError(err) - rpc := NewClient(node) + rpc := NewFullClient(node) assert.NotNil(rpc) earliestBlock := getRandomBlockWithProposer(1, 1, validators[0].Address.Bytes()) diff --git a/node/light.go b/node/light.go index c1d911c8f1..4d7009ae50 100644 --- a/node/light.go +++ b/node/light.go @@ -7,3 +7,7 @@ import ( type LightNode struct { service.BaseService } + +func (n *LightNode) IsRunning() bool { + panic("Not implemented") +} diff --git a/node/light_client.go b/node/light_client.go new file mode 100644 index 0000000000..dffdb16095 --- /dev/null +++ b/node/light_client.go @@ -0,0 +1,202 @@ +package node + +import ( + "context" + + abcicli "github.com/tendermint/tendermint/abci/client" + "github.com/tendermint/tendermint/config" + tmbytes "github.com/tendermint/tendermint/libs/bytes" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + rpcclient "github.com/tendermint/tendermint/rpc/client" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" +) + +type LightClient struct { + *types.EventBus + config *config.RPCConfig + + node LightNode +} + +func NewLightClient(node *LightNode) *LightClient { + panic("Not implemented") +} + +func (n *LightNode) GetClient() rpcclient.Client { + return NewLightClient(n) +} + +// ABCIInfo returns basic information about application state. +func (c *LightClient) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { + panic("Not implemented") +} + +// ABCIQuery queries for data from application. +func (c *LightClient) ABCIQuery(ctx context.Context, path string, data tmbytes.HexBytes) (*ctypes.ResultABCIQuery, error) { + panic("Not implemented") +} + +// ABCIQueryWithOptions queries for data from application. +func (c *LightClient) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { + panic("Not implemented") +} + +// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. +// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit +func (c *LightClient) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + panic("Not implemented") +} + +// BroadcastTxAsync returns right away, with no response. Does not wait for +// CheckTx nor DeliverTx results. +// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async +func (c *LightClient) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + panic("Not implemented") +} + +// BroadcastTxSync returns with the response from CheckTx. Does not wait for +// DeliverTx result. +// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync +func (c *LightClient) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + panic("Not implemented") +} + +// Subscribe subscribe given subscriber to a query. +func (c *LightClient) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { + panic("Not implemented") +} + +// Unsubscribe unsubscribes given subscriber from a query. +func (c *LightClient) Unsubscribe(ctx context.Context, subscriber, query string) error { + panic("Not implemented") +} + +// Genesis returns entire genesis. +func (c *LightClient) Genesis(_ context.Context) (*ctypes.ResultGenesis, error) { + panic("Not implemented") +} + +// GenesisChunked returns given chunk of genesis. +func (c *LightClient) GenesisChunked(context context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { + panic("Not implemented") +} + +// BlockchainInfo returns ABCI block meta information for given height range. +func (c *LightClient) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { + panic("Not implemented") +} + +// NetInfo returns basic information about client P2P connections. +func (c *LightClient) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { + panic("Not implemented") +} + +// DumpConsensusState always returns error as there is no consensus state in rollmint. +func (c *LightClient) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { + panic("Not implemented") +} + +// ConsensusState always returns error as there is no consensus state in rollmint. +func (c *LightClient) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { + panic("Not implemented") +} + +// ConsensusParams returns consensus params at given height. +// +// Currently, consensus params changes are not supported and this method returns params as defined in genesis. +func (c *LightClient) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) { + panic("Not implemented") +} + +// Health endpoint returns empty value. It can be used to monitor service availability. +func (c *LightClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) { + panic("Not implemented") +} + +// Block method returns BlockID and block itself for given height. +// +// If height is nil, it returns information about last known block. +func (c *LightClient) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { + panic("Not implemented") +} + +// BlockByHash returns BlockID and block itself for given hash. +func (c *LightClient) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { + panic("Not implemented") +} + +// BlockResults returns information about transactions, events and updates of validator set and consensus params. +func (c *LightClient) BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { + panic("Not implemented") +} + +// Commit returns signed header (aka commit) at given height. +func (c *LightClient) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { + panic("Not implemented") +} + +// Validators returns paginated list of validators at given height. +func (c *LightClient) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { + panic("Not implemented") +} + +// Tx returns detailed information about transaction identified by its hash. +func (c *LightClient) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + panic("Not implemented") +} + +// TxSearch returns detailed information about transactions matching query. +func (c *LightClient) TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*ctypes.ResultTxSearch, error) { + panic("Not implemented") +} + +// BlockSearch defines a method to search for a paginated set of blocks by +// BeginBlock and EndBlock event search criteria. +func (c *LightClient) BlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) { + panic("Not implemented") +} + +// Status returns detailed information about current status of the node. +func (c *LightClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) { + panic("Not implemented") +} + +// BroadcastEvidence is not yet implemented. +func (c *LightClient) BroadcastEvidence(ctx context.Context, evidence types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { + panic("Not implemented") +} + +// NumUnconfirmedTxs returns information about transactions in mempool. +func (c *LightClient) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) { + panic("Not implemented") +} + +// UnconfirmedTxs returns transactions in mempool. +func (c *LightClient) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { + panic("Not implemented") +} + +// CheckTx executes a new transaction against the application to determine its validity. +// +// If valid, the tx is automatically added to the mempool. +func (c *LightClient) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { + panic("Not implemented") +} + +func (c *LightClient) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { + panic("Not implemented") +} + +// Try to resubscribe with exponential backoff. +func (c *LightClient) resubscribe(subscriber string, q tmpubsub.Query) types.Subscription { + panic("Not implemented") +} + +func (c *LightClient) appClient() abcicli.Client { + panic("Not implemented") +} + +func (c *LightClient) normalizeHeight(height *int64) uint64 { + panic("Not implemented") +} From c21811cc45e8b78be182b7893018bbe1191daf49 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Fri, 13 Jan 2023 18:37:33 -0500 Subject: [PATCH 16/22] enforce interface adherence --- node/full.go | 2 ++ node/light.go | 2 ++ node/light_client.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/node/full.go b/node/full.go index 95439197af..655b8190c8 100644 --- a/node/full.go +++ b/node/full.go @@ -48,6 +48,8 @@ const ( genesisChunkSize = 16 * 1024 * 1024 // 16 MiB ) +var _ Node = &FullNode{} + // FullNode represents a client node in rollmint network. // It connects all the components and orchestrates their work. type FullNode struct { diff --git a/node/light.go b/node/light.go index 4d7009ae50..b6887d25b0 100644 --- a/node/light.go +++ b/node/light.go @@ -4,6 +4,8 @@ import ( "github.com/tendermint/tendermint/libs/service" ) +var _ Node = &LightNode{} + type LightNode struct { service.BaseService } diff --git a/node/light_client.go b/node/light_client.go index dffdb16095..0ade224aa4 100644 --- a/node/light_client.go +++ b/node/light_client.go @@ -12,6 +12,8 @@ import ( "github.com/tendermint/tendermint/types" ) +var _ rpcclient.Client = &LightClient{} + type LightClient struct { *types.EventBus config *config.RPCConfig From c914f3d3375309057674b17c831fca2c0baa44d2 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Fri, 13 Jan 2023 18:40:11 -0500 Subject: [PATCH 17/22] newLightNode --- node/light.go | 4 ++++ node/node.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/node/light.go b/node/light.go index b6887d25b0..705d79ed86 100644 --- a/node/light.go +++ b/node/light.go @@ -10,6 +10,10 @@ type LightNode struct { service.BaseService } +func newLightNode() (Node, error) { + return &LightNode{}, nil +} + func (n *LightNode) IsRunning() bool { panic("Not implemented") } diff --git a/node/node.go b/node/node.go index a840e29597..c6ea7300b6 100644 --- a/node/node.go +++ b/node/node.go @@ -38,6 +38,6 @@ func NewNode( logger, ) } else { - panic("Light node not implemented") + return newLightNode() } } From 682eda7684650cc2e50bb61fb541031154c936a3 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Fri, 13 Jan 2023 18:41:16 -0500 Subject: [PATCH 18/22] remove unnecessary functions in light_client --- node/light_client.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/node/light_client.go b/node/light_client.go index 0ade224aa4..1e4b3ca998 100644 --- a/node/light_client.go +++ b/node/light_client.go @@ -3,10 +3,8 @@ package node import ( "context" - abcicli "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/config" tmbytes "github.com/tendermint/tendermint/libs/bytes" - tmpubsub "github.com/tendermint/tendermint/libs/pubsub" rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" @@ -185,20 +183,3 @@ func (c *LightClient) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctype func (c *LightClient) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { panic("Not implemented") } - -func (c *LightClient) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { - panic("Not implemented") -} - -// Try to resubscribe with exponential backoff. -func (c *LightClient) resubscribe(subscriber string, q tmpubsub.Query) types.Subscription { - panic("Not implemented") -} - -func (c *LightClient) appClient() abcicli.Client { - panic("Not implemented") -} - -func (c *LightClient) normalizeHeight(height *int64) uint64 { - panic("Not implemented") -} From 7130a5032ecc6c587dca94325d90a820280a89a1 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Fri, 13 Jan 2023 18:42:18 -0500 Subject: [PATCH 19/22] move GetClient to light.go --- node/light.go | 5 +++++ node/light_client.go | 4 ---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/node/light.go b/node/light.go index 705d79ed86..2c2d701050 100644 --- a/node/light.go +++ b/node/light.go @@ -2,6 +2,7 @@ package node import ( "github.com/tendermint/tendermint/libs/service" + rpcclient "github.com/tendermint/tendermint/rpc/client" ) var _ Node = &LightNode{} @@ -10,6 +11,10 @@ type LightNode struct { service.BaseService } +func (n *LightNode) GetClient() rpcclient.Client { + return NewLightClient(n) +} + func newLightNode() (Node, error) { return &LightNode{}, nil } diff --git a/node/light_client.go b/node/light_client.go index 1e4b3ca998..823b9e39c4 100644 --- a/node/light_client.go +++ b/node/light_client.go @@ -23,10 +23,6 @@ func NewLightClient(node *LightNode) *LightClient { panic("Not implemented") } -func (n *LightNode) GetClient() rpcclient.Client { - return NewLightClient(n) -} - // ABCIInfo returns basic information about application state. func (c *LightClient) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { panic("Not implemented") From 28e6c2c7fa0634dadc3268f27a3681e32850ea83 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Fri, 13 Jan 2023 18:42:54 -0500 Subject: [PATCH 20/22] don't shadow BaseService. not sure why i did that --- node/light.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/node/light.go b/node/light.go index 2c2d701050..a8ce6ad395 100644 --- a/node/light.go +++ b/node/light.go @@ -18,7 +18,3 @@ func (n *LightNode) GetClient() rpcclient.Client { func newLightNode() (Node, error) { return &LightNode{}, nil } - -func (n *LightNode) IsRunning() bool { - panic("Not implemented") -} From 33cd3ced9b484fbd6a354378b6ae0c16efdce686 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Sun, 15 Jan 2023 14:39:36 -0600 Subject: [PATCH 21/22] implement NewLightClient and remove unused fields from LightClient --- node/light_client.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/node/light_client.go b/node/light_client.go index 823b9e39c4..7ca08551b8 100644 --- a/node/light_client.go +++ b/node/light_client.go @@ -3,7 +3,6 @@ package node import ( "context" - "github.com/tendermint/tendermint/config" tmbytes "github.com/tendermint/tendermint/libs/bytes" rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -13,14 +12,14 @@ import ( var _ rpcclient.Client = &LightClient{} type LightClient struct { - *types.EventBus - config *config.RPCConfig - - node LightNode + types.EventBus + node *LightNode } func NewLightClient(node *LightNode) *LightClient { - panic("Not implemented") + return &LightClient{ + node: node, + } } // ABCIInfo returns basic information about application state. From de71087731f04493a73088ab8eb932c979a7f329 Mon Sep 17 00:00:00 2001 From: Connor O'Hara Date: Mon, 16 Jan 2023 16:18:33 -0600 Subject: [PATCH 22/22] close channel --- node/full_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/node/full_client.go b/node/full_client.go index 1ac6c96c13..1328a09b86 100644 --- a/node/full_client.go +++ b/node/full_client.go @@ -799,6 +799,7 @@ func (c *FullClient) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCh } func (c *FullClient) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { + defer close(outc) for { select { case msg := <-sub.Out():