summaryrefslogtreecommitdiff
path: root/pkg/p2p
diff options
context:
space:
mode:
authorSanto Cariotti <santo@dcariotti.me>2025-04-17 22:28:08 +0200
committerSanto Cariotti <santo@dcariotti.me>2025-04-17 22:28:08 +0200
commit544977d54effa7804386aa40f30a87f5e2365efa (patch)
treeaa6896cfa3db9c2e61b219f6995e25da64255f30 /pkg/p2p
parent8255fbdd7d9d595e71545b7c6909114024527a34 (diff)
Move internal/network package to pkg/p2p
Diffstat (limited to 'pkg/p2p')
-rw-r--r--pkg/p2p/ip.go37
-rw-r--r--pkg/p2p/network.go238
-rw-r--r--pkg/p2p/network_test.go79
-rw-r--r--pkg/p2p/session.go23
4 files changed, 377 insertions, 0 deletions
diff --git a/pkg/p2p/ip.go b/pkg/p2p/ip.go
new file mode 100644
index 0000000..c82b861
--- /dev/null
+++ b/pkg/p2p/ip.go
@@ -0,0 +1,37 @@
+package p2p
+
+import (
+ "fmt"
+ "math/rand"
+ "net"
+
+ "github.com/boozec/rahanna/internal/logger"
+)
+
+// Connect a DNS to get the address
+func GetOutboundIP() net.IP {
+ log, _ := logger.GetLogger()
+ conn, err := net.Dial("udp", "8.8.8.8:80")
+ if err != nil {
+ log.Sugar().Error("err", err)
+ }
+ defer conn.Close()
+
+ localAddr := conn.LocalAddr().(*net.UDPAddr)
+
+ return localAddr.IP
+}
+
+// Returns a random available port on the node
+func GetRandomAvailablePort() (int, error) {
+ for i := 0; i < 100; i += 1 {
+ port := rand.Intn(65535-1024) + 1024
+ addr := fmt.Sprintf(":%d", port)
+ ln, err := net.Listen("tcp", addr)
+ if err == nil {
+ defer ln.Close()
+ return port, nil
+ }
+ }
+ return 0, fmt.Errorf("failed to find an available port after multiple attempts")
+}
diff --git a/pkg/p2p/network.go b/pkg/p2p/network.go
new file mode 100644
index 0000000..3362b4a
--- /dev/null
+++ b/pkg/p2p/network.go
@@ -0,0 +1,238 @@
+package p2p
+
+import (
+ "bufio"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "sync"
+ "time"
+
+ "go.uber.org/zap"
+)
+
+// `Message` represents a structured message on this network.
+type Message struct {
+ Timestamp int64 `json:"timestamp"`
+ Source string `json:"source"`
+ Payload []byte `json:"payload"`
+}
+
+// A network ID is represented by a string
+type NetworkID string
+
+// This type represents the function that is called every time a new message
+// arrives to the server.
+type NetworkMessageReceiveFunc func(msg Message)
+
+// This type represent the callback function invokes every new handshake between
+// two peers
+type NetworkHandshakeFunc func() error
+
+func DefaultHandshake() error {
+ return nil
+}
+
+// Network options to define on new `TCPNetwork`
+type TCPNetworkOpts struct {
+ ListenAddr string
+ RetryDelay time.Duration
+ HandshakeFn NetworkHandshakeFunc
+ OnReceiveFn NetworkMessageReceiveFunc
+ Logger *zap.Logger
+}
+
+// TCPNetwork represents a full-duplex TCP peer.
+type TCPNetwork struct {
+ sync.Mutex
+ TCPNetworkOpts
+
+ id NetworkID
+ listener net.Listener
+ connections map[NetworkID]net.Conn
+}
+
+// Initiliaze a new TCP network
+func NewTCPNetwork(localID NetworkID, opts TCPNetworkOpts) *TCPNetwork {
+ n := &TCPNetwork{
+ TCPNetworkOpts: opts,
+ id: localID,
+ connections: make(map[NetworkID]net.Conn),
+ }
+
+ go n.startServer()
+
+ return n
+}
+
+// Close listener' connection
+func (n *TCPNetwork) Close() error {
+ return n.listener.Close()
+}
+
+// Add a new peer connection to the local peer
+func (n *TCPNetwork) AddPeer(remoteID NetworkID, addr string) {
+ go n.retryConnect(remoteID, addr)
+}
+
+// Send methods is used to send a message to a specified remote peer
+func (n *TCPNetwork) Send(remoteID NetworkID, payload []byte) error {
+ n.Lock()
+ conn, exists := n.connections[remoteID]
+ n.Unlock()
+
+ if !exists {
+ return fmt.Errorf("not connected to peer %s", remoteID)
+ }
+
+ msg := Message{
+ Payload: payload,
+ Source: n.listener.Addr().String(),
+ Timestamp: time.Now().Unix(),
+ }
+
+ data, err := json.Marshal(msg)
+ if err != nil {
+ return fmt.Errorf("failed to marshal message: %v", err)
+ }
+
+ _, err = conn.Write(append(data, '\n'))
+ if err != nil {
+ n.Logger.Sugar().Errorf("failed to send message to %s: %v. Reconnecting...", remoteID, err)
+ n.Lock()
+ delete(n.connections, remoteID)
+ n.Unlock()
+
+ go n.retryConnect(remoteID, "")
+
+ return fmt.Errorf("failed to send message: %v", err)
+ }
+
+ return nil
+}
+
+// RegisterHandler registers a callback for a message type.
+func (n *TCPNetwork) RegisterHandler(callback NetworkMessageReceiveFunc) {
+ n.OnReceiveFn = callback
+}
+
+// startServer starts a TCP server to accept connections.
+func (n *TCPNetwork) startServer() error {
+ var err error
+
+ n.listener, err = net.Listen("tcp", n.ListenAddr)
+ if err != nil {
+ n.Logger.Sugar().Errorf("failed to start server: %v", err)
+ return err
+ }
+
+ go n.listenLoop()
+
+ n.Logger.Sugar().Infof("server started on %s\n", n.ListenAddr)
+
+ return nil
+
+}
+
+func (n *TCPNetwork) listenLoop() error {
+ for {
+ conn, err := n.listener.Accept()
+ if errors.Is(err, net.ErrClosed) {
+ n.Logger.Sugar().Errorf("connection is closed in such a way: %v\n", err)
+ return err
+ }
+
+ if err != nil {
+ n.Logger.Sugar().Errorf("failed to accept connection: %v\n", err)
+ continue
+ }
+
+ remoteAddr := conn.RemoteAddr().String()
+ n.Lock()
+ n.connections[NetworkID(remoteAddr)] = conn
+ if err := n.HandshakeFn(); err != nil {
+ n.Logger.Sugar().Errorf("error on handshaking: %v\n", err)
+ return err
+ }
+ n.Unlock()
+ n.RetryDelay = 2 * time.Second
+
+ n.Logger.Sugar().Infof("connected to remote peer %s\n", remoteAddr)
+
+ // Read loop
+ go n.listenForMessages(conn)
+ }
+}
+
+// listenForMessages listens for incoming messages.
+func (n *TCPNetwork) listenForMessages(conn net.Conn) {
+ reader := bufio.NewReader(conn)
+
+ for {
+ data, err := reader.ReadBytes('\n')
+ if err != nil {
+ n.Logger.Debug("connection lost. Reconnecting...")
+ n.Lock()
+
+ // FIXME: a better way to re-establish the connection between peer
+ for id, c := range n.connections {
+ if c == conn {
+ delete(n.connections, id)
+ go n.retryConnect(id, "")
+ break
+ }
+ }
+ n.Unlock()
+ return
+ }
+
+ var message Message
+ if err := json.Unmarshal(data, &message); err != nil {
+ n.Logger.Sugar().Errorf("failed to unmarshal message: %v\n", err)
+ continue
+ }
+
+ n.Logger.Sugar().Infof("Received message from '%s': %s", message.Source, string(message.Payload))
+
+ n.OnReceiveFn(message)
+ }
+}
+
+// retryConnect attempts to connect to a remote peer.
+func (n *TCPNetwork) retryConnect(remoteID NetworkID, addr string) {
+ for {
+ n.Lock()
+ _, exists := n.connections[remoteID]
+ n.Unlock()
+
+ if exists {
+ time.Sleep(5 * time.Second)
+ continue
+ }
+
+ conn, err := net.Dial("tcp", addr)
+
+ if err != nil {
+ n.Logger.Sugar().Errorf("failed to connect to %s: %v. Retrying in %v...", remoteID, err, n.RetryDelay)
+ time.Sleep(n.RetryDelay)
+ if n.RetryDelay < 30*time.Second {
+ n.RetryDelay *= 2
+ } else {
+ n.Lock()
+ delete(n.connections, remoteID)
+ n.Unlock()
+ n.Logger.Sugar().Infof("removed %s connection", remoteID)
+ return
+ }
+ continue
+ }
+
+ n.Lock()
+ n.connections[remoteID] = conn
+ n.Unlock()
+ n.Logger.Sugar().Infof("successfully connected to peer %s!", remoteID)
+
+ go n.listenForMessages(conn)
+ }
+}
diff --git a/pkg/p2p/network_test.go b/pkg/p2p/network_test.go
new file mode 100644
index 0000000..faad9d4
--- /dev/null
+++ b/pkg/p2p/network_test.go
@@ -0,0 +1,79 @@
+package p2p
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "go.uber.org/zap"
+)
+
+// TestPeerToPeerCommunication tests if two peers can communicate.
+func TestPeerToPeerCommunication(t *testing.T) {
+ // Create a mock of the first peer (peer-1)
+ peer1Opts := TCPNetworkOpts{
+ ListenAddr: ":9001",
+ HandshakeFn: func() error { return nil },
+ RetryDelay: time.Second * 2,
+ Logger: zap.L(),
+ }
+ peer1 := NewTCPNetwork("peer-1", peer1Opts)
+
+ peer1.RegisterHandler(func(msg Message) {
+ assert.Equal(t, "Hey from peer-2!", string(msg.Payload))
+ })
+
+ // Create a mock of the second peer (peer-2)
+ peer2Opts := TCPNetworkOpts{
+ ListenAddr: ":9002",
+ HandshakeFn: func() error { return nil },
+ RetryDelay: time.Second * 2,
+ Logger: zap.L(),
+ OnReceiveFn: func(msg Message) {
+ assert.Equal(t, "Hey from peer-1!", string(msg.Payload))
+ },
+ }
+ peer2 := NewTCPNetwork("peer-2", peer2Opts)
+
+ // Start the first peer and add the second peer
+ go peer1.AddPeer("peer-2", peer2.ListenAddr)
+ go peer2.AddPeer("peer-1", peer1.ListenAddr)
+
+ // Wait for the connections to be established
+ // You might need a little more time based on network delay and retry logic
+ time.Sleep(5 * time.Second)
+
+ // Send a message from peer-1 to peer-2
+ err := peer1.Send("peer-2", []byte("Hey from peer-1!"))
+ assert.NoError(t, err)
+
+ err = peer2.Send("peer-1", []byte("Hey from peer-2!"))
+ assert.NoError(t, err)
+
+ // Allow some time for the message to be received and handled
+ time.Sleep(2 * time.Second)
+}
+
+// TestSendFailure tests if sending a message fails when no connection exists.
+func TestSendFailure(t *testing.T) {
+ peer1Opts := TCPNetworkOpts{
+ ListenAddr: ":9001",
+ HandshakeFn: DefaultHandshake,
+ RetryDelay: time.Second * 2,
+ Logger: zap.L(),
+ }
+ peer1 := NewTCPNetwork("peer-1", peer1Opts)
+
+ // Create a mock of the second peer (peer-2)
+ peer2Opts := TCPNetworkOpts{
+ ListenAddr: ":9002",
+ HandshakeFn: DefaultHandshake,
+ RetryDelay: time.Second * 2,
+ Logger: zap.L(),
+ }
+ _ = NewTCPNetwork("peer-2", peer2Opts)
+
+ // Attempt to send a message without establishing a connection first
+ err := peer1.Send("peer-2", []byte("Message without connection"))
+ assert.Error(t, err, "Expected error when sending to a non-connected peer")
+}
diff --git a/pkg/p2p/session.go b/pkg/p2p/session.go
new file mode 100644
index 0000000..5d7ab52
--- /dev/null
+++ b/pkg/p2p/session.go
@@ -0,0 +1,23 @@
+package p2p
+
+import (
+ "math/rand"
+)
+
+var adjectives = []string{
+ "adamant", "adept", "adventurous", "arcadian", "auspicious",
+ "awesome", "blossoming", "brave", "charming", "chatty",
+ "circular", "considerate", "cubic", "curious", "delighted",
+}
+
+var nouns = []string{
+ "aardvark", "accordion", "apple", "apricot", "bee",
+ "brachiosaur", "cactus", "capsicum", "clarinet", "cowbell",
+ "crab", "cuckoo", "cymbal", "diplodocus", "donkey",
+}
+
+func NewSession() string {
+ noun := nouns[rand.Intn(len(nouns))]
+ adjective := adjectives[rand.Intn(len(adjectives))]
+ return noun + "-" + adjective
+}