summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanto Cariotti <santo@dcariotti.me>2025-04-18 14:33:09 +0200
committerSanto Cariotti <santo@dcariotti.me>2025-04-18 14:33:09 +0200
commit77c044f9e63c1ebc144130a7a4babecc8977d262 (patch)
tree0818229cbcdba85a680e1b679d7f304db0ebbbbe
parent040b6e169dbd78b0f79921e4590e49b8970f685f (diff)
Store peer addresses too
-rw-r--r--pkg/p2p/network.go88
1 files changed, 61 insertions, 27 deletions
diff --git a/pkg/p2p/network.go b/pkg/p2p/network.go
index fba8c80..a89d8c5 100644
--- a/pkg/p2p/network.go
+++ b/pkg/p2p/network.go
@@ -46,15 +46,16 @@ type TCPNetworkOpts struct {
Logger *zap.Logger
}
-// TCPNetwork represents a full-duplex TCP peer.
+// TCPNetwork represents a TCP peer capable to send and receive messages
type TCPNetwork struct {
sync.Mutex
TCPNetworkOpts
- id NetworkID
- listener net.Listener
- connections map[NetworkID]net.Conn
- isClosed bool
+ id NetworkID
+ listener net.Listener
+ connections map[NetworkID]net.Conn
+ peerAddresses map[NetworkID]string
+ isClosed bool
}
// Initiliaze a new TCP network
@@ -63,6 +64,7 @@ func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork {
TCPNetworkOpts: opts,
id: localID,
connections: make(map[NetworkID]net.Conn),
+ peerAddresses: make(map[NetworkID]string),
}
go n.startServer()
@@ -77,6 +79,9 @@ func (n *TCPNetwork) Close() error {
// Add a new peer connection to the local peer
func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) {
+ n.Lock()
+ n.peerAddresses[remoteID] = addr
+ n.Unlock()
go n.retryConnect(remoteID, addr)
}
@@ -106,13 +111,16 @@ func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error {
n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err)
n.Lock()
delete(n.connections, remoteID)
+ addr, ok := n.peerAddresses[remoteID]
n.Unlock()
-
- go n.retryConnect(remoteID, "")
-
+ if ok {
+ go n.retryConnect(remoteID, addr)
+ } else {
+ n.Logger.Sugar().Warnf("no address found for peer %s to reconnect", remoteID)
+ }
return fmt.Errorf("failed to send message: %v", err)
} else {
- n.Logger.Sugar().Infof("Sent message to '%s': %s", conn.LocalAddr(), message.Payload)
+ n.Logger.Sugar().Infof("sent message to '%s': %s", remoteID, message.Payload)
}
return nil
@@ -158,14 +166,19 @@ func (n *TCPNetwork) listenLoop() error {
}
remoteAddr := conn.RemoteAddr().String()
+ remoteID := NetworkID(remoteAddr)
n.Lock()
- n.connections[NetworkID(remoteAddr)] = conn
+ n.connections[remoteID] = conn
+ n.peerAddresses[remoteID] = remoteAddr
if err := n.HandshakeFn(); err != nil {
- n.Logger.Sugar().Errorf("error on handshaking: %v\n", err)
+ n.Logger.Sugar().Errorf("error on handsharemoteIDking with %s: %v\n", remoteAddr, err)
+ conn.Close()
+ delete(n.connections, remoteID)
+ delete(n.peerAddresses, remoteID)
+ n.Unlock()
return err
}
n.Unlock()
- n.RetryDelay = 2 * time.Second
n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr)
@@ -177,22 +190,30 @@ func (n *TCPNetwork) listenLoop() error {
// listenForMessages listens for incoming messages.
func (n *TCPNetwork) listenForMessages(conn net.Conn) {
reader := bufio.NewReader(conn)
+ var remoteID NetworkID
+
+ n.Lock()
+ for id, c := range n.connections {
+ if c == conn {
+ remoteID = id
+ break
+ }
+ }
+ n.Unlock()
for {
data, err := reader.ReadBytes('\n')
if err != nil {
n.Logger.Debug("connection lost. Reconnecting...")
n.Lock()
-
- // FIXME: a better way to re-establish the connection between peer
- for id, c := range n.connections {
- if c == conn {
- delete(n.connections, id)
- go n.retryConnect(id, "")
- break
- }
- }
+ delete(n.connections, remoteID)
+ addr, ok := n.peerAddresses[remoteID]
n.Unlock()
+ if ok {
+ go n.retryConnect(remoteID, addr)
+ } else {
+ n.Logger.Sugar().Warnf("no address found for peer %s to reconnect", remoteID)
+ }
return
}
@@ -202,7 +223,7 @@ func (n *TCPNetwork) listenForMessages(conn net.Conn) {
continue
}
- n.Logger.Sugar().Infof("Received message from '%s': %s", message.Source, string(message.Payload))
+ n.Logger.Sugar().Infof("received message from '%s': %s", remoteID, string(message.Payload))
n.OnReceiveFn(message)
}
@@ -220,19 +241,30 @@ func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) {
continue
}
+ if addr == "" {
+ n.Logger.Sugar().Warnf("no address to retry connection for peer %s", remoteID)
+ n.Lock()
+ delete(n.peerAddresses, remoteID)
+ n.Unlock()
+ return
+ }
+
conn, err := net.Dial("tcp", addr)
if err != nil {
- n.Logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.RetryDelay)
+ n.Logger.Sugar().Errorf("failed to connect to %s (%s): %v. Retrying in %v...", remoteID, addr, err, n.RetryDelay)
time.Sleep(n.RetryDelay)
- if !n.isClosed && n.RetryDelay < 30*time.Second {
+ if !n.isClosed && n.RetryDelay < 2*time.Minute {
n.RetryDelay *= 2
- } else {
+ } else if !n.isClosed {
n.Lock()
delete(n.connections, remoteID)
+ delete(n.peerAddresses, remoteID)
n.Unlock()
- n.Logger.Sugar().Infof("removed %s connection", remoteID)
+ n.Logger.Sugar().Infof("stopped retrying and removed peer %s", remoteID)
return
+ } else {
+ return // Exit if the network is closed
}
continue
}
@@ -240,8 +272,10 @@ func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) {
n.Lock()
n.connections[remoteID] = conn
n.Unlock()
- n.Logger.Sugar().Infof("successfully connected to peer %s!", remoteID)
+
+ n.Logger.Sugar().Infof("successfully connected to peer %s (%s)!", remoteID, addr)
go n.listenForMessages(conn)
+ return
}
}