diff options
Diffstat (limited to 'internal/network/network.go')
-rw-r--r-- | internal/network/network.go | 238 |
1 files changed, 0 insertions, 238 deletions
diff --git a/internal/network/network.go b/internal/network/network.go deleted file mode 100644 index cec024f..0000000 --- a/internal/network/network.go +++ /dev/null @@ -1,238 +0,0 @@ -package network - -import ( - "bufio" - "encoding/json" - "errors" - "fmt" - "net" - "sync" - "time" - - "go.uber.org/zap" -) - -// `Message` represents a structured message on this network. -type Message struct { - Timestamp int64 `json:"timestamp"` - Source string `json:"source"` - Payload []byte `json:"payload"` -} - -// A network ID is represented by a string -type NetworkID string - -// This type represents the function that is called every time a new message -// arrives to the server. -type NetworkMessageReceiveFunc func(msg Message) - -// This type represent the callback function invokes every new handshake between -// two peers -type NetworkHandshakeFunc func() error - -func DefaultHandshake() error { - return nil -} - -// Network options to define on new `TCPNetwork` -type TCPNetworkOpts struct { - ListenAddr string - RetryDelay time.Duration - HandshakeFn NetworkHandshakeFunc - OnReceiveFn NetworkMessageReceiveFunc - Logger *zap.Logger -} - -// TCPNetwork represents a full-duplex TCP peer. -type TCPNetwork struct { - sync.Mutex - TCPNetworkOpts - - id NetworkID - listener net.Listener - connections map[NetworkID]net.Conn -} - -// Initiliaze a new TCP network -func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork { - n := &TCPNetwork{ - TCPNetworkOpts: opts, - id: localID, - connections: make(map[NetworkID]net.Conn), - } - - go n.startServer() - - return n -} - -// Close listener' connection -func (n *TCPNetwork) Close() error { - return n.listener.Close() -} - -// Add a new peer connection to the local peer -func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) { - go n.retryConnect(remoteID, addr) -} - -// Send methods is used to send a message to a specified remote peer -func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error { - n.Lock() - conn, exists := n.connections[remoteID] - n.Unlock() - - if !exists { - return fmt.Errorf("not connected to peer %s", remoteID) - } - - msg := Message{ - Payload: payload, - Source: n.listener.Addr().String(), - Timestamp: time.Now().Unix(), - } - - data, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("failed to marshal message: %v", err) - } - - _, err = conn.Write(append(data, '\n')) - if err != nil { - n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err) - n.Lock() - delete(n.connections, remoteID) - n.Unlock() - - go n.retryConnect(remoteID, "") - - return fmt.Errorf("failed to send message: %v", err) - } - - return nil -} - -// RegisterHandler registers a callback for a message type. -func (n *TCPNetwork) RegisterHandler(callback NetworkMessageReceiveFunc) { - n.OnReceiveFn = callback -} - -// startServer starts a TCP server to accept connections. -func (n *TCPNetwork) startServer() error { - var err error - - n.listener, err = net.Listen("tcp", n.ListenAddr) - if err != nil { - n.Logger.Sugar().Errorf("failed to start server: %v", err) - return err - } - - go n.listenLoop() - - n.Logger.Sugar().Infof("server started on %s\n", n.ListenAddr) - - return nil - -} - -func (n *TCPNetwork) listenLoop() error { - for { - conn, err := n.listener.Accept() - if errors.Is(err, net.ErrClosed) { - n.Logger.Sugar().Errorf("connection is closed in such a way: %v\n", err) - return err - } - - if err != nil { - n.Logger.Sugar().Errorf("failed to accept connection: %v\n", err) - continue - } - - remoteAddr := conn.RemoteAddr().String() - n.Lock() - n.connections[NetworkID(remoteAddr)] = conn - if err := n.HandshakeFn(); err != nil { - n.Logger.Sugar().Errorf("error on handshaking: %v\n", err) - return err - } - n.Unlock() - n.RetryDelay = 2 * time.Second - - n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr) - - // Read loop - go n.listenForMessages(conn) - } -} - -// listenForMessages listens for incoming messages. -func (n *TCPNetwork) listenForMessages(conn net.Conn) { - reader := bufio.NewReader(conn) - - for { - data, err := reader.ReadBytes('\n') - if err != nil { - n.Logger.Debug("connection lost. Reconnecting...") - n.Lock() - - // FIXME: a better way to re-establish the connection between peer - for id, c := range n.connections { - if c == conn { - delete(n.connections, id) - go n.retryConnect(id, "") - break - } - } - n.Unlock() - return - } - - var message Message - if err := json.Unmarshal(data, &message); err != nil { - n.Logger.Sugar().Errorf("failed to unmarshal message: %v\n", err) - continue - } - - n.Logger.Sugar().Infof("Received message from '%s': %s", message.Source, string(message.Payload)) - - n.OnReceiveFn(message) - } -} - -// retryConnect attempts to connect to a remote peer. -func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) { - for { - n.Lock() - _, exists := n.connections[remoteID] - n.Unlock() - - if exists { - time.Sleep(5 * time.Second) - continue - } - - conn, err := net.Dial("tcp", addr) - - if err != nil { - n.Logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.RetryDelay) - time.Sleep(n.RetryDelay) - if n.RetryDelay < 30*time.Second { - n.RetryDelay *= 2 - } else { - n.Lock() - delete(n.connections, remoteID) - n.Unlock() - n.Logger.Sugar().Infof("removed %s connection", remoteID) - return - } - continue - } - - n.Lock() - n.connections[remoteID] = conn - n.Unlock() - n.Logger.Sugar().Infof("successfully connected to peer %s!", remoteID) - - go n.listenForMessages(conn) - } -} |