summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/p2p/network.go262
-rw-r--r--pkg/p2p/network_test.go4
-rw-r--r--pkg/ui/views/play_api.go5
3 files changed, 156 insertions, 115 deletions
diff --git a/pkg/p2p/network.go b/pkg/p2p/network.go
index a89d8c5..0acbdfa 100644
--- a/pkg/p2p/network.go
+++ b/pkg/p2p/network.go
@@ -14,9 +14,9 @@ import (
// `Message` represents a structured message on this network.
type Message struct {
- Timestamp int64 `json:"timestamp"`
- Source string `json:"source"`
- Payload []byte `json:"payload"`
+ Timestamp int64 `json:"timestamp"`
+ Source NetworkID `json:"source"`
+ Payload []byte `json:"payload"`
}
// A network ID is represented by a string
@@ -31,9 +31,9 @@ type NetworkMessageReceiveFunc func(msg Message)
// This type represent the callback function invokes every new handshake between
// two peers
-type NetworkHandshakeFunc func() error
+type NetworkHandshakeFunc func(conn net.Conn) error
-func DefaultHandshake() error {
+func DefaultHandshake(conn net.Conn) error {
return nil
}
@@ -46,16 +46,21 @@ type TCPNetworkOpts struct {
Logger *zap.Logger
}
+// PeerConnection holds the connection and address of a peer.
+type PeerConnection struct {
+ Conn net.Conn
+ Address string
+}
+
// TCPNetwork represents a TCP peer capable to send and receive messages
type TCPNetwork struct {
sync.Mutex
TCPNetworkOpts
- id NetworkID
- listener net.Listener
- connections map[NetworkID]net.Conn
- peerAddresses map[NetworkID]string
- isClosed bool
+ id NetworkID
+ listener net.Listener
+ connections map[NetworkID]PeerConnection
+ isClosed bool
}
// Initiliaze a new TCP network
@@ -63,8 +68,7 @@ func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork {
n := &TCPNetwork{
TCPNetworkOpts: opts,
id: localID,
- connections: make(map[NetworkID]net.Conn),
- peerAddresses: make(map[NetworkID]string),
+ connections: make(map[NetworkID]PeerConnection),
}
go n.startServer()
@@ -74,30 +78,53 @@ func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork {
// Close listener' connection
func (n *TCPNetwork) Close() error {
- return n.listener.Close()
+ n.isClosed = true
+ if n.listener != nil {
+ err := n.listener.Close()
+ if err != nil {
+ return err
+ }
+ }
+ n.Lock()
+ for _, pc := range n.connections {
+ if pc.Conn != nil {
+ pc.Conn.Close()
+ }
+ }
+ n.connections = nil
+ n.Unlock()
+ return nil
}
// Add a new peer connection to the local peer
func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) {
n.Lock()
- n.peerAddresses[remoteID] = addr
+ if _, exists := n.connections[remoteID]; !exists {
+ n.connections[remoteID] = PeerConnection{Address: addr}
+ go n.retryConnect(remoteID, addr)
+ }
n.Unlock()
- go n.retryConnect(remoteID, addr)
}
// Send methods is used to send a message to a specified remote peer
func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error {
n.Lock()
- conn, exists := n.connections[remoteID]
+ peerConn, exists := n.connections[remoteID]
n.Unlock()
if !exists {
return fmt.Errorf("not connected to peer %s", remoteID)
}
+ if peerConn.Conn == nil {
+ n.Logger.Sugar().Warnf("connection to peer %s is nil, attempting reconnect", remoteID)
+ go n.retryConnect(remoteID, peerConn.Address)
+ return fmt.Errorf("connection to peer %s is nil", remoteID)
+ }
+
message := Message{
Payload: payload,
- Source: n.listener.Addr().String(),
+ Source: n.id,
Timestamp: time.Now().Unix(),
}
@@ -106,21 +133,15 @@ func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error {
return fmt.Errorf("failed to marshal message: %v", err)
}
- _, err = conn.Write(append(data, '\n'))
+ _, err = peerConn.Conn.Write(append(data, '\n'))
+
if err != nil {
n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err)
- n.Lock()
- delete(n.connections, remoteID)
- addr, ok := n.peerAddresses[remoteID]
- n.Unlock()
- if ok {
- go n.retryConnect(remoteID, addr)
- } else {
- n.Logger.Sugar().Warnf("no address found for peer %s to reconnect", remoteID)
- }
+ n.removeConnection(remoteID)
+ go n.retryConnect(remoteID, peerConn.Address)
return fmt.Errorf("failed to send message: %v", err)
} else {
- n.Logger.Sugar().Infof("sent message to '%s': %s", remoteID, message.Payload)
+ n.Logger.Sugar().Infof("sent message to '%s' (%s): %s", remoteID, peerConn.Address, string(message.Payload))
}
return nil
@@ -132,150 +153,169 @@ func (n *TCPNetwork) RegisterHandler(callback NetworkMessageReceiveFunc) {
}
// startServer starts a TCP server to accept connections.
-func (n *TCPNetwork) startServer() error {
+func (n *TCPNetwork) startServer() {
var err error
n.listener, err = net.Listen("tcp", n.ListenAddr)
if err != nil {
n.Logger.Sugar().Errorf("failed to start server: %v", err)
- return err
+ return
}
n.isClosed = false
- go n.listenLoop()
-
n.Logger.Sugar().Infof("server started on %s\n", n.ListenAddr)
- return nil
-
-}
-
-func (n *TCPNetwork) listenLoop() error {
for {
conn, err := n.listener.Accept()
- if errors.Is(err, net.ErrClosed) {
- n.isClosed = true
- n.Logger.Sugar().Errorf("connection is closed in such a way: %v\n", err)
- return err
+ if n.isClosed {
+ n.Logger.Info("server listener closed")
+ return
}
-
if err != nil {
n.Logger.Sugar().Errorf("failed to accept connection: %v\n", err)
continue
}
+ go n.handleConnection(conn)
+ }
+}
- remoteAddr := conn.RemoteAddr().String()
- remoteID := NetworkID(remoteAddr)
- n.Lock()
- n.connections[remoteID] = conn
- n.peerAddresses[remoteID] = remoteAddr
- if err := n.HandshakeFn(); err != nil {
- n.Logger.Sugar().Errorf("error on handsharemoteIDking with %s: %v\n", remoteAddr, err)
+func (n *TCPNetwork) handleConnection(conn net.Conn) {
+ remoteAddr := conn.RemoteAddr().String()
+ remoteID := NetworkID(remoteAddr)
+
+ n.Lock()
+ n.connections[remoteID] = PeerConnection{Conn: conn, Address: remoteAddr}
+ n.Unlock()
+
+ if n.HandshakeFn != nil {
+ if err := n.HandshakeFn(conn); err != nil {
+ n.Logger.Sugar().Errorf("error on handshaking with %s: %v\n", remoteAddr, err)
conn.Close()
- delete(n.connections, remoteID)
- delete(n.peerAddresses, remoteID)
- n.Unlock()
- return err
+ n.removeConnection(remoteID)
+ return
}
- n.Unlock()
+ }
- n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr)
+ n.Logger.Sugar().Infof("connected to remote peer %s (%s)\n", remoteID, remoteAddr)
- // Read loop
- go n.listenForMessages(conn)
- }
-}
+ n.listenForMessages(conn, remoteID)
-// listenForMessages listens for incoming messages.
-func (n *TCPNetwork) listenForMessages(conn net.Conn) {
- reader := bufio.NewReader(conn)
- var remoteID NetworkID
+ n.removeConnection(remoteID)
+ conn.Close()
+ n.Logger.Sugar().Infof("connection to %s closed\n", remoteAddr)
+}
+func (n *TCPNetwork) removeConnection(id NetworkID) {
n.Lock()
- for id, c := range n.connections {
- if c == conn {
- remoteID = id
- break
- }
- }
+ delete(n.connections, id)
n.Unlock()
+}
+
+// listenForMessages listens for incoming messages on a specific connection.
+func (n *TCPNetwork) listenForMessages(conn net.Conn, remoteID NetworkID) {
+ reader := bufio.NewReader(conn)
+ remoteAddr := conn.RemoteAddr().String()
for {
data, err := reader.ReadBytes('\n')
if err != nil {
- n.Logger.Debug("connection lost. Reconnecting...")
+ if errors.Is(err, net.ErrClosed) {
+ n.Logger.Sugar().Debugf("connection to %s closed by remote peer", remoteAddr)
+ } else {
+ n.Logger.Sugar().Warnf("error reading from connection %s: %v", remoteAddr, err)
+ }
+
n.Lock()
- delete(n.connections, remoteID)
- addr, ok := n.peerAddresses[remoteID]
+ peerConn, exists := n.connections[remoteID]
n.Unlock()
- if ok {
- go n.retryConnect(remoteID, addr)
+ if exists {
+ go n.retryConnect(remoteID, peerConn.Address)
} else {
- n.Logger.Sugar().Warnf("no address found for peer %s to reconnect", remoteID)
+ n.Logger.Sugar().Warnf("no address to reconnect to peer %s", remoteID)
}
return
}
var message Message
if err := json.Unmarshal(data, &message); err != nil {
- n.Logger.Sugar().Errorf("failed to unmarshal message: %v\n", err)
+ n.Logger.Sugar().Errorf("failed to unmarshal message from %s: %v\n", remoteAddr, err)
continue
}
- n.Logger.Sugar().Infof("received message from '%s': %s", remoteID, string(message.Payload))
+ n.Logger.Sugar().Infof("received message from '%s' (%s): %s", message.Source, remoteAddr, string(message.Payload))
- n.OnReceiveFn(message)
+ if n.OnReceiveFn != nil {
+ n.OnReceiveFn(message)
+ }
}
}
// retryConnect attempts to connect to a remote peer.
func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) {
- for {
+ if addr == "" {
+ n.Logger.Sugar().Warnf("no address to reconnect to peer %s", remoteID)
+ n.removeConnection(remoteID)
+ return
+ }
+
+ retryDelay := n.RetryDelay
+ for !n.isClosed {
n.Lock()
- _, exists := n.connections[remoteID]
+ _, connected := n.connections[remoteID]
n.Unlock()
- if exists {
- time.Sleep(5 * time.Second)
- continue
+ if connected {
+ if n.connections[remoteID].Conn != nil {
+ time.Sleep(5 * time.Second)
+ continue
+ }
}
- if addr == "" {
- n.Logger.Sugar().Warnf("no address to retry connection for peer %s", remoteID)
+ conn, err := net.Dial("tcp", addr)
+
+ if err == nil {
+ n.Logger.Sugar().Infof("successfully connected to peer %s (%s)!", remoteID, addr)
n.Lock()
- delete(n.peerAddresses, remoteID)
+ n.connections[remoteID] = PeerConnection{Conn: conn, Address: addr}
n.Unlock()
+ go n.handleConnection(conn)
return
- }
-
- conn, err := net.Dial("tcp", addr)
-
- if err != nil {
- n.Logger.Sugar().Errorf("failed to connect to %s (%s): %v. Retrying in %v...", remoteID, addr, err, n.RetryDelay)
- time.Sleep(n.RetryDelay)
- if !n.isClosed && n.RetryDelay < 2*time.Minute {
- n.RetryDelay *= 2
- } else if !n.isClosed {
- n.Lock()
- delete(n.connections, remoteID)
- delete(n.peerAddresses, remoteID)
- n.Unlock()
- n.Logger.Sugar().Infof("stopped retrying and removed peer %s", remoteID)
+ } else {
+ n.Logger.Sugar().Errorf("failed to connect to %s (%s): %v. Retrying in %v...", remoteID, addr, err, retryDelay)
+ select {
+ case <-time.After(retryDelay):
+ if retryDelay < 2*time.Minute {
+ retryDelay *= 2
+ }
+ case <-n.closed():
+ n.Logger.Info("retryConnect stopped due to network closure (inner for)")
return
- } else {
- return // Exit if the network is closed
}
- continue
}
+ }
+ n.Logger.Info("retryConnect stopped due to network closure")
+}
- n.Lock()
- n.connections[remoteID] = conn
- n.Unlock()
-
- n.Logger.Sugar().Infof("successfully connected to peer %s (%s)!", remoteID, addr)
+func (n *TCPNetwork) closed() <-chan struct{} {
+ ch := make(chan struct{})
+ go func() {
+ <-n.listenerClosed()
+ close(ch)
+ }()
+ return ch
+}
- go n.listenForMessages(conn)
- return
+func (n *TCPNetwork) listenerClosed() <-chan struct{} {
+ if n.listener == nil {
+ ch := make(chan struct{})
+ close(ch)
+ return ch
}
+ done := make(chan struct{})
+ go func() {
+ n.listener.Accept() // This will block until closed
+ close(done)
+ }()
+ return done
}
diff --git a/pkg/p2p/network_test.go b/pkg/p2p/network_test.go
index faad9d4..fd5dde0 100644
--- a/pkg/p2p/network_test.go
+++ b/pkg/p2p/network_test.go
@@ -13,7 +13,7 @@ func TestPeerToPeerCommunication(t *testing.T) {
// Create a mock of the first peer (peer-1)
peer1Opts := TCPNetworkOpts{
ListenAddr: ":9001",
- HandshakeFn: func() error { return nil },
+ HandshakeFn: DefaultHandshake,
RetryDelay: time.Second * 2,
Logger: zap.L(),
}
@@ -26,7 +26,7 @@ func TestPeerToPeerCommunication(t *testing.T) {
// Create a mock of the second peer (peer-2)
peer2Opts := TCPNetworkOpts{
ListenAddr: ":9002",
- HandshakeFn: func() error { return nil },
+ HandshakeFn: DefaultHandshake,
RetryDelay: time.Second * 2,
Logger: zap.L(),
OnReceiveFn: func(msg Message) {
diff --git a/pkg/ui/views/play_api.go b/pkg/ui/views/play_api.go
index 6cc0c6e..c098930 100644
--- a/pkg/ui/views/play_api.go
+++ b/pkg/ui/views/play_api.go
@@ -3,6 +3,7 @@ package views
import (
"encoding/json"
"fmt"
+ "net"
"net/http"
"os"
"strconv"
@@ -45,8 +46,8 @@ func (m *PlayModel) handlePlayResponse(msg playResponse) (tea.Model, tea.Cmd) {
logger, _ := logger.GetLogger()
callbackCompleted := make(chan bool)
- m.network = multiplayer.NewGameNetwork(fmt.Sprintf("%s-1", m.playName), fmt.Sprintf("%s:%d", msg.Ok.IP, msg.Ok.Port), func() error {
- close(callbackCompleted)
+ m.network = multiplayer.NewGameNetwork(fmt.Sprintf("%s-1", m.playName), fmt.Sprintf("%s:%d", msg.Ok.IP, msg.Ok.Port), func(net.Conn) error {
+ callbackCompleted <- true
return nil
}, logger)