diff options
author | Santo Cariotti <santo@dcariotti.me> | 2025-04-18 14:33:09 +0200 |
---|---|---|
committer | Santo Cariotti <santo@dcariotti.me> | 2025-04-18 14:33:09 +0200 |
commit | 77c044f9e63c1ebc144130a7a4babecc8977d262 (patch) | |
tree | 0818229cbcdba85a680e1b679d7f304db0ebbbbe /pkg | |
parent | 040b6e169dbd78b0f79921e4590e49b8970f685f (diff) |
Store peer addresses too
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/p2p/network.go | 88 |
1 files changed, 61 insertions, 27 deletions
diff --git a/pkg/p2p/network.go b/pkg/p2p/network.go index fba8c80..a89d8c5 100644 --- a/pkg/p2p/network.go +++ b/pkg/p2p/network.go @@ -46,15 +46,16 @@ type TCPNetworkOpts struct { Logger *zap.Logger } -// TCPNetwork represents a full-duplex TCP peer. +// TCPNetwork represents a TCP peer capable to send and receive messages type TCPNetwork struct { sync.Mutex TCPNetworkOpts - id NetworkID - listener net.Listener - connections map[NetworkID]net.Conn - isClosed bool + id NetworkID + listener net.Listener + connections map[NetworkID]net.Conn + peerAddresses map[NetworkID]string + isClosed bool } // Initiliaze a new TCP network @@ -63,6 +64,7 @@ func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork { TCPNetworkOpts: opts, id: localID, connections: make(map[NetworkID]net.Conn), + peerAddresses: make(map[NetworkID]string), } go n.startServer() @@ -77,6 +79,9 @@ func (n *TCPNetwork) Close() error { // Add a new peer connection to the local peer func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) { + n.Lock() + n.peerAddresses[remoteID] = addr + n.Unlock() go n.retryConnect(remoteID, addr) } @@ -106,13 +111,16 @@ func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error { n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err) n.Lock() delete(n.connections, remoteID) + addr, ok := n.peerAddresses[remoteID] n.Unlock() - - go n.retryConnect(remoteID, "") - + if ok { + go n.retryConnect(remoteID, addr) + } else { + n.Logger.Sugar().Warnf("no address found for peer %s to reconnect", remoteID) + } return fmt.Errorf("failed to send message: %v", err) } else { - n.Logger.Sugar().Infof("Sent message to '%s': %s", conn.LocalAddr(), message.Payload) + n.Logger.Sugar().Infof("sent message to '%s': %s", remoteID, message.Payload) } return nil @@ -158,14 +166,19 @@ func (n *TCPNetwork) listenLoop() error { } remoteAddr := conn.RemoteAddr().String() + remoteID := NetworkID(remoteAddr) n.Lock() - n.connections[NetworkID(remoteAddr)] = conn + n.connections[remoteID] = conn + n.peerAddresses[remoteID] = remoteAddr if err := n.HandshakeFn(); err != nil { - n.Logger.Sugar().Errorf("error on handshaking: %v\n", err) + n.Logger.Sugar().Errorf("error on handsharemoteIDking with %s: %v\n", remoteAddr, err) + conn.Close() + delete(n.connections, remoteID) + delete(n.peerAddresses, remoteID) + n.Unlock() return err } n.Unlock() - n.RetryDelay = 2 * time.Second n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr) @@ -177,22 +190,30 @@ func (n *TCPNetwork) listenLoop() error { // listenForMessages listens for incoming messages. func (n *TCPNetwork) listenForMessages(conn net.Conn) { reader := bufio.NewReader(conn) + var remoteID NetworkID + + n.Lock() + for id, c := range n.connections { + if c == conn { + remoteID = id + break + } + } + n.Unlock() for { data, err := reader.ReadBytes('\n') if err != nil { n.Logger.Debug("connection lost. Reconnecting...") n.Lock() - - // FIXME: a better way to re-establish the connection between peer - for id, c := range n.connections { - if c == conn { - delete(n.connections, id) - go n.retryConnect(id, "") - break - } - } + delete(n.connections, remoteID) + addr, ok := n.peerAddresses[remoteID] n.Unlock() + if ok { + go n.retryConnect(remoteID, addr) + } else { + n.Logger.Sugar().Warnf("no address found for peer %s to reconnect", remoteID) + } return } @@ -202,7 +223,7 @@ func (n *TCPNetwork) listenForMessages(conn net.Conn) { continue } - n.Logger.Sugar().Infof("Received message from '%s': %s", message.Source, string(message.Payload)) + n.Logger.Sugar().Infof("received message from '%s': %s", remoteID, string(message.Payload)) n.OnReceiveFn(message) } @@ -220,19 +241,30 @@ func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) { continue } + if addr == "" { + n.Logger.Sugar().Warnf("no address to retry connection for peer %s", remoteID) + n.Lock() + delete(n.peerAddresses, remoteID) + n.Unlock() + return + } + conn, err := net.Dial("tcp", addr) if err != nil { - n.Logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.RetryDelay) + n.Logger.Sugar().Errorf("failed to connect to %s (%s): %v. Retrying in %v...", remoteID, addr, err, n.RetryDelay) time.Sleep(n.RetryDelay) - if !n.isClosed && n.RetryDelay < 30*time.Second { + if !n.isClosed && n.RetryDelay < 2*time.Minute { n.RetryDelay *= 2 - } else { + } else if !n.isClosed { n.Lock() delete(n.connections, remoteID) + delete(n.peerAddresses, remoteID) n.Unlock() - n.Logger.Sugar().Infof("removed %s connection", remoteID) + n.Logger.Sugar().Infof("stopped retrying and removed peer %s", remoteID) return + } else { + return // Exit if the network is closed } continue } @@ -240,8 +272,10 @@ func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) { n.Lock() n.connections[remoteID] = conn n.Unlock() - n.Logger.Sugar().Infof("successfully connected to peer %s!", remoteID) + + n.Logger.Sugar().Infof("successfully connected to peer %s (%s)!", remoteID, addr) go n.listenForMessages(conn) + return } } |