diff options
author | Santo Cariotti <santo@dcariotti.me> | 2025-04-16 11:46:04 +0200 |
---|---|---|
committer | Santo Cariotti <santo@dcariotti.me> | 2025-04-16 11:46:04 +0200 |
commit | ba4afeb4ee19c24b393ec21d374bdd752651c1a6 (patch) | |
tree | 2082deaddd348439605a7209fbffd4a355ff7b37 /internal | |
parent | 76f46e54175253d4b2ba61b9cb8f2525a48c15d8 (diff) |
Remove topics on network
Diffstat (limited to 'internal')
-rw-r--r-- | internal/network/network.go | 262 | ||||
-rw-r--r-- | internal/network/network_test.go | 63 |
2 files changed, 187 insertions, 138 deletions
diff --git a/internal/network/network.go b/internal/network/network.go index 8b6c686..0b4f4aa 100644 --- a/internal/network/network.go +++ b/internal/network/network.go @@ -3,133 +3,81 @@ package network import ( "bufio" "encoding/json" + "errors" "fmt" "net" "sync" "time" - "github.com/boozec/rahanna/internal/logger" "go.uber.org/zap" ) -// PeerInfo represents a peer's ID and IP. -type PeerInfo struct { - ID string `json:"id"` - IP string `json:"ip"` - Port int `json:"port"` +// `Message` represents a structured message on this network. +type Message struct { + Timestamp int64 `json:"timestamp"` + Source string `json:"source"` + Payload []byte `json:"payload"` } -// Message represents a structured message. -type Message struct { - Type string `json:"type"` - Payload []byte `json:"payload"` - Source PeerInfo `json:"source"` - Target PeerInfo `json:"target"` - Timestamp int64 `json:"timestamp"` +// A network ID is represented by a string +type NetworkID string + +// This type represents the function that is called every time a new message +// arrives to the server. +type NetworkMessageReceiveFunc func(msg Message) + +// This type represent the callback function invokes every new handshake between +// two peers +type NetworkHandshakeFunc func() error + +func DefaultHandshake() error { + return nil } -type NetworkCallback func(msg Message) +// Network options to define on new `TCPNetwork` +type TCPNetworkOpts struct { + ListenAddr string + RetryDelay time.Duration + HandshakeFn NetworkHandshakeFunc + OnReceiveFn NetworkMessageReceiveFunc + Logger *zap.Logger +} // TCPNetwork represents a full-duplex TCP peer. type TCPNetwork struct { - localPeer PeerInfo - connections map[string]net.Conn - listener net.Listener - callbacks map[string]NetworkCallback - callbacksMu sync.RWMutex - isConnected bool - retryDelay time.Duration - logger *zap.Logger sync.Mutex + TCPNetworkOpts + + id NetworkID + listener net.Listener + connections map[NetworkID]net.Conn } -// initializes a TCP peer -func NewTCPNetwork(localID, localIP string, localPort int, onReceive func()) *TCPNetwork { +// Initiliaze a new TCP network +func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork { n := &TCPNetwork{ - localPeer: PeerInfo{ID: localID, IP: localIP, Port: localPort}, - connections: make(map[string]net.Conn), - callbacks: make(map[string]NetworkCallback), - isConnected: false, - retryDelay: 2 * time.Second, - logger: logger.InitLogger("rahanna.log"), + TCPNetworkOpts: opts, + id: localID, + connections: make(map[NetworkID]net.Conn), } - go n.startServer(onReceive) + go n.startServer() return n } -// Add a new peer connection to the local peer -func (n *TCPNetwork) AddPeer(remoteID string, remoteIP string, remotePort int) { - go n.retryConnect(remoteID, remoteIP, remotePort) -} - -// startServer starts a TCP server to accept connections. -func (n *TCPNetwork) startServer(callback func()) { - address := fmt.Sprintf("%s:%d", n.localPeer.IP, n.localPeer.Port) - listener, err := net.Listen("tcp", address) - if err != nil { - n.logger.Sugar().Errorf("failed to start server: %v", err) - return - } - n.listener = listener - n.logger.Sugar().Infof("server started on %s\n", address) - - for { - conn, err := listener.Accept() - if err != nil { - n.logger.Sugar().Errorf("failed to accept connection: %v\n", err) - continue - } - - remoteAddr := conn.RemoteAddr().String() - n.Lock() - n.connections[remoteAddr] = conn - callback() - n.Unlock() - n.isConnected = true - n.retryDelay = 2 * time.Second - - n.logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr) - go n.listenForMessages(conn) - } +// Close listener' connection +func (n *TCPNetwork) Close() error { + return n.listener.Close() } -// retryConnect attempts to connect to a remote peer. -func (n *TCPNetwork) retryConnect(remoteID, remoteIP string, remotePort int) { - for { - n.Lock() - _, exists := n.connections[remoteID] - n.Unlock() - - if exists { - time.Sleep(5 * time.Second) - continue - } - - address := fmt.Sprintf("%s:%d", remoteIP, remotePort) - conn, err := net.Dial("tcp", address) - - if err != nil { - n.logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.retryDelay) - time.Sleep(n.retryDelay) - if n.retryDelay < 30*time.Second { - n.retryDelay *= 2 - } - continue - } - - n.Lock() - n.connections[remoteID] = conn - n.Unlock() - n.logger.Sugar().Infof("successfully connected to peer %s!", remoteID) - - go n.listenForMessages(conn) - } +// Add a new peer connection to the local peer +func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) { + go n.retryConnect(remoteID, addr) } -// Send sends a message to a specified remote peer. -func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error { +// Send methods is used to send a message to a specified remote peer +func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error { n.Lock() conn, exists := n.connections[remoteID] n.Unlock() @@ -139,10 +87,8 @@ func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error { } msg := Message{ - Type: messageType, Payload: payload, - Source: n.localPeer, - Target: PeerInfo{ID: remoteID}, + Source: n.listener.Addr().String(), Timestamp: time.Now().Unix(), } @@ -153,11 +99,13 @@ func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error { _, err = conn.Write(append(data, '\n')) if err != nil { - n.logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err) + n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err) n.Lock() delete(n.connections, remoteID) n.Unlock() - go n.retryConnect(remoteID, "", 0) + + go n.retryConnect(remoteID, "") + return fmt.Errorf("failed to send message: %v", err) } @@ -165,10 +113,56 @@ func (n *TCPNetwork) Send(remoteID, messageType string, payload []byte) error { } // RegisterHandler registers a callback for a message type. -func (n *TCPNetwork) RegisterHandler(messageType string, callback NetworkCallback) { - n.callbacksMu.Lock() - n.callbacks[messageType] = callback - n.callbacksMu.Unlock() +func (n *TCPNetwork) RegisterHandler(callback NetworkMessageReceiveFunc) { + n.OnReceiveFn = callback +} + +// startServer starts a TCP server to accept connections. +func (n *TCPNetwork) startServer() error { + var err error + + n.listener, err = net.Listen("tcp", n.ListenAddr) + if err != nil { + n.Logger.Sugar().Errorf("failed to start server: %v", err) + return err + } + + go n.listenLoop() + + n.Logger.Sugar().Infof("server started on %s\n", n.ListenAddr) + + return nil + +} + +func (n *TCPNetwork) listenLoop() error { + for { + conn, err := n.listener.Accept() + if errors.Is(err, net.ErrClosed) { + n.Logger.Sugar().Errorf("connection is closed in such a way: %v\n", err) + return err + } + + if err != nil { + n.Logger.Sugar().Errorf("failed to accept connection: %v\n", err) + continue + } + + remoteAddr := conn.RemoteAddr().String() + n.Lock() + n.connections[NetworkID(remoteAddr)] = conn + if err := n.HandshakeFn(); err != nil { + n.Logger.Sugar().Errorf("error on handshaking: %v\n", err) + return err + } + n.Unlock() + n.RetryDelay = 2 * time.Second + + n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr) + + // Read loop + go n.listenForMessages(conn) + } } // listenForMessages listens for incoming messages. @@ -178,12 +172,14 @@ func (n *TCPNetwork) listenForMessages(conn net.Conn) { for { data, err := reader.ReadBytes('\n') if err != nil { - n.logger.Debug("connection lost. Reconnecting...") + n.Logger.Debug("connection lost. Reconnecting...") n.Lock() + + // FIXME: a better way to re-establish the connection between peer for id, c := range n.connections { if c == conn { delete(n.connections, id) - go n.retryConnect(id, "", 0) + go n.retryConnect(id, "") break } } @@ -193,22 +189,48 @@ func (n *TCPNetwork) listenForMessages(conn net.Conn) { var message Message if err := json.Unmarshal(data, &message); err != nil { - n.logger.Sugar().Errorf("failed to unmarshal message: %v\n", err) + n.Logger.Sugar().Errorf("failed to unmarshal message: %v\n", err) continue } - n.callbacksMu.RLock() - callback, exists := n.callbacks[message.Type] - n.callbacksMu.RUnlock() + n.OnReceiveFn(message) + } +} + +// retryConnect attempts to connect to a remote peer. +func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) { + for { + n.Lock() + _, exists := n.connections[remoteID] + n.Unlock() if exists { - go callback(message) + time.Sleep(5 * time.Second) + continue } - } -} -func (n *TCPNetwork) IsConnected() bool { - n.Lock() - defer n.Unlock() - return n.isConnected + conn, err := net.Dial("tcp", addr) + + if err != nil { + n.Logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.RetryDelay) + time.Sleep(n.RetryDelay) + if n.RetryDelay < 30*time.Second { + n.RetryDelay *= 2 + } else { + n.Lock() + delete(n.connections, remoteID) + n.Unlock() + n.Logger.Sugar().Infof("removed %s connection", remoteID) + return + } + continue + } + + n.Lock() + n.connections[remoteID] = conn + n.Unlock() + n.Logger.Sugar().Infof("successfully connected to peer %s!", remoteID) + + go n.listenForMessages(conn) + } } diff --git a/internal/network/network_test.go b/internal/network/network_test.go index 4f3211c..1bc42fb 100644 --- a/internal/network/network_test.go +++ b/internal/network/network_test.go @@ -5,36 +5,49 @@ import ( "time" "github.com/stretchr/testify/assert" + "go.uber.org/zap" ) // TestPeerToPeerCommunication tests if two peers can communicate. func TestPeerToPeerCommunication(t *testing.T) { // Create a mock of the first peer (peer-1) - peer1IP := "127.0.0.1" - peer1Port := 9001 - peer1 := NewTCPNetwork("peer-1", peer1IP, peer1Port, func() {}) + peer1Opts := TCPNetworkOpts{ + ListenAddr: ":9001", + HandshakeFn: func() error { return nil }, + RetryDelay: time.Second * 2, + Logger: zap.L(), + } + peer1 := NewTCPNetwork("peer-1", peer1Opts) - // Create a mock of the second peer (peer-2) - peer2IP := "127.0.0.1" - peer2Port := 9002 - peer2 := NewTCPNetwork("peer-2", peer2IP, peer2Port, func() {}) - - // Register a message handler on peer-2 to receive the message from peer-1 - peer2.RegisterHandler("chat", func(msg Message) { - assert.Equal(t, "peer-1", msg.Source.ID) - assert.Equal(t, "Hey from peer-1!", string(msg.Payload)) + peer1.RegisterHandler(func(msg Message) { + assert.Equal(t, "Hey from peer-2!", string(msg.Payload)) }) + // Create a mock of the second peer (peer-2) + peer2Opts := TCPNetworkOpts{ + ListenAddr: ":9002", + HandshakeFn: func() error { return nil }, + RetryDelay: time.Second * 2, + Logger: zap.L(), + OnReceiveFn: func(msg Message) { + assert.Equal(t, "Hey from peer-1!", string(msg.Payload)) + }, + } + peer2 := NewTCPNetwork("peer-2", peer2Opts) + // Start the first peer and add the second peer - go peer1.AddPeer("peer-2", peer2IP, peer2Port) - go peer2.AddPeer("peer-1", peer1IP, peer1Port) + go peer1.AddPeer("peer-2", peer2.ListenAddr) + go peer2.AddPeer("peer-1", peer1.ListenAddr) // Wait for the connections to be established // You might need a little more time based on network delay and retry logic time.Sleep(5 * time.Second) // Send a message from peer-1 to peer-2 - err := peer1.Send("peer-2", "chat", []byte("Hey from peer-1!")) + err := peer1.Send("peer-2", []byte("Hey from peer-1!")) + assert.NoError(t, err) + + err = peer2.Send("peer-1", []byte("Hey from peer-2!")) assert.NoError(t, err) // Allow some time for the message to be received and handled @@ -43,10 +56,24 @@ func TestPeerToPeerCommunication(t *testing.T) { // TestSendFailure tests if sending a message fails when no connection exists. func TestSendFailure(t *testing.T) { - peer1 := NewTCPNetwork("peer-1", "127.0.0.1", 9001, func() {}) - _ = NewTCPNetwork("peer-2", "127.0.0.1", 9002, func() {}) + peer1Opts := TCPNetworkOpts{ + ListenAddr: ":9001", + HandshakeFn: DefaultHandshake, + RetryDelay: time.Second * 2, + Logger: zap.L(), + } + peer1 := NewTCPNetwork("peer-1", peer1Opts) + + // Create a mock of the second peer (peer-2) + peer2Opts := TCPNetworkOpts{ + ListenAddr: ":9002", + HandshakeFn: DefaultHandshake, + RetryDelay: time.Second * 2, + Logger: zap.L(), + } + _ = NewTCPNetwork("peer-2", peer2Opts) // Attempt to send a message without establishing a connection first - err := peer1.Send("peer-2", "chat", []byte("Message without connection")) + err := peer1.Send("peer-2", []byte("Message without connection")) assert.Error(t, err, "Expected error when sending to a non-connected peer") } |