diff options
author | Santo Cariotti <santo@dcariotti.me> | 2025-03-29 17:21:27 +0100 |
---|---|---|
committer | Santo Cariotti <santo@dcariotti.me> | 2025-03-29 17:21:27 +0100 |
commit | d8295b3f23bd9af7b8947bccb990601aa1739a0b (patch) | |
tree | 8393b80aa821f0973e86e9ff1b7e119acf507304 | |
parent | e356e1b4cb32ac0bb38907feff9ebee5d9ad79fd (diff) |
Add network module to establish a TCP full-duplex connection
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 9 | ||||
-rw-r--r-- | network/network.go | 212 | ||||
-rw-r--r-- | network/network_test.go | 52 |
4 files changed, 274 insertions, 1 deletions
@@ -11,11 +11,13 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/kr/pretty v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect + gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) @@ -10,6 +10,12 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= @@ -44,7 +50,8 @@ google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/network/network.go b/network/network.go new file mode 100644 index 0000000..0b7fab7 --- /dev/null +++ b/network/network.go @@ -0,0 +1,212 @@ +package network + +import ( + "bufio" + "encoding/json" + "fmt" + "go.uber.org/zap" + "net" + "sync" + "time" +) + +var logger *zap.Logger + +// PeerInfo represents a peer's ID and IP. +type PeerInfo struct { + ID string `json:"id"` + IP string `json:"ip"` + Port int `json:"port"` +} + +// Message represents a structured message. +type Message struct { + Type string `json:"type"` + Payload []byte `json:"payload"` + Source PeerInfo `json:"source"` + Target PeerInfo `json:"target"` + Timestamp int64 `json:"timestamp"` +} + +type NetworkCallback func(msg Message) + +// TCPNetwork represents a full-duplex TCP peer. +type TCPNetwork struct { + localPeer PeerInfo + connections map[string]net.Conn + listener net.Listener + callbacks map[string]NetworkCallback + callbacksMu sync.RWMutex + isConnected bool + retryDelay time.Duration + sync.Mutex +} + +// initializes a TCP peer +func NewTCPNetwork(localID, localIP string, localPort int) *TCPNetwork { + n := &TCPNetwork{ + localPeer: PeerInfo{ID: localID, IP: localIP, Port: localPort}, + connections: make(map[string]net.Conn), + callbacks: make(map[string]NetworkCallback), + isConnected: false, + retryDelay: 2 * time.Second, + } + + go n.startServer() + + logger, _ = zap.NewProduction() + + return n +} + +// Add a new peer connection to the local peer +func (n *TCPNetwork) AddPeer(remoteID string, remoteIP string, remotePort int) { + go n.retryConnect(remoteID, remoteIP, remotePort) +} + +// startServer starts a TCP server to accept connections. +func (n *TCPNetwork) startServer() { + address := fmt.Sprintf("%s:%d", n.localPeer.IP, n.localPeer.Port) + listener, err := net.Listen("tcp", address) + if err != nil { + logger.Sugar().Errorf("failed to start server: %v", err) + } + n.listener = listener + logger.Sugar().Infof("server started on %s\n", address) + + for { + conn, err := listener.Accept() + if err != nil { + logger.Sugar().Errorf("failed to accept connection: %v\n", err) + continue + } + + remoteAddr := conn.RemoteAddr().String() + n.Lock() + n.connections[remoteAddr] = conn + n.Unlock() + n.isConnected = true + n.retryDelay = 2 * time.Second + + logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr) + go n.listenForMessages(conn) + } +} + +// retryConnect attempts to connect to a remote peer. +func (n *TCPNetwork) retryConnect(remoteID, remoteIP string, remotePort int) { + for { + n.Lock() + _, exists := n.connections[remoteID] + n.Unlock() + + if exists { + time.Sleep(5 * time.Second) + continue + } + + address := fmt.Sprintf("%s:%d", remoteIP, remotePort) + conn, err := net.Dial("tcp", address) + + if err != nil { + logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.retryDelay) + time.Sleep(n.retryDelay) + if n.retryDelay < 30*time.Second { + n.retryDelay *= 2 + } + continue + } + + n.Lock() + n.connections[remoteID] = conn + n.Unlock() + logger.Sugar().Infof("successfully connected to peer %s!", remoteID) + + go n.listenForMessages(conn) + } +} + +// Send sends a message to a specified remote peer. +func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error { + n.Lock() + conn, exists := n.connections[remoteID] + n.Unlock() + + if !exists { + return fmt.Errorf("not connected to peer %s", remoteID) + } + + msg := Message{ + Type: messageType, + Payload: payload, + Source: n.localPeer, + Target: PeerInfo{ID: remoteID}, + Timestamp: time.Now().Unix(), + } + + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %v", err) + } + + _, err = conn.Write(append(data, '\n')) + if err != nil { + logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err) + n.Lock() + delete(n.connections, remoteID) + n.Unlock() + go n.retryConnect(remoteID, "", 0) + return fmt.Errorf("failed to send message: %v", err) + } + + return nil +} + +// RegisterHandler registers a callback for a message type. +func (n *TCPNetwork) RegisterHandler(messageType string, callback NetworkCallback) { + n.callbacksMu.Lock() + n.callbacks[messageType] = callback + n.callbacksMu.Unlock() +} + +// listenForMessages listens for incoming messages. +func (n *TCPNetwork) listenForMessages(conn net.Conn) { + reader := bufio.NewReader(conn) + + for { + data, err := reader.ReadBytes('\n') + if err != nil { + logger.Debug("connection lost. Reconnecting...") + n.Lock() + for id, c := range n.connections { + if c == conn { + delete(n.connections, id) + go n.retryConnect(id, "", 0) + break + } + } + n.Unlock() + return + } + + var message Message + if err := json.Unmarshal(data, &message); err != nil { + logger.Sugar().Errorf("failed to unmarshal message: %v\n", err) + continue + } + + n.callbacksMu.RLock() + callback, exists := n.callbacks[message.Type] + n.callbacksMu.RUnlock() + + if exists { + go callback(message) + } + } +} + +func (n *TCPNetwork) IsConnected() bool { + n.Lock() + defer n.Unlock() + return n.isConnected +} diff --git a/network/network_test.go b/network/network_test.go new file mode 100644 index 0000000..9dbc416 --- /dev/null +++ b/network/network_test.go @@ -0,0 +1,52 @@ +package network + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// TestPeerToPeerCommunication tests if two peers can communicate. +func TestPeerToPeerCommunication(t *testing.T) { + // Create a mock of the first peer (peer-1) + peer1IP := "127.0.0.1" + peer1Port := 9001 + peer1 := NewTCPNetwork("peer-1", peer1IP, peer1Port) + + // Create a mock of the second peer (peer-2) + peer2IP := "127.0.0.1" + peer2Port := 9002 + peer2 := NewTCPNetwork("peer-2", peer2IP, peer2Port) + + // Register a message handler on peer-2 to receive the message from peer-1 + peer2.RegisterHandler("chat", func(msg Message) { + assert.Equal(t, "peer-1", msg.Source.ID) + assert.Equal(t, "Hey from peer-1!", string(msg.Payload)) + }) + + // Start the first peer and add the second peer + go peer1.AddPeer("peer-2", peer2IP, peer2Port) + go peer2.AddPeer("peer-1", peer1IP, peer1Port) + + // Wait for the connections to be established + // You might need a little more time based on network delay and retry logic + time.Sleep(5 * time.Second) + + // Send a message from peer-1 to peer-2 + err := peer1.Send("peer-2", "chat", []byte("Hey from peer-1!")) + assert.NoError(t, err) + + // Allow some time for the message to be received and handled + time.Sleep(2 * time.Second) +} + +// TestSendFailure tests if sending a message fails when no connection exists. +func TestSendFailure(t *testing.T) { + peer1 := NewTCPNetwork("peer-1", "127.0.0.1", 9001) + _ = NewTCPNetwork("peer-2", "127.0.0.1", 9002) + + // Attempt to send a message without establishing a connection first + err := peer1.Send("peer-2", "chat", []byte("Message without connection")) + assert.Error(t, err, "Expected error when sending to a non-connected peer") +} |