diff options
Diffstat (limited to 'pkg/p2p/network.go')
-rw-r--r-- | pkg/p2p/network.go | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/pkg/p2p/network.go b/pkg/p2p/network.go new file mode 100644 index 0000000..3362b4a --- /dev/null +++ b/pkg/p2p/network.go @@ -0,0 +1,238 @@ +package p2p + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "net" + "sync" + "time" + + "go.uber.org/zap" +) + +// `Message` represents a structured message on this network. +type Message struct { + Timestamp int64 `json:"timestamp"` + Source string `json:"source"` + Payload []byte `json:"payload"` +} + +// A network ID is represented by a string +type NetworkID string + +// This type represents the function that is called every time a new message +// arrives to the server. +type NetworkMessageReceiveFunc func(msg Message) + +// This type represent the callback function invokes every new handshake between +// two peers +type NetworkHandshakeFunc func() error + +func DefaultHandshake() error { + return nil +} + +// Network options to define on new `TCPNetwork` +type TCPNetworkOpts struct { + ListenAddr string + RetryDelay time.Duration + HandshakeFn NetworkHandshakeFunc + OnReceiveFn NetworkMessageReceiveFunc + Logger *zap.Logger +} + +// TCPNetwork represents a full-duplex TCP peer. +type TCPNetwork struct { + sync.Mutex + TCPNetworkOpts + + id NetworkID + listener net.Listener + connections map[NetworkID]net.Conn +} + +// Initiliaze a new TCP network +func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork { + n := &TCPNetwork{ + TCPNetworkOpts: opts, + id: localID, + connections: make(map[NetworkID]net.Conn), + } + + go n.startServer() + + return n +} + +// Close listener' connection +func (n *TCPNetwork) Close() error { + return n.listener.Close() +} + +// Add a new peer connection to the local peer +func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) { + go n.retryConnect(remoteID, addr) +} + +// Send methods is used to send a message to a specified remote peer +func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error { + n.Lock() + conn, exists := n.connections[remoteID] + n.Unlock() + + if !exists { + return fmt.Errorf("not connected to peer %s", remoteID) + } + + msg := Message{ + Payload: payload, + Source: n.listener.Addr().String(), + Timestamp: time.Now().Unix(), + } + + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %v", err) + } + + _, err = conn.Write(append(data, '\n')) + if err != nil { + n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err) + n.Lock() + delete(n.connections, remoteID) + n.Unlock() + + go n.retryConnect(remoteID, "") + + return fmt.Errorf("failed to send message: %v", err) + } + + return nil +} + +// RegisterHandler registers a callback for a message type. +func (n *TCPNetwork) RegisterHandler(callback NetworkMessageReceiveFunc) { + n.OnReceiveFn = callback +} + +// startServer starts a TCP server to accept connections. +func (n *TCPNetwork) startServer() error { + var err error + + n.listener, err = net.Listen("tcp", n.ListenAddr) + if err != nil { + n.Logger.Sugar().Errorf("failed to start server: %v", err) + return err + } + + go n.listenLoop() + + n.Logger.Sugar().Infof("server started on %s\n", n.ListenAddr) + + return nil + +} + +func (n *TCPNetwork) listenLoop() error { + for { + conn, err := n.listener.Accept() + if errors.Is(err, net.ErrClosed) { + n.Logger.Sugar().Errorf("connection is closed in such a way: %v\n", err) + return err + } + + if err != nil { + n.Logger.Sugar().Errorf("failed to accept connection: %v\n", err) + continue + } + + remoteAddr := conn.RemoteAddr().String() + n.Lock() + n.connections[NetworkID(remoteAddr)] = conn + if err := n.HandshakeFn(); err != nil { + n.Logger.Sugar().Errorf("error on handshaking: %v\n", err) + return err + } + n.Unlock() + n.RetryDelay = 2 * time.Second + + n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr) + + // Read loop + go n.listenForMessages(conn) + } +} + +// listenForMessages listens for incoming messages. +func (n *TCPNetwork) listenForMessages(conn net.Conn) { + reader := bufio.NewReader(conn) + + for { + data, err := reader.ReadBytes('\n') + if err != nil { + n.Logger.Debug("connection lost. Reconnecting...") + n.Lock() + + // FIXME: a better way to re-establish the connection between peer + for id, c := range n.connections { + if c == conn { + delete(n.connections, id) + go n.retryConnect(id, "") + break + } + } + n.Unlock() + return + } + + var message Message + if err := json.Unmarshal(data, &message); err != nil { + n.Logger.Sugar().Errorf("failed to unmarshal message: %v\n", err) + continue + } + + n.Logger.Sugar().Infof("Received message from '%s': %s", message.Source, string(message.Payload)) + + n.OnReceiveFn(message) + } +} + +// retryConnect attempts to connect to a remote peer. +func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) { + for { + n.Lock() + _, exists := n.connections[remoteID] + n.Unlock() + + if exists { + time.Sleep(5 * time.Second) + continue + } + + conn, err := net.Dial("tcp", addr) + + if err != nil { + n.Logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.RetryDelay) + time.Sleep(n.RetryDelay) + if n.RetryDelay < 30*time.Second { + n.RetryDelay *= 2 + } else { + n.Lock() + delete(n.connections, remoteID) + n.Unlock() + n.Logger.Sugar().Infof("removed %s connection", remoteID) + return + } + continue + } + + n.Lock() + n.connections[remoteID] = conn + n.Unlock() + n.Logger.Sugar().Infof("successfully connected to peer %s!", remoteID) + + go n.listenForMessages(conn) + } +} |