diff options
author | Santo Cariotti <santo@dcariotti.me> | 2025-04-08 14:37:33 +0200 |
---|---|---|
committer | Santo Cariotti <santo@dcariotti.me> | 2025-04-08 14:39:13 +0200 |
commit | 1f0d9ec8452f15c27cd33c4e3874454c35993743 (patch) | |
tree | c453a31ae5eb823aaf48868eea9fc4daf65f108b /network | |
parent | c5b10e28b358308d8349b940af09f64368172f2e (diff) |
Use internal/pkg structure
Diffstat (limited to 'network')
-rw-r--r-- | network/ip.go | 33 | ||||
-rw-r--r-- | network/network.go | 213 | ||||
-rw-r--r-- | network/network_test.go | 52 | ||||
-rw-r--r-- | network/session.go | 23 |
4 files changed, 0 insertions, 321 deletions
diff --git a/network/ip.go b/network/ip.go deleted file mode 100644 index 0c6451e..0000000 --- a/network/ip.go +++ /dev/null @@ -1,33 +0,0 @@ -package network - -import ( - "fmt" - "log/slog" - "math/rand" - "net" -) - -func GetOutboundIP() net.IP { - conn, err := net.Dial("udp", "8.8.8.8:80") - if err != nil { - slog.Error("err", err) - } - defer conn.Close() - - localAddr := conn.LocalAddr().(*net.UDPAddr) - - return localAddr.IP -} - -func GetRandomAvailablePort() (int, error) { - for i := 0; i < 100; i += 1 { - port := rand.Intn(65535-1024) + 1024 - addr := fmt.Sprintf(":%d", port) - ln, err := net.Listen("tcp", addr) - if err == nil { - defer ln.Close() - return port, nil - } - } - return 0, fmt.Errorf("failed to find an available port after multiple attempts") -} diff --git a/network/network.go b/network/network.go deleted file mode 100644 index 8283993..0000000 --- a/network/network.go +++ /dev/null @@ -1,213 +0,0 @@ -package network - -import ( - "bufio" - "encoding/json" - "fmt" - "net" - "sync" - "time" - - "go.uber.org/zap" -) - -var logger *zap.Logger - -// PeerInfo represents a peer's ID and IP. -type PeerInfo struct { - ID string `json:"id"` - IP string `json:"ip"` - Port int `json:"port"` -} - -// Message represents a structured message. -type Message struct { - Type string `json:"type"` - Payload []byte `json:"payload"` - Source PeerInfo `json:"source"` - Target PeerInfo `json:"target"` - Timestamp int64 `json:"timestamp"` -} - -type NetworkCallback func(msg Message) - -// TCPNetwork represents a full-duplex TCP peer. -type TCPNetwork struct { - localPeer PeerInfo - connections map[string]net.Conn - listener net.Listener - callbacks map[string]NetworkCallback - callbacksMu sync.RWMutex - isConnected bool - retryDelay time.Duration - sync.Mutex -} - -// initializes a TCP peer -func NewTCPNetwork(localID, localIP string, localPort int) *TCPNetwork { - n := &TCPNetwork{ - localPeer: PeerInfo{ID: localID, IP: localIP, Port: localPort}, - connections: make(map[string]net.Conn), - callbacks: make(map[string]NetworkCallback), - isConnected: false, - retryDelay: 2 * time.Second, - } - - go n.startServer() - - logger, _ = zap.NewProduction() - - return n -} - -// Add a new peer connection to the local peer -func (n *TCPNetwork) AddPeer(remoteID string, remoteIP string, remotePort int) { - go n.retryConnect(remoteID, remoteIP, remotePort) -} - -// startServer starts a TCP server to accept connections. -func (n *TCPNetwork) startServer() { - address := fmt.Sprintf("%s:%d", n.localPeer.IP, n.localPeer.Port) - listener, err := net.Listen("tcp", address) - if err != nil { - logger.Sugar().Errorf("failed to start server: %v", err) - } - n.listener = listener - logger.Sugar().Infof("server started on %s\n", address) - - for { - conn, err := listener.Accept() - if err != nil { - logger.Sugar().Errorf("failed to accept connection: %v\n", err) - continue - } - - remoteAddr := conn.RemoteAddr().String() - n.Lock() - n.connections[remoteAddr] = conn - n.Unlock() - n.isConnected = true - n.retryDelay = 2 * time.Second - - logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr) - go n.listenForMessages(conn) - } -} - -// retryConnect attempts to connect to a remote peer. -func (n *TCPNetwork) retryConnect(remoteID, remoteIP string, remotePort int) { - for { - n.Lock() - _, exists := n.connections[remoteID] - n.Unlock() - - if exists { - time.Sleep(5 * time.Second) - continue - } - - address := fmt.Sprintf("%s:%d", remoteIP, remotePort) - conn, err := net.Dial("tcp", address) - - if err != nil { - logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.retryDelay) - time.Sleep(n.retryDelay) - if n.retryDelay < 30*time.Second { - n.retryDelay *= 2 - } - continue - } - - n.Lock() - n.connections[remoteID] = conn - n.Unlock() - logger.Sugar().Infof("successfully connected to peer %s!", remoteID) - - go n.listenForMessages(conn) - } -} - -// Send sends a message to a specified remote peer. -func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error { - n.Lock() - conn, exists := n.connections[remoteID] - n.Unlock() - - if !exists { - return fmt.Errorf("not connected to peer %s", remoteID) - } - - msg := Message{ - Type: messageType, - Payload: payload, - Source: n.localPeer, - Target: PeerInfo{ID: remoteID}, - Timestamp: time.Now().Unix(), - } - - data, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("failed to marshal message: %v", err) - } - - _, err = conn.Write(append(data, '\n')) - if err != nil { - logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err) - n.Lock() - delete(n.connections, remoteID) - n.Unlock() - go n.retryConnect(remoteID, "", 0) - return fmt.Errorf("failed to send message: %v", err) - } - - return nil -} - -// RegisterHandler registers a callback for a message type. -func (n *TCPNetwork) RegisterHandler(messageType string, callback NetworkCallback) { - n.callbacksMu.Lock() - n.callbacks[messageType] = callback - n.callbacksMu.Unlock() -} - -// listenForMessages listens for incoming messages. -func (n *TCPNetwork) listenForMessages(conn net.Conn) { - reader := bufio.NewReader(conn) - - for { - data, err := reader.ReadBytes('\n') - if err != nil { - logger.Debug("connection lost. Reconnecting...") - n.Lock() - for id, c := range n.connections { - if c == conn { - delete(n.connections, id) - go n.retryConnect(id, "", 0) - break - } - } - n.Unlock() - return - } - - var message Message - if err := json.Unmarshal(data, &message); err != nil { - logger.Sugar().Errorf("failed to unmarshal message: %v\n", err) - continue - } - - n.callbacksMu.RLock() - callback, exists := n.callbacks[message.Type] - n.callbacksMu.RUnlock() - - if exists { - go callback(message) - } - } -} - -func (n *TCPNetwork) IsConnected() bool { - n.Lock() - defer n.Unlock() - return n.isConnected -} diff --git a/network/network_test.go b/network/network_test.go deleted file mode 100644 index 9dbc416..0000000 --- a/network/network_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package network - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -// TestPeerToPeerCommunication tests if two peers can communicate. -func TestPeerToPeerCommunication(t *testing.T) { - // Create a mock of the first peer (peer-1) - peer1IP := "127.0.0.1" - peer1Port := 9001 - peer1 := NewTCPNetwork("peer-1", peer1IP, peer1Port) - - // Create a mock of the second peer (peer-2) - peer2IP := "127.0.0.1" - peer2Port := 9002 - peer2 := NewTCPNetwork("peer-2", peer2IP, peer2Port) - - // Register a message handler on peer-2 to receive the message from peer-1 - peer2.RegisterHandler("chat", func(msg Message) { - assert.Equal(t, "peer-1", msg.Source.ID) - assert.Equal(t, "Hey from peer-1!", string(msg.Payload)) - }) - - // Start the first peer and add the second peer - go peer1.AddPeer("peer-2", peer2IP, peer2Port) - go peer2.AddPeer("peer-1", peer1IP, peer1Port) - - // Wait for the connections to be established - // You might need a little more time based on network delay and retry logic - time.Sleep(5 * time.Second) - - // Send a message from peer-1 to peer-2 - err := peer1.Send("peer-2", "chat", []byte("Hey from peer-1!")) - assert.NoError(t, err) - - // Allow some time for the message to be received and handled - time.Sleep(2 * time.Second) -} - -// TestSendFailure tests if sending a message fails when no connection exists. -func TestSendFailure(t *testing.T) { - peer1 := NewTCPNetwork("peer-1", "127.0.0.1", 9001) - _ = NewTCPNetwork("peer-2", "127.0.0.1", 9002) - - // Attempt to send a message without establishing a connection first - err := peer1.Send("peer-2", "chat", []byte("Message without connection")) - assert.Error(t, err, "Expected error when sending to a non-connected peer") -} diff --git a/network/session.go b/network/session.go deleted file mode 100644 index a4f60aa..0000000 --- a/network/session.go +++ /dev/null @@ -1,23 +0,0 @@ -package network - -import ( - "math/rand" -) - -var adjectives = []string{ - "adamant", "adept", "adventurous", "arcadian", "auspicious", - "awesome", "blossoming", "brave", "charming", "chatty", - "circular", "considerate", "cubic", "curious", "delighted", -} - -var nouns = []string{ - "aardvark", "accordion", "apple", "apricot", "bee", - "brachiosaur", "cactus", "capsicum", "clarinet", "cowbell", - "crab", "cuckoo", "cymbal", "diplodocus", "donkey", -} - -func NewSession() string { - noun := nouns[rand.Intn(len(nouns))] - adjective := adjectives[rand.Intn(len(adjectives))] - return noun + "-" + adjective -} |