diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index 0093a8ad60..e8fc6eabe1 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -35,12 +35,6 @@ const ( // peerLimit defines limit of number of peers returned during active peer discovery. peerLimit = 60 - - // seedReconnectBackoff is the initial backoff when reconnecting to a disconnected seed peer. - seedReconnectBackoff = 1 * time.Second - - // seedReconnectMaxBackoff is the maximum backoff for seed peer reconnection attempts. - seedReconnectMaxBackoff = 30 * time.Second ) // Client is a P2P client, implemented with libp2p. @@ -62,11 +56,6 @@ type Client struct { ps *pubsub.PubSub started bool - ctx context.Context - cancel context.CancelFunc - - seedPeers []peer.AddrInfo - metrics *Metrics } @@ -151,7 +140,6 @@ func (c *Client) Start(ctx context.Context) error { func (c *Client) startWithHost(ctx context.Context, h host.Host) error { c.host = h - c.ctx, c.cancel = context.WithCancel(ctx) for _, a := range c.host.Addrs() { c.logger.Info().Str("address", fmt.Sprintf("%s/p2p/%s", a, c.host.ID())).Msg("listening on address") } @@ -182,17 +170,11 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { } c.started = true - - c.host.Network().Notify(c.newDisconnectNotifee()) - return nil } // Close gently stops Client. func (c *Client) Close() error { - if c.cancel != nil { - c.cancel() - } var err error if c.dht != nil { err = errors.Join(err, c.dht.Close()) @@ -263,77 +245,6 @@ func (c *Client) Peers() []PeerConnection { return res } -type disconnectNotifee struct { - c *Client -} - -func (n disconnectNotifee) Connected(_ network.Network, conn network.Conn) { - p := conn.RemotePeer() - for _, sp := range n.c.seedPeers { - if sp.ID == p { - n.c.logger.Info().Str("peer", p.String()).Msg("connected to seed peer") - return - } - } -} -func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream) {} -func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream) {} -func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr) {} -func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {} - -func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) { - p := conn.RemotePeer() - for _, sp := range n.c.seedPeers { - if sp.ID == p { - n.c.logger.Warn().Str("peer", p.String()).Msg("disconnected from seed peer, scheduling reconnect") - go n.c.reconnectSeedPeer(sp) - return - } - } -} - -func (c *Client) reconnectSeedPeer(sp peer.AddrInfo) { - backoff := seedReconnectBackoff - for { - if c.ctx.Err() != nil { - return - } - if c.isConnected(sp.ID) { - return - } - - err := c.host.Connect(c.ctx, sp) - if err == nil { - c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnected to seed peer") - return - } - if c.ctx.Err() != nil { - return - } - - c.logger.Debug().Str("peer", sp.ID.String()).Dur("backoff", backoff).Err(err).Msg("failed to reconnect to seed peer, retrying") - select { - case <-c.ctx.Done(): - return - case <-time.After(backoff): - } - - backoff *= 2 - if backoff > seedReconnectMaxBackoff { - backoff = seedReconnectMaxBackoff - } - } -} - -func (c *Client) newDisconnectNotifee() disconnectNotifee { - return disconnectNotifee{c: c} -} - -// isConnected returns true if there is an active connection to the given peer. -func (c *Client) isConnected(id peer.ID) bool { - return c.host.Network().Connectedness(id) == network.Connected -} - func (c *Client) listen() (host.Host, error) { maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress) if err != nil { @@ -345,7 +256,6 @@ func (c *Client) listen() (host.Host, error) { func (c *Client) setupDHT(ctx context.Context) error { peers := c.parseAddrInfoList(c.conf.Peers) - c.seedPeers = peers if len(peers) == 0 { c.logger.Info().Msg("no peers - only listening for connections") } diff --git a/pkg/p2p/client_test.go b/pkg/p2p/client_test.go index 568b152ea4..e3ac6f1fab 100644 --- a/pkg/p2p/client_test.go +++ b/pkg/p2p/client_test.go @@ -278,104 +278,6 @@ func waitForCondition(timeout time.Duration, conditionFunc func() bool) error { } } -func TestSeedPeerReconnect(t *testing.T) { - require := require.New(t) - assert := assert.New(t) - logger := zerolog.Nop() - - mn := mocknet.New() - defer mn.Close() - - seedKey, err := key.GenerateNodeKey() - require.NoError(err) - seedAddr, err := getAddr(seedKey.PrivKey) - require.NoError(err) - seedHost, err := mn.AddPeer(seedKey.PrivKey, seedAddr) - require.NoError(err) - - clientKey, err := key.GenerateNodeKey() - require.NoError(err) - clientAddr, err := getAddr(clientKey.PrivKey) - require.NoError(err) - clientHost, err := mn.AddPeer(clientKey.PrivKey, clientAddr) - require.NoError(err) - - seedAddrStr := seedHost.Addrs()[0].String() + "/p2p/" + seedHost.ID().String() - conf := config.P2PConfig{Peers: seedAddrStr} - - client, err := NewClient(conf, clientKey.PrivKey, dssync.MutexWrap(datastore.NewMapDatastore()), "test-chain", logger, NopMetrics()) - require.NoError(err) - require.NotNil(client) - - err = mn.LinkAll() - require.NoError(err) - err = mn.ConnectAllButSelf() - require.NoError(err) - - ctx := t.Context() - err = client.startWithHost(ctx, clientHost) - require.NoError(err) - defer client.Close() - - err = waitForCondition(2*time.Second, func() bool { - return client.isConnected(seedHost.ID()) - }) - require.NoError(err, "client should connect to seed peer on start") - - conns := client.host.Network().ConnsToPeer(seedHost.ID()) - for _, conn := range conns { - conn.Close() - } - client.host.Network().ClosePeer(seedHost.ID()) - - assert.False(client.isConnected(seedHost.ID()), "seed peer should be disconnected") - - err = waitForCondition(5*time.Second, func() bool { - return client.isConnected(seedHost.ID()) - }) - require.NoError(err, "client should reconnect to seed peer after disconnect") -} - -func TestSeedPeerReconnectStopsOnClose(t *testing.T) { - require := require.New(t) - - mn := mocknet.New() - defer mn.Close() - - seedKey, err := key.GenerateNodeKey() - require.NoError(err) - seedAddr, err := getAddr(seedKey.PrivKey) - require.NoError(err) - seedHost, err := mn.AddPeer(seedKey.PrivKey, seedAddr) - require.NoError(err) - - clientKey, err := key.GenerateNodeKey() - require.NoError(err) - clientAddr, err := getAddr(clientKey.PrivKey) - require.NoError(err) - clientHost, err := mn.AddPeer(clientKey.PrivKey, clientAddr) - require.NoError(err) - - seedAddrStr := seedHost.Addrs()[0].String() + "/p2p/" + seedHost.ID().String() - conf := config.P2PConfig{Peers: seedAddrStr} - - client, err := NewClient(conf, clientKey.PrivKey, dssync.MutexWrap(datastore.NewMapDatastore()), "test-chain", zerolog.Nop(), NopMetrics()) - require.NoError(err) - - err = mn.LinkAll() - require.NoError(err) - err = mn.ConnectAllButSelf() - require.NoError(err) - - ctx := t.Context() - err = client.startWithHost(ctx, clientHost) - require.NoError(err) - - require.NoError(client.Close()) - - require.Error(client.ctx.Err(), "client context should be cancelled after Close") -} - func TestClientInfoMethods(t *testing.T) { require := require.New(t) assert := assert.New(t)