diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/p2p/network.go | 262 | ||||
-rw-r--r-- | pkg/p2p/network_test.go | 4 | ||||
-rw-r--r-- | pkg/ui/views/play_api.go | 5 |
3 files changed, 156 insertions, 115 deletions
diff --git a/pkg/p2p/network.go b/pkg/p2p/network.go index a89d8c5..0acbdfa 100644 --- a/pkg/p2p/network.go +++ b/pkg/p2p/network.go @@ -14,9 +14,9 @@ import ( // `Message` represents a structured message on this network. type Message struct { - Timestamp int64 `json:"timestamp"` - Source string `json:"source"` - Payload []byte `json:"payload"` + Timestamp int64 `json:"timestamp"` + Source NetworkID `json:"source"` + Payload []byte `json:"payload"` } // A network ID is represented by a string @@ -31,9 +31,9 @@ type NetworkMessageReceiveFunc func(msg Message) // This type represent the callback function invokes every new handshake between // two peers -type NetworkHandshakeFunc func() error +type NetworkHandshakeFunc func(conn net.Conn) error -func DefaultHandshake() error { +func DefaultHandshake(conn net.Conn) error { return nil } @@ -46,16 +46,21 @@ type TCPNetworkOpts struct { Logger *zap.Logger } +// PeerConnection holds the connection and address of a peer. +type PeerConnection struct { + Conn net.Conn + Address string +} + // TCPNetwork represents a TCP peer capable to send and receive messages type TCPNetwork struct { sync.Mutex TCPNetworkOpts - id NetworkID - listener net.Listener - connections map[NetworkID]net.Conn - peerAddresses map[NetworkID]string - isClosed bool + id NetworkID + listener net.Listener + connections map[NetworkID]PeerConnection + isClosed bool } // Initiliaze a new TCP network @@ -63,8 +68,7 @@ func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork { n := &TCPNetwork{ TCPNetworkOpts: opts, id: localID, - connections: make(map[NetworkID]net.Conn), - peerAddresses: make(map[NetworkID]string), + connections: make(map[NetworkID]PeerConnection), } go n.startServer() @@ -74,30 +78,53 @@ func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork { // Close listener' connection func (n *TCPNetwork) Close() error { - return n.listener.Close() + n.isClosed = true + if n.listener != nil { + err := n.listener.Close() + if err != nil { + return err + } + } + n.Lock() + for _, pc := range n.connections { + if pc.Conn != nil { + pc.Conn.Close() + } + } + n.connections = nil + n.Unlock() + return nil } // Add a new peer connection to the local peer func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) { n.Lock() - n.peerAddresses[remoteID] = addr + if _, exists := n.connections[remoteID]; !exists { + n.connections[remoteID] = PeerConnection{Address: addr} + go n.retryConnect(remoteID, addr) + } n.Unlock() - go n.retryConnect(remoteID, addr) } // Send methods is used to send a message to a specified remote peer func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error { n.Lock() - conn, exists := n.connections[remoteID] + peerConn, exists := n.connections[remoteID] n.Unlock() if !exists { return fmt.Errorf("not connected to peer %s", remoteID) } + if peerConn.Conn == nil { + n.Logger.Sugar().Warnf("connection to peer %s is nil, attempting reconnect", remoteID) + go n.retryConnect(remoteID, peerConn.Address) + return fmt.Errorf("connection to peer %s is nil", remoteID) + } + message := Message{ Payload: payload, - Source: n.listener.Addr().String(), + Source: n.id, Timestamp: time.Now().Unix(), } @@ -106,21 +133,15 @@ func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error { return fmt.Errorf("failed to marshal message: %v", err) } - _, err = conn.Write(append(data, '\n')) + _, err = peerConn.Conn.Write(append(data, '\n')) + if err != nil { n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err) - n.Lock() - delete(n.connections, remoteID) - addr, ok := n.peerAddresses[remoteID] - n.Unlock() - if ok { - go n.retryConnect(remoteID, addr) - } else { - n.Logger.Sugar().Warnf("no address found for peer %s to reconnect", remoteID) - } + n.removeConnection(remoteID) + go n.retryConnect(remoteID, peerConn.Address) return fmt.Errorf("failed to send message: %v", err) } else { - n.Logger.Sugar().Infof("sent message to '%s': %s", remoteID, message.Payload) + n.Logger.Sugar().Infof("sent message to '%s' (%s): %s", remoteID, peerConn.Address, string(message.Payload)) } return nil @@ -132,150 +153,169 @@ func (n *TCPNetwork) RegisterHandler(callback NetworkMessageReceiveFunc) { } // startServer starts a TCP server to accept connections. -func (n *TCPNetwork) startServer() error { +func (n *TCPNetwork) startServer() { var err error n.listener, err = net.Listen("tcp", n.ListenAddr) if err != nil { n.Logger.Sugar().Errorf("failed to start server: %v", err) - return err + return } n.isClosed = false - go n.listenLoop() - n.Logger.Sugar().Infof("server started on %s\n", n.ListenAddr) - return nil - -} - -func (n *TCPNetwork) listenLoop() error { for { conn, err := n.listener.Accept() - if errors.Is(err, net.ErrClosed) { - n.isClosed = true - n.Logger.Sugar().Errorf("connection is closed in such a way: %v\n", err) - return err + if n.isClosed { + n.Logger.Info("server listener closed") + return } - if err != nil { n.Logger.Sugar().Errorf("failed to accept connection: %v\n", err) continue } + go n.handleConnection(conn) + } +} - remoteAddr := conn.RemoteAddr().String() - remoteID := NetworkID(remoteAddr) - n.Lock() - n.connections[remoteID] = conn - n.peerAddresses[remoteID] = remoteAddr - if err := n.HandshakeFn(); err != nil { - n.Logger.Sugar().Errorf("error on handsharemoteIDking with %s: %v\n", remoteAddr, err) +func (n *TCPNetwork) handleConnection(conn net.Conn) { + remoteAddr := conn.RemoteAddr().String() + remoteID := NetworkID(remoteAddr) + + n.Lock() + n.connections[remoteID] = PeerConnection{Conn: conn, Address: remoteAddr} + n.Unlock() + + if n.HandshakeFn != nil { + if err := n.HandshakeFn(conn); err != nil { + n.Logger.Sugar().Errorf("error on handshaking with %s: %v\n", remoteAddr, err) conn.Close() - delete(n.connections, remoteID) - delete(n.peerAddresses, remoteID) - n.Unlock() - return err + n.removeConnection(remoteID) + return } - n.Unlock() + } - n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr) + n.Logger.Sugar().Infof("connected to remote peer %s (%s)\n", remoteID, remoteAddr) - // Read loop - go n.listenForMessages(conn) - } -} + n.listenForMessages(conn, remoteID) -// listenForMessages listens for incoming messages. -func (n *TCPNetwork) listenForMessages(conn net.Conn) { - reader := bufio.NewReader(conn) - var remoteID NetworkID + n.removeConnection(remoteID) + conn.Close() + n.Logger.Sugar().Infof("connection to %s closed\n", remoteAddr) +} +func (n *TCPNetwork) removeConnection(id NetworkID) { n.Lock() - for id, c := range n.connections { - if c == conn { - remoteID = id - break - } - } + delete(n.connections, id) n.Unlock() +} + +// listenForMessages listens for incoming messages on a specific connection. +func (n *TCPNetwork) listenForMessages(conn net.Conn, remoteID NetworkID) { + reader := bufio.NewReader(conn) + remoteAddr := conn.RemoteAddr().String() for { data, err := reader.ReadBytes('\n') if err != nil { - n.Logger.Debug("connection lost. Reconnecting...") + if errors.Is(err, net.ErrClosed) { + n.Logger.Sugar().Debugf("connection to %s closed by remote peer", remoteAddr) + } else { + n.Logger.Sugar().Warnf("error reading from connection %s: %v", remoteAddr, err) + } + n.Lock() - delete(n.connections, remoteID) - addr, ok := n.peerAddresses[remoteID] + peerConn, exists := n.connections[remoteID] n.Unlock() - if ok { - go n.retryConnect(remoteID, addr) + if exists { + go n.retryConnect(remoteID, peerConn.Address) } else { - n.Logger.Sugar().Warnf("no address found for peer %s to reconnect", remoteID) + n.Logger.Sugar().Warnf("no address to reconnect to peer %s", remoteID) } return } var message Message if err := json.Unmarshal(data, &message); err != nil { - n.Logger.Sugar().Errorf("failed to unmarshal message: %v\n", err) + n.Logger.Sugar().Errorf("failed to unmarshal message from %s: %v\n", remoteAddr, err) continue } - n.Logger.Sugar().Infof("received message from '%s': %s", remoteID, string(message.Payload)) + n.Logger.Sugar().Infof("received message from '%s' (%s): %s", message.Source, remoteAddr, string(message.Payload)) - n.OnReceiveFn(message) + if n.OnReceiveFn != nil { + n.OnReceiveFn(message) + } } } // retryConnect attempts to connect to a remote peer. func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) { - for { + if addr == "" { + n.Logger.Sugar().Warnf("no address to reconnect to peer %s", remoteID) + n.removeConnection(remoteID) + return + } + + retryDelay := n.RetryDelay + for !n.isClosed { n.Lock() - _, exists := n.connections[remoteID] + _, connected := n.connections[remoteID] n.Unlock() - if exists { - time.Sleep(5 * time.Second) - continue + if connected { + if n.connections[remoteID].Conn != nil { + time.Sleep(5 * time.Second) + continue + } } - if addr == "" { - n.Logger.Sugar().Warnf("no address to retry connection for peer %s", remoteID) + conn, err := net.Dial("tcp", addr) + + if err == nil { + n.Logger.Sugar().Infof("successfully connected to peer %s (%s)!", remoteID, addr) n.Lock() - delete(n.peerAddresses, remoteID) + n.connections[remoteID] = PeerConnection{Conn: conn, Address: addr} n.Unlock() + go n.handleConnection(conn) return - } - - conn, err := net.Dial("tcp", addr) - - if err != nil { - n.Logger.Sugar().Errorf("failed to connect to %s (%s): %v. Retrying in %v...", remoteID, addr, err, n.RetryDelay) - time.Sleep(n.RetryDelay) - if !n.isClosed && n.RetryDelay < 2*time.Minute { - n.RetryDelay *= 2 - } else if !n.isClosed { - n.Lock() - delete(n.connections, remoteID) - delete(n.peerAddresses, remoteID) - n.Unlock() - n.Logger.Sugar().Infof("stopped retrying and removed peer %s", remoteID) + } else { + n.Logger.Sugar().Errorf("failed to connect to %s (%s): %v. Retrying in %v...", remoteID, addr, err, retryDelay) + select { + case <-time.After(retryDelay): + if retryDelay < 2*time.Minute { + retryDelay *= 2 + } + case <-n.closed(): + n.Logger.Info("retryConnect stopped due to network closure (inner for)") return - } else { - return // Exit if the network is closed } - continue } + } + n.Logger.Info("retryConnect stopped due to network closure") +} - n.Lock() - n.connections[remoteID] = conn - n.Unlock() - - n.Logger.Sugar().Infof("successfully connected to peer %s (%s)!", remoteID, addr) +func (n *TCPNetwork) closed() <-chan struct{} { + ch := make(chan struct{}) + go func() { + <-n.listenerClosed() + close(ch) + }() + return ch +} - go n.listenForMessages(conn) - return +func (n *TCPNetwork) listenerClosed() <-chan struct{} { + if n.listener == nil { + ch := make(chan struct{}) + close(ch) + return ch } + done := make(chan struct{}) + go func() { + n.listener.Accept() // This will block until closed + close(done) + }() + return done } diff --git a/pkg/p2p/network_test.go b/pkg/p2p/network_test.go index faad9d4..fd5dde0 100644 --- a/pkg/p2p/network_test.go +++ b/pkg/p2p/network_test.go @@ -13,7 +13,7 @@ func TestPeerToPeerCommunication(t *testing.T) { // Create a mock of the first peer (peer-1) peer1Opts := TCPNetworkOpts{ ListenAddr: ":9001", - HandshakeFn: func() error { return nil }, + HandshakeFn: DefaultHandshake, RetryDelay: time.Second * 2, Logger: zap.L(), } @@ -26,7 +26,7 @@ func TestPeerToPeerCommunication(t *testing.T) { // Create a mock of the second peer (peer-2) peer2Opts := TCPNetworkOpts{ ListenAddr: ":9002", - HandshakeFn: func() error { return nil }, + HandshakeFn: DefaultHandshake, RetryDelay: time.Second * 2, Logger: zap.L(), OnReceiveFn: func(msg Message) { diff --git a/pkg/ui/views/play_api.go b/pkg/ui/views/play_api.go index 6cc0c6e..c098930 100644 --- a/pkg/ui/views/play_api.go +++ b/pkg/ui/views/play_api.go @@ -3,6 +3,7 @@ package views import ( "encoding/json" "fmt" + "net" "net/http" "os" "strconv" @@ -45,8 +46,8 @@ func (m *PlayModel) handlePlayResponse(msg playResponse) (tea.Model, tea.Cmd) { logger, _ := logger.GetLogger() callbackCompleted := make(chan bool) - m.network = multiplayer.NewGameNetwork(fmt.Sprintf("%s-1", m.playName), fmt.Sprintf("%s:%d", msg.Ok.IP, msg.Ok.Port), func() error { - close(callbackCompleted) + m.network = multiplayer.NewGameNetwork(fmt.Sprintf("%s-1", m.playName), fmt.Sprintf("%s:%d", msg.Ok.IP, msg.Ok.Port), func(net.Conn) error { + callbackCompleted <- true return nil }, logger) |