summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorSanto Cariotti <santo@dcariotti.me>2025-04-17 22:28:08 +0200
committerSanto Cariotti <santo@dcariotti.me>2025-04-17 22:28:08 +0200
commit544977d54effa7804386aa40f30a87f5e2365efa (patch)
treeaa6896cfa3db9c2e61b219f6995e25da64255f30 /internal
parent8255fbdd7d9d595e71545b7c6909114024527a34 (diff)
Move internal/network package to pkg/p2p
Diffstat (limited to 'internal')
-rw-r--r--internal/api/handlers/handlers.go4
-rw-r--r--internal/network/ip.go37
-rw-r--r--internal/network/network.go238
-rw-r--r--internal/network/network_test.go79
-rw-r--r--internal/network/session.go23
5 files changed, 2 insertions, 379 deletions
diff --git a/internal/api/handlers/handlers.go b/internal/api/handlers/handlers.go
index 6d1b4e3..c8b7425 100644
--- a/internal/api/handlers/handlers.go
+++ b/internal/api/handlers/handlers.go
@@ -9,7 +9,7 @@ import (
"github.com/boozec/rahanna/internal/api/auth"
"github.com/boozec/rahanna/internal/api/database"
"github.com/boozec/rahanna/internal/logger"
- "github.com/boozec/rahanna/internal/network"
+ "github.com/boozec/rahanna/pkg/p2p"
"github.com/gorilla/mux"
"gorm.io/gorm"
)
@@ -116,7 +116,7 @@ func NewPlay(w http.ResponseWriter, r *http.Request) {
db, _ := database.GetDb()
- name := network.NewSession()
+ name := p2p.NewSession()
play := database.Game{
Player1ID: claims.UserID,
Player2ID: nil,
diff --git a/internal/network/ip.go b/internal/network/ip.go
deleted file mode 100644
index ec1e984..0000000
--- a/internal/network/ip.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package network
-
-import (
- "fmt"
- "math/rand"
- "net"
-
- "github.com/boozec/rahanna/internal/logger"
-)
-
-// Connect a DNS to get the address
-func GetOutboundIP() net.IP {
- log, _ := logger.GetLogger()
- conn, err := net.Dial("udp", "8.8.8.8:80")
- if err != nil {
- log.Sugar().Error("err", err)
- }
- defer conn.Close()
-
- localAddr := conn.LocalAddr().(*net.UDPAddr)
-
- return localAddr.IP
-}
-
-// Returns a random available port on the node
-func GetRandomAvailablePort() (int, error) {
- for i := 0; i < 100; i += 1 {
- port := rand.Intn(65535-1024) + 1024
- addr := fmt.Sprintf(":%d", port)
- ln, err := net.Listen("tcp", addr)
- if err == nil {
- defer ln.Close()
- return port, nil
- }
- }
- return 0, fmt.Errorf("failed to find an available port after multiple attempts")
-}
diff --git a/internal/network/network.go b/internal/network/network.go
deleted file mode 100644
index cec024f..0000000
--- a/internal/network/network.go
+++ /dev/null
@@ -1,238 +0,0 @@
-package network
-
-import (
- "bufio"
- "encoding/json"
- "errors"
- "fmt"
- "net"
- "sync"
- "time"
-
- "go.uber.org/zap"
-)
-
-// `Message` represents a structured message on this network.
-type Message struct {
- Timestamp int64 `json:"timestamp"`
- Source string `json:"source"`
- Payload []byte `json:"payload"`
-}
-
-// A network ID is represented by a string
-type NetworkID string
-
-// This type represents the function that is called every time a new message
-// arrives to the server.
-type NetworkMessageReceiveFunc func(msg Message)
-
-// This type represent the callback function invokes every new handshake between
-// two peers
-type NetworkHandshakeFunc func() error
-
-func DefaultHandshake() error {
- return nil
-}
-
-// Network options to define on new `TCPNetwork`
-type TCPNetworkOpts struct {
- ListenAddr string
- RetryDelay time.Duration
- HandshakeFn NetworkHandshakeFunc
- OnReceiveFn NetworkMessageReceiveFunc
- Logger *zap.Logger
-}
-
-// TCPNetwork represents a full-duplex TCP peer.
-type TCPNetwork struct {
- sync.Mutex
- TCPNetworkOpts
-
- id NetworkID
- listener net.Listener
- connections map[NetworkID]net.Conn
-}
-
-// Initiliaze a new TCP network
-func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork {
- n := &TCPNetwork{
- TCPNetworkOpts: opts,
- id: localID,
- connections: make(map[NetworkID]net.Conn),
- }
-
- go n.startServer()
-
- return n
-}
-
-// Close listener' connection
-func (n *TCPNetwork) Close() error {
- return n.listener.Close()
-}
-
-// Add a new peer connection to the local peer
-func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) {
- go n.retryConnect(remoteID, addr)
-}
-
-// Send methods is used to send a message to a specified remote peer
-func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error {
- n.Lock()
- conn, exists := n.connections[remoteID]
- n.Unlock()
-
- if !exists {
- return fmt.Errorf("not connected to peer %s", remoteID)
- }
-
- msg := Message{
- Payload: payload,
- Source: n.listener.Addr().String(),
- Timestamp: time.Now().Unix(),
- }
-
- data, err := json.Marshal(msg)
- if err != nil {
- return fmt.Errorf("failed to marshal message: %v", err)
- }
-
- _, err = conn.Write(append(data, '\n'))
- if err != nil {
- n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err)
- n.Lock()
- delete(n.connections, remoteID)
- n.Unlock()
-
- go n.retryConnect(remoteID, "")
-
- return fmt.Errorf("failed to send message: %v", err)
- }
-
- return nil
-}
-
-// RegisterHandler registers a callback for a message type.
-func (n *TCPNetwork) RegisterHandler(callback NetworkMessageReceiveFunc) {
- n.OnReceiveFn = callback
-}
-
-// startServer starts a TCP server to accept connections.
-func (n *TCPNetwork) startServer() error {
- var err error
-
- n.listener, err = net.Listen("tcp", n.ListenAddr)
- if err != nil {
- n.Logger.Sugar().Errorf("failed to start server: %v", err)
- return err
- }
-
- go n.listenLoop()
-
- n.Logger.Sugar().Infof("server started on %s\n", n.ListenAddr)
-
- return nil
-
-}
-
-func (n *TCPNetwork) listenLoop() error {
- for {
- conn, err := n.listener.Accept()
- if errors.Is(err, net.ErrClosed) {
- n.Logger.Sugar().Errorf("connection is closed in such a way: %v\n", err)
- return err
- }
-
- if err != nil {
- n.Logger.Sugar().Errorf("failed to accept connection: %v\n", err)
- continue
- }
-
- remoteAddr := conn.RemoteAddr().String()
- n.Lock()
- n.connections[NetworkID(remoteAddr)] = conn
- if err := n.HandshakeFn(); err != nil {
- n.Logger.Sugar().Errorf("error on handshaking: %v\n", err)
- return err
- }
- n.Unlock()
- n.RetryDelay = 2 * time.Second
-
- n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr)
-
- // Read loop
- go n.listenForMessages(conn)
- }
-}
-
-// listenForMessages listens for incoming messages.
-func (n *TCPNetwork) listenForMessages(conn net.Conn) {
- reader := bufio.NewReader(conn)
-
- for {
- data, err := reader.ReadBytes('\n')
- if err != nil {
- n.Logger.Debug("connection lost. Reconnecting...")
- n.Lock()
-
- // FIXME: a better way to re-establish the connection between peer
- for id, c := range n.connections {
- if c == conn {
- delete(n.connections, id)
- go n.retryConnect(id, "")
- break
- }
- }
- n.Unlock()
- return
- }
-
- var message Message
- if err := json.Unmarshal(data, &message); err != nil {
- n.Logger.Sugar().Errorf("failed to unmarshal message: %v\n", err)
- continue
- }
-
- n.Logger.Sugar().Infof("Received message from '%s': %s", message.Source, string(message.Payload))
-
- n.OnReceiveFn(message)
- }
-}
-
-// retryConnect attempts to connect to a remote peer.
-func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) {
- for {
- n.Lock()
- _, exists := n.connections[remoteID]
- n.Unlock()
-
- if exists {
- time.Sleep(5 * time.Second)
- continue
- }
-
- conn, err := net.Dial("tcp", addr)
-
- if err != nil {
- n.Logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.RetryDelay)
- time.Sleep(n.RetryDelay)
- if n.RetryDelay < 30*time.Second {
- n.RetryDelay *= 2
- } else {
- n.Lock()
- delete(n.connections, remoteID)
- n.Unlock()
- n.Logger.Sugar().Infof("removed %s connection", remoteID)
- return
- }
- continue
- }
-
- n.Lock()
- n.connections[remoteID] = conn
- n.Unlock()
- n.Logger.Sugar().Infof("successfully connected to peer %s!", remoteID)
-
- go n.listenForMessages(conn)
- }
-}
diff --git a/internal/network/network_test.go b/internal/network/network_test.go
deleted file mode 100644
index 1bc42fb..0000000
--- a/internal/network/network_test.go
+++ /dev/null
@@ -1,79 +0,0 @@
-package network
-
-import (
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
- "go.uber.org/zap"
-)
-
-// TestPeerToPeerCommunication tests if two peers can communicate.
-func TestPeerToPeerCommunication(t *testing.T) {
- // Create a mock of the first peer (peer-1)
- peer1Opts := TCPNetworkOpts{
- ListenAddr: ":9001",
- HandshakeFn: func() error { return nil },
- RetryDelay: time.Second * 2,
- Logger: zap.L(),
- }
- peer1 := NewTCPNetwork("peer-1", peer1Opts)
-
- peer1.RegisterHandler(func(msg Message) {
- assert.Equal(t, "Hey from peer-2!", string(msg.Payload))
- })
-
- // Create a mock of the second peer (peer-2)
- peer2Opts := TCPNetworkOpts{
- ListenAddr: ":9002",
- HandshakeFn: func() error { return nil },
- RetryDelay: time.Second * 2,
- Logger: zap.L(),
- OnReceiveFn: func(msg Message) {
- assert.Equal(t, "Hey from peer-1!", string(msg.Payload))
- },
- }
- peer2 := NewTCPNetwork("peer-2", peer2Opts)
-
- // Start the first peer and add the second peer
- go peer1.AddPeer("peer-2", peer2.ListenAddr)
- go peer2.AddPeer("peer-1", peer1.ListenAddr)
-
- // Wait for the connections to be established
- // You might need a little more time based on network delay and retry logic
- time.Sleep(5 * time.Second)
-
- // Send a message from peer-1 to peer-2
- err := peer1.Send("peer-2", []byte("Hey from peer-1!"))
- assert.NoError(t, err)
-
- err = peer2.Send("peer-1", []byte("Hey from peer-2!"))
- assert.NoError(t, err)
-
- // Allow some time for the message to be received and handled
- time.Sleep(2 * time.Second)
-}
-
-// TestSendFailure tests if sending a message fails when no connection exists.
-func TestSendFailure(t *testing.T) {
- peer1Opts := TCPNetworkOpts{
- ListenAddr: ":9001",
- HandshakeFn: DefaultHandshake,
- RetryDelay: time.Second * 2,
- Logger: zap.L(),
- }
- peer1 := NewTCPNetwork("peer-1", peer1Opts)
-
- // Create a mock of the second peer (peer-2)
- peer2Opts := TCPNetworkOpts{
- ListenAddr: ":9002",
- HandshakeFn: DefaultHandshake,
- RetryDelay: time.Second * 2,
- Logger: zap.L(),
- }
- _ = NewTCPNetwork("peer-2", peer2Opts)
-
- // Attempt to send a message without establishing a connection first
- err := peer1.Send("peer-2", []byte("Message without connection"))
- assert.Error(t, err, "Expected error when sending to a non-connected peer")
-}
diff --git a/internal/network/session.go b/internal/network/session.go
deleted file mode 100644
index a4f60aa..0000000
--- a/internal/network/session.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package network
-
-import (
- "math/rand"
-)
-
-var adjectives = []string{
- "adamant", "adept", "adventurous", "arcadian", "auspicious",
- "awesome", "blossoming", "brave", "charming", "chatty",
- "circular", "considerate", "cubic", "curious", "delighted",
-}
-
-var nouns = []string{
- "aardvark", "accordion", "apple", "apricot", "bee",
- "brachiosaur", "cactus", "capsicum", "clarinet", "cowbell",
- "crab", "cuckoo", "cymbal", "diplodocus", "donkey",
-}
-
-func NewSession() string {
- noun := nouns[rand.Intn(len(nouns))]
- adjective := adjectives[rand.Intn(len(adjectives))]
- return noun + "-" + adjective
-}