289 lines
6.4 KiB
Go
289 lines
6.4 KiB
Go
package p2p
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
// "log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"thesis/core"
|
|
"time"
|
|
|
|
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
|
|
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
|
|
|
|
"github.com/ipfs/go-log/v2"
|
|
"github.com/libp2p/go-libp2p"
|
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/peerstore"
|
|
multiaddr "github.com/multiformats/go-multiaddr"
|
|
)
|
|
|
|
var logger = log.Logger("rendezvous")
|
|
|
|
type addrList []multiaddr.Multiaddr
|
|
|
|
type Config struct {
|
|
RendezvousString string
|
|
BootstrapPeers addrList
|
|
ListenAddresses addrList
|
|
ProtocolID string
|
|
}
|
|
|
|
func StartNode(port int) host.Host {
|
|
log.SetAllLoggers(log.LevelWarn)
|
|
node, err := libp2p.New(
|
|
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)),
|
|
)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
peerInfo := peer.AddrInfo{
|
|
ID: node.ID(),
|
|
Addrs: node.Addrs(),
|
|
}
|
|
|
|
addrs, _ := peer.AddrInfoToP2pAddrs(&peerInfo)
|
|
fmt.Println("libp2p node address:", addrs[0])
|
|
return node
|
|
}
|
|
|
|
func ConnectToTargetNode(sourceNode host.Host, targetNode host.Host) {
|
|
|
|
targetAddr := host.InfoFromHost(targetNode)
|
|
err := sourceNode.Connect(context.Background(), *targetAddr)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func CountSourceNodePeers(sourceNode host.Host) int {
|
|
return len(sourceNode.Network().Peers())
|
|
}
|
|
|
|
func getHostAddress(ha host.Host) string {
|
|
// Build host multiaddress
|
|
hostAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", ha.ID()))
|
|
|
|
// Now we can build a full multiaddress to reach this host
|
|
// by encapsulating both addresses:
|
|
addr := ha.Addrs()[0]
|
|
return addr.Encapsulate(hostAddr).String()
|
|
}
|
|
|
|
func StartListener(ctx context.Context, ha host.Host) {
|
|
fullAddr := getHostAddress(ha)
|
|
logger.Warn("I am %s\n", fullAddr)
|
|
|
|
ha.SetStreamHandler("/echo/1.0.0", handleStream)
|
|
logger.Warn("listening for connections...")
|
|
bootstrapPeers := make([]peer.AddrInfo, len(dht.DefaultBootstrapPeers))
|
|
for i, addr := range dht.DefaultBootstrapPeers {
|
|
peerinfo, _ := peer.AddrInfoFromP2pAddr(addr)
|
|
bootstrapPeers[i] = *peerinfo
|
|
}
|
|
kademliaDHT, err := dht.New(ctx, ha, dht.BootstrapPeers(bootstrapPeers...))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
logger.Warn("Bootstrapping the DHT")
|
|
if err = kademliaDHT.Bootstrap(ctx); err != nil {
|
|
panic(err)
|
|
}
|
|
rendezvousString := "meet me here"
|
|
|
|
// Wait a bit to let bootstrapping finish (really bootstrap should block until it's ready, but that isn't the case yet.)
|
|
time.Sleep(1 * time.Second)
|
|
|
|
// We use a rendezvous point "meet me here" to announce our location.
|
|
// This is like telling your friends to meet you at the Eiffel Tower.
|
|
logger.Warn("Announcing ourselves...")
|
|
routingDiscovery := drouting.NewRoutingDiscovery(kademliaDHT)
|
|
dutil.Advertise(ctx, routingDiscovery, rendezvousString)
|
|
logger.Warn("Successfully announced!")
|
|
|
|
// Now, look for others who have announced
|
|
// This is like your friend telling you the location to meet you.
|
|
myctx := context.Background()
|
|
logger.Warn("Searching for other peers...")
|
|
peerChan, err := routingDiscovery.FindPeers(myctx, rendezvousString)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
for peer := range peerChan {
|
|
if peer.ID == ha.ID() {
|
|
logger.Warn("wew")
|
|
continue
|
|
}
|
|
|
|
logger.Warn("Found peer:", peer)
|
|
|
|
logger.Warn("Connecting to:", peer)
|
|
stream, err := ha.NewStream(myctx, peer.ID, "/echo/1.0.0")
|
|
|
|
if err != nil {
|
|
logger.Warn("Connection failed:", err)
|
|
continue
|
|
} else {
|
|
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
|
|
|
|
go writeData(rw)
|
|
go readData(rw)
|
|
}
|
|
|
|
logger.Warn("Connected to:", peer)
|
|
}
|
|
|
|
select {}
|
|
|
|
}
|
|
|
|
func handleStream(s network.Stream) {
|
|
logger.Warn("Got a new stream!")
|
|
|
|
// Create a buffer stream for non-blocking read and write.
|
|
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
|
|
|
|
go readData(rw)
|
|
go writeData(rw)
|
|
|
|
// stream 's' will stay open until you close it (or the other side closes it).
|
|
}
|
|
|
|
func readData(rw *bufio.ReadWriter) {
|
|
for {
|
|
str, _ := rw.ReadString('\n')
|
|
|
|
if str == "" {
|
|
return
|
|
}
|
|
if str != "\n" {
|
|
|
|
fmt.Println(str)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
func writeData(rw *bufio.ReadWriter) {
|
|
rw.WriteString("{\"Status\": \"ok\"}\n")
|
|
rw.Flush()
|
|
}
|
|
|
|
// doEcho reads a line of data a stream and writes it back
|
|
func doEcho(s network.Stream) error {
|
|
|
|
buf := make([]byte, 1024)
|
|
n, err := s.Read(buf)
|
|
if err != nil {
|
|
logger.Fatal(err)
|
|
}
|
|
|
|
var msg core.TransactionContent
|
|
err = json.Unmarshal(buf[:n], &msg)
|
|
if err != nil {
|
|
logger.Fatal(err)
|
|
}
|
|
|
|
fmt.Println("Received message:", msg)
|
|
return err
|
|
}
|
|
|
|
func CloseNode(sourceNode host.Host) {
|
|
if err := sourceNode.Close(); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func RunSender(ctx context.Context, ha host.Host, targetPeer string, content []byte) {
|
|
fullAddr := getHostAddress(ha)
|
|
logger.Warn("I am %s\n", fullAddr)
|
|
maddr, err := multiaddr.NewMultiaddr(targetPeer)
|
|
handleError(err)
|
|
info, err := peer.AddrInfoFromP2pAddr(maddr)
|
|
handleError(err)
|
|
ha.Peerstore().AddAddr(info.ID, info.Addrs[0], peerstore.PermanentAddrTTL)
|
|
logger.Warn("Sender stream opened")
|
|
|
|
s, err := ha.NewStream(context.Background(), info.ID, "/echo/1.0.0")
|
|
handleError(err)
|
|
logger.Warn("Sending...")
|
|
_, err = s.Write(content)
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
return
|
|
}
|
|
// s.Close()
|
|
|
|
buf := make([]byte, 1024)
|
|
_, err = s.Read(buf)
|
|
if err != nil {
|
|
logger.Fatal(err)
|
|
}
|
|
|
|
// var msg core.TransactionContent
|
|
// err = json.Unmarshal(buf[:n], &msg)
|
|
// if err != nil {
|
|
// log.Fatal(err)
|
|
// }
|
|
|
|
logger.Warn("Read reply: ", string(buf))
|
|
s.Close()
|
|
}
|
|
|
|
func handleError(err error) {
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func POC() {
|
|
node, err := libp2p.New(
|
|
libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"),
|
|
)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
peerInfo := peer.AddrInfo{
|
|
ID: node.ID(),
|
|
Addrs: node.Addrs(),
|
|
}
|
|
|
|
addrs, _ := peer.AddrInfoToP2pAddrs(&peerInfo)
|
|
fmt.Println("libp2p node address:", addrs[0])
|
|
|
|
if len(os.Args) > 1 {
|
|
addr, err := multiaddr.NewMultiaddr(os.Args[1])
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
peer, err := peer.AddrInfoFromP2pAddr(addr)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
if err := node.Connect(context.Background(), *peer); err != nil {
|
|
panic(err)
|
|
}
|
|
} else {
|
|
ch := make(chan os.Signal, 1)
|
|
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
|
|
<-ch
|
|
fmt.Println("Received signal, shutting down...")
|
|
}
|
|
|
|
if err := node.Close(); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
}
|