summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanto Cariotti <santo@dcariotti.me>2025-03-29 17:21:27 +0100
committerSanto Cariotti <santo@dcariotti.me>2025-03-29 17:21:27 +0100
commitd8295b3f23bd9af7b8947bccb990601aa1739a0b (patch)
tree8393b80aa821f0973e86e9ff1b7e119acf507304
parente356e1b4cb32ac0bb38907feff9ebee5d9ad79fd (diff)
Add network module to establish a TCP full-duplex connection
-rw-r--r--go.mod2
-rw-r--r--go.sum9
-rw-r--r--network/network.go212
-rw-r--r--network/network_test.go52
4 files changed, 274 insertions, 1 deletions
diff --git a/go.mod b/go.mod
index c50007c..5e52de7 100644
--- a/go.mod
+++ b/go.mod
@@ -11,11 +11,13 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/kr/pretty v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
+ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
diff --git a/go.sum b/go.sum
index 4491c73..079caf1 100644
--- a/go.sum
+++ b/go.sum
@@ -10,6 +10,12 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
@@ -44,7 +50,8 @@ google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg=
google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM=
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/network/network.go b/network/network.go
new file mode 100644
index 0000000..0b7fab7
--- /dev/null
+++ b/network/network.go
@@ -0,0 +1,212 @@
+package network
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "go.uber.org/zap"
+ "net"
+ "sync"
+ "time"
+)
+
+var logger *zap.Logger
+
+// PeerInfo represents a peer's ID and IP.
+type PeerInfo struct {
+ ID string `json:"id"`
+ IP string `json:"ip"`
+ Port int `json:"port"`
+}
+
+// Message represents a structured message.
+type Message struct {
+ Type string `json:"type"`
+ Payload []byte `json:"payload"`
+ Source PeerInfo `json:"source"`
+ Target PeerInfo `json:"target"`
+ Timestamp int64 `json:"timestamp"`
+}
+
+type NetworkCallback func(msg Message)
+
+// TCPNetwork represents a full-duplex TCP peer.
+type TCPNetwork struct {
+ localPeer PeerInfo
+ connections map[string]net.Conn
+ listener net.Listener
+ callbacks map[string]NetworkCallback
+ callbacksMu sync.RWMutex
+ isConnected bool
+ retryDelay time.Duration
+ sync.Mutex
+}
+
+// initializes a TCP peer
+func NewTCPNetwork(localID, localIP string, localPort int) *TCPNetwork {
+ n := &TCPNetwork{
+ localPeer: PeerInfo{ID: localID, IP: localIP, Port: localPort},
+ connections: make(map[string]net.Conn),
+ callbacks: make(map[string]NetworkCallback),
+ isConnected: false,
+ retryDelay: 2 * time.Second,
+ }
+
+ go n.startServer()
+
+ logger, _ = zap.NewProduction()
+
+ return n
+}
+
+// Add a new peer connection to the local peer
+func (n *TCPNetwork) AddPeer(remoteID string, remoteIP string, remotePort int) {
+ go n.retryConnect(remoteID, remoteIP, remotePort)
+}
+
+// startServer starts a TCP server to accept connections.
+func (n *TCPNetwork) startServer() {
+ address := fmt.Sprintf("%s:%d", n.localPeer.IP, n.localPeer.Port)
+ listener, err := net.Listen("tcp", address)
+ if err != nil {
+ logger.Sugar().Errorf("failed to start server: %v", err)
+ }
+ n.listener = listener
+ logger.Sugar().Infof("server started on %s\n", address)
+
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ logger.Sugar().Errorf("failed to accept connection: %v\n", err)
+ continue
+ }
+
+ remoteAddr := conn.RemoteAddr().String()
+ n.Lock()
+ n.connections[remoteAddr] = conn
+ n.Unlock()
+ n.isConnected = true
+ n.retryDelay = 2 * time.Second
+
+ logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr)
+ go n.listenForMessages(conn)
+ }
+}
+
+// retryConnect attempts to connect to a remote peer.
+func (n *TCPNetwork) retryConnect(remoteID, remoteIP string, remotePort int) {
+ for {
+ n.Lock()
+ _, exists := n.connections[remoteID]
+ n.Unlock()
+
+ if exists {
+ time.Sleep(5 * time.Second)
+ continue
+ }
+
+ address := fmt.Sprintf("%s:%d", remoteIP, remotePort)
+ conn, err := net.Dial("tcp", address)
+
+ if err != nil {
+ logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.retryDelay)
+ time.Sleep(n.retryDelay)
+ if n.retryDelay < 30*time.Second {
+ n.retryDelay *= 2
+ }
+ continue
+ }
+
+ n.Lock()
+ n.connections[remoteID] = conn
+ n.Unlock()
+ logger.Sugar().Infof("successfully connected to peer %s!", remoteID)
+
+ go n.listenForMessages(conn)
+ }
+}
+
+// Send sends a message to a specified remote peer.
+func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error {
+ n.Lock()
+ conn, exists := n.connections[remoteID]
+ n.Unlock()
+
+ if !exists {
+ return fmt.Errorf("not connected to peer %s", remoteID)
+ }
+
+ msg := Message{
+ Type: messageType,
+ Payload: payload,
+ Source: n.localPeer,
+ Target: PeerInfo{ID: remoteID},
+ Timestamp: time.Now().Unix(),
+ }
+
+ data, err := json.Marshal(msg)
+ if err != nil {
+ return fmt.Errorf("failed to marshal message: %v", err)
+ }
+
+ _, err = conn.Write(append(data, '\n'))
+ if err != nil {
+ logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err)
+ n.Lock()
+ delete(n.connections, remoteID)
+ n.Unlock()
+ go n.retryConnect(remoteID, "", 0)
+ return fmt.Errorf("failed to send message: %v", err)
+ }
+
+ return nil
+}
+
+// RegisterHandler registers a callback for a message type.
+func (n *TCPNetwork) RegisterHandler(messageType string, callback NetworkCallback) {
+ n.callbacksMu.Lock()
+ n.callbacks[messageType] = callback
+ n.callbacksMu.Unlock()
+}
+
+// listenForMessages listens for incoming messages.
+func (n *TCPNetwork) listenForMessages(conn net.Conn) {
+ reader := bufio.NewReader(conn)
+
+ for {
+ data, err := reader.ReadBytes('\n')
+ if err != nil {
+ logger.Debug("connection lost. Reconnecting...")
+ n.Lock()
+ for id, c := range n.connections {
+ if c == conn {
+ delete(n.connections, id)
+ go n.retryConnect(id, "", 0)
+ break
+ }
+ }
+ n.Unlock()
+ return
+ }
+
+ var message Message
+ if err := json.Unmarshal(data, &message); err != nil {
+ logger.Sugar().Errorf("failed to unmarshal message: %v\n", err)
+ continue
+ }
+
+ n.callbacksMu.RLock()
+ callback, exists := n.callbacks[message.Type]
+ n.callbacksMu.RUnlock()
+
+ if exists {
+ go callback(message)
+ }
+ }
+}
+
+func (n *TCPNetwork) IsConnected() bool {
+ n.Lock()
+ defer n.Unlock()
+ return n.isConnected
+}
diff --git a/network/network_test.go b/network/network_test.go
new file mode 100644
index 0000000..9dbc416
--- /dev/null
+++ b/network/network_test.go
@@ -0,0 +1,52 @@
+package network
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestPeerToPeerCommunication tests if two peers can communicate.
+func TestPeerToPeerCommunication(t *testing.T) {
+ // Create a mock of the first peer (peer-1)
+ peer1IP := "127.0.0.1"
+ peer1Port := 9001
+ peer1 := NewTCPNetwork("peer-1", peer1IP, peer1Port)
+
+ // Create a mock of the second peer (peer-2)
+ peer2IP := "127.0.0.1"
+ peer2Port := 9002
+ peer2 := NewTCPNetwork("peer-2", peer2IP, peer2Port)
+
+ // Register a message handler on peer-2 to receive the message from peer-1
+ peer2.RegisterHandler("chat", func(msg Message) {
+ assert.Equal(t, "peer-1", msg.Source.ID)
+ assert.Equal(t, "Hey from peer-1!", string(msg.Payload))
+ })
+
+ // Start the first peer and add the second peer
+ go peer1.AddPeer("peer-2", peer2IP, peer2Port)
+ go peer2.AddPeer("peer-1", peer1IP, peer1Port)
+
+ // Wait for the connections to be established
+ // You might need a little more time based on network delay and retry logic
+ time.Sleep(5 * time.Second)
+
+ // Send a message from peer-1 to peer-2
+ err := peer1.Send("peer-2", "chat", []byte("Hey from peer-1!"))
+ assert.NoError(t, err)
+
+ // Allow some time for the message to be received and handled
+ time.Sleep(2 * time.Second)
+}
+
+// TestSendFailure tests if sending a message fails when no connection exists.
+func TestSendFailure(t *testing.T) {
+ peer1 := NewTCPNetwork("peer-1", "127.0.0.1", 9001)
+ _ = NewTCPNetwork("peer-2", "127.0.0.1", 9002)
+
+ // Attempt to send a message without establishing a connection first
+ err := peer1.Send("peer-2", "chat", []byte("Message without connection"))
+ assert.Error(t, err, "Expected error when sending to a non-connected peer")
+}