summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanto Cariotti <santo@dcariotti.me>2025-04-16 11:46:04 +0200
committerSanto Cariotti <santo@dcariotti.me>2025-04-16 11:46:04 +0200
commitba4afeb4ee19c24b393ec21d374bdd752651c1a6 (patch)
tree2082deaddd348439605a7209fbffd4a355ff7b37
parent76f46e54175253d4b2ba61b9cb8f2525a48c15d8 (diff)
Remove topics on network
-rw-r--r--cmd/ui/main.go2
-rw-r--r--internal/network/network.go262
-rw-r--r--internal/network/network_test.go63
-rw-r--r--pkg/ui/multiplayer/multiplayer.go13
-rw-r--r--pkg/ui/views/game.go18
-rw-r--r--pkg/ui/views/play.go16
6 files changed, 215 insertions, 159 deletions
diff --git a/cmd/ui/main.go b/cmd/ui/main.go
index 0b8d4bb..97f894a 100644
--- a/cmd/ui/main.go
+++ b/cmd/ui/main.go
@@ -3,12 +3,14 @@ package main
import (
"log"
+ "github.com/boozec/rahanna/internal/logger"
"github.com/boozec/rahanna/pkg/ui/views"
tea "github.com/charmbracelet/bubbletea"
)
func main() {
views.ClearScreen()
+ _ = logger.InitLogger("rahanna-ui.log")
p := tea.NewProgram(views.NewRahannaModel(), tea.WithAltScreen())
diff --git a/internal/network/network.go b/internal/network/network.go
index 8b6c686..0b4f4aa 100644
--- a/internal/network/network.go
+++ b/internal/network/network.go
@@ -3,133 +3,81 @@ package network
import (
"bufio"
"encoding/json"
+ "errors"
"fmt"
"net"
"sync"
"time"
- "github.com/boozec/rahanna/internal/logger"
"go.uber.org/zap"
)
-// PeerInfo represents a peer's ID and IP.
-type PeerInfo struct {
- ID string `json:"id"`
- IP string `json:"ip"`
- Port int `json:"port"`
+// `Message` represents a structured message on this network.
+type Message struct {
+ Timestamp int64 `json:"timestamp"`
+ Source string `json:"source"`
+ Payload []byte `json:"payload"`
}
-// Message represents a structured message.
-type Message struct {
- Type string `json:"type"`
- Payload []byte `json:"payload"`
- Source PeerInfo `json:"source"`
- Target PeerInfo `json:"target"`
- Timestamp int64 `json:"timestamp"`
+// A network ID is represented by a string
+type NetworkID string
+
+// This type represents the function that is called every time a new message
+// arrives to the server.
+type NetworkMessageReceiveFunc func(msg Message)
+
+// This type represent the callback function invokes every new handshake between
+// two peers
+type NetworkHandshakeFunc func() error
+
+func DefaultHandshake() error {
+ return nil
}
-type NetworkCallback func(msg Message)
+// Network options to define on new `TCPNetwork`
+type TCPNetworkOpts struct {
+ ListenAddr string
+ RetryDelay time.Duration
+ HandshakeFn NetworkHandshakeFunc
+ OnReceiveFn NetworkMessageReceiveFunc
+ Logger *zap.Logger
+}
// TCPNetwork represents a full-duplex TCP peer.
type TCPNetwork struct {
- localPeer PeerInfo
- connections map[string]net.Conn
- listener net.Listener
- callbacks map[string]NetworkCallback
- callbacksMu sync.RWMutex
- isConnected bool
- retryDelay time.Duration
- logger *zap.Logger
sync.Mutex
+ TCPNetworkOpts
+
+ id NetworkID
+ listener net.Listener
+ connections map[NetworkID]net.Conn
}
-// initializes a TCP peer
-func NewTCPNetwork(localID, localIP string, localPort int, onReceive func()) *TCPNetwork {
+// Initiliaze a new TCP network
+func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork {
n := &TCPNetwork{
- localPeer: PeerInfo{ID: localID, IP: localIP, Port: localPort},
- connections: make(map[string]net.Conn),
- callbacks: make(map[string]NetworkCallback),
- isConnected: false,
- retryDelay: 2 * time.Second,
- logger: logger.InitLogger("rahanna.log"),
+ TCPNetworkOpts: opts,
+ id: localID,
+ connections: make(map[NetworkID]net.Conn),
}
- go n.startServer(onReceive)
+ go n.startServer()
return n
}
-// Add a new peer connection to the local peer
-func (n *TCPNetwork) AddPeer(remoteID string, remoteIP string, remotePort int) {
- go n.retryConnect(remoteID, remoteIP, remotePort)
-}
-
-// startServer starts a TCP server to accept connections.
-func (n *TCPNetwork) startServer(callback func()) {
- address := fmt.Sprintf("%s:%d", n.localPeer.IP, n.localPeer.Port)
- listener, err := net.Listen("tcp", address)
- if err != nil {
- n.logger.Sugar().Errorf("failed to start server: %v", err)
- return
- }
- n.listener = listener
- n.logger.Sugar().Infof("server started on %s\n", address)
-
- for {
- conn, err := listener.Accept()
- if err != nil {
- n.logger.Sugar().Errorf("failed to accept connection: %v\n", err)
- continue
- }
-
- remoteAddr := conn.RemoteAddr().String()
- n.Lock()
- n.connections[remoteAddr] = conn
- callback()
- n.Unlock()
- n.isConnected = true
- n.retryDelay = 2 * time.Second
-
- n.logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr)
- go n.listenForMessages(conn)
- }
+// Close listener' connection
+func (n *TCPNetwork) Close() error {
+ return n.listener.Close()
}
-// retryConnect attempts to connect to a remote peer.
-func (n *TCPNetwork) retryConnect(remoteID, remoteIP string, remotePort int) {
- for {
- n.Lock()
- _, exists := n.connections[remoteID]
- n.Unlock()
-
- if exists {
- time.Sleep(5 * time.Second)
- continue
- }
-
- address := fmt.Sprintf("%s:%d", remoteIP, remotePort)
- conn, err := net.Dial("tcp", address)
-
- if err != nil {
- n.logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.retryDelay)
- time.Sleep(n.retryDelay)
- if n.retryDelay < 30*time.Second {
- n.retryDelay *= 2
- }
- continue
- }
-
- n.Lock()
- n.connections[remoteID] = conn
- n.Unlock()
- n.logger.Sugar().Infof("successfully connected to peer %s!", remoteID)
-
- go n.listenForMessages(conn)
- }
+// Add a new peer connection to the local peer
+func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) {
+ go n.retryConnect(remoteID, addr)
}
-// Send sends a message to a specified remote peer.
-func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error {
+// Send methods is used to send a message to a specified remote peer
+func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error {
n.Lock()
conn, exists := n.connections[remoteID]
n.Unlock()
@@ -139,10 +87,8 @@ func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error {
}
msg := Message{
- Type: messageType,
Payload: payload,
- Source: n.localPeer,
- Target: PeerInfo{ID: remoteID},
+ Source: n.listener.Addr().String(),
Timestamp: time.Now().Unix(),
}
@@ -153,11 +99,13 @@ func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error {
_, err = conn.Write(append(data, '\n'))
if err != nil {
- n.logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err)
+ n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err)
n.Lock()
delete(n.connections, remoteID)
n.Unlock()
- go n.retryConnect(remoteID, "", 0)
+
+ go n.retryConnect(remoteID, "")
+
return fmt.Errorf("failed to send message: %v", err)
}
@@ -165,10 +113,56 @@ func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error {
}
// RegisterHandler registers a callback for a message type.
-func (n *TCPNetwork) RegisterHandler(messageType string, callback NetworkCallback) {
- n.callbacksMu.Lock()
- n.callbacks[messageType] = callback
- n.callbacksMu.Unlock()
+func (n *TCPNetwork) RegisterHandler(callback NetworkMessageReceiveFunc) {
+ n.OnReceiveFn = callback
+}
+
+// startServer starts a TCP server to accept connections.
+func (n *TCPNetwork) startServer() error {
+ var err error
+
+ n.listener, err = net.Listen("tcp", n.ListenAddr)
+ if err != nil {
+ n.Logger.Sugar().Errorf("failed to start server: %v", err)
+ return err
+ }
+
+ go n.listenLoop()
+
+ n.Logger.Sugar().Infof("server started on %s\n", n.ListenAddr)
+
+ return nil
+
+}
+
+func (n *TCPNetwork) listenLoop() error {
+ for {
+ conn, err := n.listener.Accept()
+ if errors.Is(err, net.ErrClosed) {
+ n.Logger.Sugar().Errorf("connection is closed in such a way: %v\n", err)
+ return err
+ }
+
+ if err != nil {
+ n.Logger.Sugar().Errorf("failed to accept connection: %v\n", err)
+ continue
+ }
+
+ remoteAddr := conn.RemoteAddr().String()
+ n.Lock()
+ n.connections[NetworkID(remoteAddr)] = conn
+ if err := n.HandshakeFn(); err != nil {
+ n.Logger.Sugar().Errorf("error on handshaking: %v\n", err)
+ return err
+ }
+ n.Unlock()
+ n.RetryDelay = 2 * time.Second
+
+ n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr)
+
+ // Read loop
+ go n.listenForMessages(conn)
+ }
}
// listenForMessages listens for incoming messages.
@@ -178,12 +172,14 @@ func (n *TCPNetwork) listenForMessages(conn net.Conn) {
for {
data, err := reader.ReadBytes('\n')
if err != nil {
- n.logger.Debug("connection lost. Reconnecting...")
+ n.Logger.Debug("connection lost. Reconnecting...")
n.Lock()
+
+ // FIXME: a better way to re-establish the connection between peer
for id, c := range n.connections {
if c == conn {
delete(n.connections, id)
- go n.retryConnect(id, "", 0)
+ go n.retryConnect(id, "")
break
}
}
@@ -193,22 +189,48 @@ func (n *TCPNetwork) listenForMessages(conn net.Conn) {
var message Message
if err := json.Unmarshal(data, &message); err != nil {
- n.logger.Sugar().Errorf("failed to unmarshal message: %v\n", err)
+ n.Logger.Sugar().Errorf("failed to unmarshal message: %v\n", err)
continue
}
- n.callbacksMu.RLock()
- callback, exists := n.callbacks[message.Type]
- n.callbacksMu.RUnlock()
+ n.OnReceiveFn(message)
+ }
+}
+
+// retryConnect attempts to connect to a remote peer.
+func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) {
+ for {
+ n.Lock()
+ _, exists := n.connections[remoteID]
+ n.Unlock()
if exists {
- go callback(message)
+ time.Sleep(5 * time.Second)
+ continue
}
- }
-}
-func (n *TCPNetwork) IsConnected() bool {
- n.Lock()
- defer n.Unlock()
- return n.isConnected
+ conn, err := net.Dial("tcp", addr)
+
+ if err != nil {
+ n.Logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.RetryDelay)
+ time.Sleep(n.RetryDelay)
+ if n.RetryDelay < 30*time.Second {
+ n.RetryDelay *= 2
+ } else {
+ n.Lock()
+ delete(n.connections, remoteID)
+ n.Unlock()
+ n.Logger.Sugar().Infof("removed %s connection", remoteID)
+ return
+ }
+ continue
+ }
+
+ n.Lock()
+ n.connections[remoteID] = conn
+ n.Unlock()
+ n.Logger.Sugar().Infof("successfully connected to peer %s!", remoteID)
+
+ go n.listenForMessages(conn)
+ }
}
diff --git a/internal/network/network_test.go b/internal/network/network_test.go
index 4f3211c..1bc42fb 100644
--- a/internal/network/network_test.go
+++ b/internal/network/network_test.go
@@ -5,36 +5,49 @@ import (
"time"
"github.com/stretchr/testify/assert"
+ "go.uber.org/zap"
)
// TestPeerToPeerCommunication tests if two peers can communicate.
func TestPeerToPeerCommunication(t *testing.T) {
// Create a mock of the first peer (peer-1)
- peer1IP := "127.0.0.1"
- peer1Port := 9001
- peer1 := NewTCPNetwork("peer-1", peer1IP, peer1Port, func() {})
+ peer1Opts := TCPNetworkOpts{
+ ListenAddr: ":9001",
+ HandshakeFn: func() error { return nil },
+ RetryDelay: time.Second * 2,
+ Logger: zap.L(),
+ }
+ peer1 := NewTCPNetwork("peer-1", peer1Opts)
- // Create a mock of the second peer (peer-2)
- peer2IP := "127.0.0.1"
- peer2Port := 9002
- peer2 := NewTCPNetwork("peer-2", peer2IP, peer2Port, func() {})
-
- // Register a message handler on peer-2 to receive the message from peer-1
- peer2.RegisterHandler("chat", func(msg Message) {
- assert.Equal(t, "peer-1", msg.Source.ID)
- assert.Equal(t, "Hey from peer-1!", string(msg.Payload))
+ peer1.RegisterHandler(func(msg Message) {
+ assert.Equal(t, "Hey from peer-2!", string(msg.Payload))
})
+ // Create a mock of the second peer (peer-2)
+ peer2Opts := TCPNetworkOpts{
+ ListenAddr: ":9002",
+ HandshakeFn: func() error { return nil },
+ RetryDelay: time.Second * 2,
+ Logger: zap.L(),
+ OnReceiveFn: func(msg Message) {
+ assert.Equal(t, "Hey from peer-1!", string(msg.Payload))
+ },
+ }
+ peer2 := NewTCPNetwork("peer-2", peer2Opts)
+
// Start the first peer and add the second peer
- go peer1.AddPeer("peer-2", peer2IP, peer2Port)
- go peer2.AddPeer("peer-1", peer1IP, peer1Port)
+ go peer1.AddPeer("peer-2", peer2.ListenAddr)
+ go peer2.AddPeer("peer-1", peer1.ListenAddr)
// Wait for the connections to be established
// You might need a little more time based on network delay and retry logic
time.Sleep(5 * time.Second)
// Send a message from peer-1 to peer-2
- err := peer1.Send("peer-2", "chat", []byte("Hey from peer-1!"))
+ err := peer1.Send("peer-2", []byte("Hey from peer-1!"))
+ assert.NoError(t, err)
+
+ err = peer2.Send("peer-1", []byte("Hey from peer-2!"))
assert.NoError(t, err)
// Allow some time for the message to be received and handled
@@ -43,10 +56,24 @@ func TestPeerToPeerCommunication(t *testing.T) {
// TestSendFailure tests if sending a message fails when no connection exists.
func TestSendFailure(t *testing.T) {
- peer1 := NewTCPNetwork("peer-1", "127.0.0.1", 9001, func() {})
- _ = NewTCPNetwork("peer-2", "127.0.0.1", 9002, func() {})
+ peer1Opts := TCPNetworkOpts{
+ ListenAddr: ":9001",
+ HandshakeFn: DefaultHandshake,
+ RetryDelay: time.Second * 2,
+ Logger: zap.L(),
+ }
+ peer1 := NewTCPNetwork("peer-1", peer1Opts)
+
+ // Create a mock of the second peer (peer-2)
+ peer2Opts := TCPNetworkOpts{
+ ListenAddr: ":9002",
+ HandshakeFn: DefaultHandshake,
+ RetryDelay: time.Second * 2,
+ Logger: zap.L(),
+ }
+ _ = NewTCPNetwork("peer-2", peer2Opts)
// Attempt to send a message without establishing a connection first
- err := peer1.Send("peer-2", "chat", []byte("Message without connection"))
+ err := peer1.Send("peer-2", []byte("Message without connection"))
assert.Error(t, err, "Expected error when sending to a non-connected peer")
}
diff --git a/pkg/ui/multiplayer/multiplayer.go b/pkg/ui/multiplayer/multiplayer.go
index 436388f..1680035 100644
--- a/pkg/ui/multiplayer/multiplayer.go
+++ b/pkg/ui/multiplayer/multiplayer.go
@@ -1,7 +1,10 @@
package multiplayer
import (
+ "time"
+
"github.com/boozec/rahanna/internal/network"
+ "go.uber.org/zap"
)
type GameNetwork struct {
@@ -9,8 +12,14 @@ type GameNetwork struct {
Peer string
}
-func NewGameNetwork(localID, localIP string, localPort int, callback func()) *GameNetwork {
- server := network.NewTCPNetwork(localID, localIP, localPort, callback)
+func NewGameNetwork(localID string, address string, onHandshake network.NetworkHandshakeFunc, logger *zap.Logger) *GameNetwork {
+ opts := network.TCPNetworkOpts{
+ ListenAddr: address,
+ HandshakeFn: onHandshake,
+ RetryDelay: time.Second * 2,
+ Logger: logger,
+ }
+ server := network.NewTCPNetwork(network.NetworkID(localID), opts)
peer := ""
return &GameNetwork{
Server: server,
diff --git a/pkg/ui/views/game.go b/pkg/ui/views/game.go
index 52d8459..5a0972c 100644
--- a/pkg/ui/views/game.go
+++ b/pkg/ui/views/game.go
@@ -4,8 +4,6 @@ import (
"encoding/json"
"fmt"
"os"
- "strconv"
- "strings"
"github.com/boozec/rahanna/internal/api/database"
"github.com/boozec/rahanna/pkg/ui/multiplayer"
@@ -197,21 +195,13 @@ func (m *GameModel) getGame() tea.Cmd {
// Establish peer connection
if m.peer == "peer-1" {
if game.IP2 != "" {
- ipParts := strings.Split(game.IP2, ":")
- if len(ipParts) == 2 {
- remoteIP := ipParts[0]
- remotePortInt, _ := strconv.Atoi(ipParts[1])
- go m.network.Server.AddPeer("peer-2", remoteIP, remotePortInt)
- }
+ remote := game.IP2
+ go m.network.Server.AddPeer("peer-2", remote)
}
} else {
if game.IP1 != "" {
- ipParts := strings.Split(game.IP1, ":")
- if len(ipParts) == 2 {
- remoteIP := ipParts[0]
- remotePortInt, _ := strconv.Atoi(ipParts[1])
- go m.network.Server.AddPeer("peer-1", remoteIP, remotePortInt)
- }
+ remote := game.IP1
+ go m.network.Server.AddPeer("peer-1", remote)
}
}
diff --git a/pkg/ui/views/play.go b/pkg/ui/views/play.go
index 4801556..cf1f6ca 100644
--- a/pkg/ui/views/play.go
+++ b/pkg/ui/views/play.go
@@ -9,6 +9,7 @@ import (
"strings"
"github.com/boozec/rahanna/internal/api/database"
+ "github.com/boozec/rahanna/internal/logger"
"github.com/boozec/rahanna/internal/network"
"github.com/boozec/rahanna/pkg/ui/multiplayer"
"github.com/charmbracelet/bubbles/key"
@@ -267,10 +268,11 @@ func (m *PlayModel) handlePlayResponse(msg playResponse) (tea.Model, tea.Cmd) {
} else {
m.playName = msg.Ok.Name
m.currentGameId = msg.Ok.GameID
-
- m.network = multiplayer.NewGameNetwork("peer-1", msg.Ok.IP, msg.Ok.Port, func() {
- close(start)
- })
+ logger, _ := logger.GetLogger()
+ m.network = multiplayer.NewGameNetwork("peer-1", fmt.Sprintf("%s:%d", msg.Ok.IP, msg.Ok.Port), func() error {
+ start <- 1
+ return nil
+ }, logger)
}
return m, nil
@@ -284,7 +286,11 @@ func (m *PlayModel) handleGameResponse(msg database.Game) (tea.Model, tea.Cmd) {
if len(ip) == 2 {
localIP := ip[0]
localPort, _ := strconv.ParseInt(ip[1], 10, 32)
- network := multiplayer.NewGameNetwork("peer-2", localIP, int(localPort), func() {})
+
+ logger, _ := logger.GetLogger()
+ network := multiplayer.NewGameNetwork("peer-2", fmt.Sprintf("%s:%d", localIP, localPort), func() error {
+ return nil
+ }, logger)
return m, SwitchModelCmd(NewGameModel(m.width, m.height+1, "peer-2", m.game.ID, network))
}