package p2p import ( "bufio" "context" "encoding/json" "fmt" // "log" "os" "os/signal" "syscall" "thesis/core" "time" drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" dutil "github.com/libp2p/go-libp2p/p2p/discovery/util" "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" multiaddr "github.com/multiformats/go-multiaddr" ) var logger = log.Logger("rendezvous") type addrList []multiaddr.Multiaddr type Config struct { RendezvousString string BootstrapPeers addrList ListenAddresses addrList ProtocolID string } func StartNode(port int) host.Host { log.SetAllLoggers(log.LevelWarn) node, err := libp2p.New( libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)), ) if err != nil { panic(err) } peerInfo := peer.AddrInfo{ ID: node.ID(), Addrs: node.Addrs(), } addrs, _ := peer.AddrInfoToP2pAddrs(&peerInfo) fmt.Println("libp2p node address:", addrs[0]) return node } func ConnectToTargetNode(sourceNode host.Host, targetNode host.Host) { targetAddr := host.InfoFromHost(targetNode) err := sourceNode.Connect(context.Background(), *targetAddr) if err != nil { panic(err) } } func CountSourceNodePeers(sourceNode host.Host) int { return len(sourceNode.Network().Peers()) } func getHostAddress(ha host.Host) string { // Build host multiaddress hostAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", ha.ID())) // Now we can build a full multiaddress to reach this host // by encapsulating both addresses: addr := ha.Addrs()[0] return addr.Encapsulate(hostAddr).String() } func StartListener(ctx context.Context, ha host.Host) { fullAddr := getHostAddress(ha) logger.Warn("I am %s\n", fullAddr) ha.SetStreamHandler("/echo/1.0.0", handleStream) logger.Warn("listening for connections...") bootstrapPeers := make([]peer.AddrInfo, len(dht.DefaultBootstrapPeers)) for i, addr := range dht.DefaultBootstrapPeers { peerinfo, _ := peer.AddrInfoFromP2pAddr(addr) bootstrapPeers[i] = *peerinfo } kademliaDHT, err := dht.New(ctx, ha, dht.BootstrapPeers(bootstrapPeers...)) if err != nil { panic(err) } logger.Warn("Bootstrapping the DHT") if err = kademliaDHT.Bootstrap(ctx); err != nil { panic(err) } rendezvousString := "meet me here" // Wait a bit to let bootstrapping finish (really bootstrap should block until it's ready, but that isn't the case yet.) time.Sleep(1 * time.Second) // We use a rendezvous point "meet me here" to announce our location. // This is like telling your friends to meet you at the Eiffel Tower. logger.Warn("Announcing ourselves...") routingDiscovery := drouting.NewRoutingDiscovery(kademliaDHT) dutil.Advertise(ctx, routingDiscovery, rendezvousString) logger.Warn("Successfully announced!") // Now, look for others who have announced // This is like your friend telling you the location to meet you. myctx := context.Background() logger.Warn("Searching for other peers...") peerChan, err := routingDiscovery.FindPeers(myctx, rendezvousString) if err != nil { panic(err) } for peer := range peerChan { if peer.ID == ha.ID() { logger.Warn("wew") continue } logger.Warn("Found peer:", peer) logger.Warn("Connecting to:", peer) stream, err := ha.NewStream(myctx, peer.ID, "/echo/1.0.0") if err != nil { logger.Warn("Connection failed:", err) continue } else { rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) go writeData(rw) go readData(rw) } logger.Warn("Connected to:", peer) } select {} } func handleStream(s network.Stream) { logger.Warn("Got a new stream!") // Create a buffer stream for non-blocking read and write. rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) go readData(rw) go writeData(rw) // stream 's' will stay open until you close it (or the other side closes it). } func readData(rw *bufio.ReadWriter) { for { str, _ := rw.ReadString('\n') if str == "" { return } if str != "\n" { fmt.Println(str) } } } func writeData(rw *bufio.ReadWriter) { rw.WriteString("{\"Status\": \"ok\"}\n") rw.Flush() } // doEcho reads a line of data a stream and writes it back func doEcho(s network.Stream) error { buf := make([]byte, 1024) n, err := s.Read(buf) if err != nil { logger.Fatal(err) } var msg core.TransactionContent err = json.Unmarshal(buf[:n], &msg) if err != nil { logger.Fatal(err) } fmt.Println("Received message:", msg) return err } func CloseNode(sourceNode host.Host) { if err := sourceNode.Close(); err != nil { panic(err) } } func RunSender(ctx context.Context, ha host.Host, targetPeer string, content []byte) { fullAddr := getHostAddress(ha) logger.Warn("I am %s\n", fullAddr) maddr, err := multiaddr.NewMultiaddr(targetPeer) handleError(err) info, err := peer.AddrInfoFromP2pAddr(maddr) handleError(err) ha.Peerstore().AddAddr(info.ID, info.Addrs[0], peerstore.PermanentAddrTTL) logger.Warn("Sender stream opened") s, err := ha.NewStream(context.Background(), info.ID, "/echo/1.0.0") handleError(err) logger.Warn("Sending...") _, err = s.Write(content) if err != nil { logger.Warn(err) return } // s.Close() buf := make([]byte, 1024) _, err = s.Read(buf) if err != nil { logger.Fatal(err) } // var msg core.TransactionContent // err = json.Unmarshal(buf[:n], &msg) // if err != nil { // log.Fatal(err) // } logger.Warn("Read reply: ", string(buf)) s.Close() } func handleError(err error) { if err != nil { panic(err) } } func POC() { node, err := libp2p.New( libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), ) if err != nil { panic(err) } peerInfo := peer.AddrInfo{ ID: node.ID(), Addrs: node.Addrs(), } addrs, _ := peer.AddrInfoToP2pAddrs(&peerInfo) fmt.Println("libp2p node address:", addrs[0]) if len(os.Args) > 1 { addr, err := multiaddr.NewMultiaddr(os.Args[1]) if err != nil { panic(err) } peer, err := peer.AddrInfoFromP2pAddr(addr) if err != nil { panic(err) } if err := node.Connect(context.Background(), *peer); err != nil { panic(err) } } else { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) <-ch fmt.Println("Received signal, shutting down...") } if err := node.Close(); err != nil { panic(err) } }