作为区块链分布式网络结构的基础,以太坊P2P网络是如何发展的呢?
在设计公链时,节点与节点之间建立连接需要 P2P 协议,从而实现数据的同步,于此同时上层应用还需要封装一些通信逻辑,比如节点之间的区块同步、交易数据同步等。
本篇文章将对 P2P 网络发展进行简单概述,同时将从源码角度对以太坊中的节点发现机制、分布式哈希表、节点查找、节点新增、节点移除等进行简单介绍,并对其 P2P 网络安全性设计进行简要分析
P2P网络不同于传统的 CS 结构,在 P2P 网络中每个节点既可以是客户端也可以是服务端,节点之间的通信协议一般直接通过 Socket 实现,P2P技术发展至今经历了以下四个发展阶段:
四种网络结构对比如下:
节点发现是任何节点接入 P2P 网络的第一步,节点发现可以分为两种:
在 P2P 网络中,初始节点在启动时会通过一些长期稳定运行的节点快速发现网络中的其他节点,这些节点被称为"种子节点"(一般代码中会硬编码种子节点信息),一般情况下种子节点可以分为两种:
Kademlia 是一种分布式哈希表( DHT )技术,与其他 DHT 技术相比,KDA 算法使用异或算法计算节点之间的距离,进而建立了全新的 DHT 拓扑结构,这种算法可以极大地提高路由的查询速度。
哈希表是用于存储键值对的一种容器,键值对有被称为 Key/Value 对,哈希表数据结构中包含 N 个 bucket (桶),对于某个具体的哈希表,N (桶的数量)通常是固定不变的,于是可以对每个桶编号,0~N-1,桶是用来存储键值对的,可以简单的将其理解为一个动态数组,里面存放多个键值对。
下图展示了哈希表的查找原理,我们可以方便快速地通过 Key 来获取 value,当使用某个 key 进行查找时,先用某个哈希函数计算这个key 的哈希值,得到的哈希值通常是一个整数,之后使用哈希值对 N (桶数)进行取模运算(除法求余数),就可以算出对应的桶编号。
说到哈希表不得不提一下哈希表碰撞,当两个不同的 Key 进行哈希计算得到相同的哈希值时,就是所谓的哈希函数碰撞,一旦出现这种情况,这两个 key 对应的两个键值对就会被存在在同一个桶中( bucket )中,另一中散列碰撞是虽然计算出来的哈希值不同,但经过取模运算之后得到相同的桶编号,这时候也会将两个键值对存储在一个桶中,哈希碰撞原理如下图所示:
如果某个哈希表在存储数据时完全没有碰撞,那么每个桶里都只有0个或1个键值对,这样查找起来就非常快,反之,如果某个哈希表在存储数据时出现严重碰撞,那么就会导致某些桶里存储了很多键值对,那么在查找key的时候需要在这个桶里面逐一对比key是否相同,查找效率会变得很低~
分布式哈希表在概念上类似于传统的哈希表,差异在于传统的哈希表主要用于单机上的某个软件中,分布式哈希表主要用于分布式系统(此时,分布式系统的节点可以通俗的理解为 hash 表中的 bucket ),分布式哈希表主要用于存储大量(甚至海量)的数据,分布式哈希表的原理如下图所示:
以太坊底层的P2PServer大致可以分为以下三层:
表数据结构如下所示:
// filedir:go-ethereum-1.10.2\p2p\discover\table.go L40
const (
alpha = 3 // Kademlia concurrency factor
bucketSize = 16 // Kademlia bucket size
maxReplacements = 10 // Size of per-bucket replacement list
// We keep buckets for the upper 1/15 of distances because
// it's very unlikely we'll ever encounter a node that's closer.
hashBits = len(common.Hash{}) * 8
nBuckets = hashBits / 15 // Number of buckets
bucketMinDistance = hashBits - nBuckets // Log distance of closest bucket
// IP address limits.
bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24
tableIPLimit, tableSubnet = 10, 24
refreshInterval = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval = 30 * time.Second
seedMinTableTime = 5 * time.Minute
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
)
type Table struct {
mutex sync.Mutex // protects buckets, bucket content, nursery, rand
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*node // bootstrap nodes
rand *mrand.Rand // source of randomness, periodically reseeded
ips netutil.DistinctNetSet
log log.Logger
db *enode.DB // database of known nodes
net transport
refreshReq chan chan struct{}
initDone chan struct{}
closeReq chan struct{}
closed chan struct{}
nodeAddedHook func(*node) // for testing
}
type bucket struct {
entries []*node // live entries, sorted by time of last contact
replacements []*node // recently seen nodes to be used if revalidation fails
ips netutil.DistinctNetSet
}
关键的几个变量:
newTable函数用于创建新的表:
// filedir:go-ethereum-1.10.2\p2p\discover\table.go L102
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
tab := &Table{
net: t,
db: db,
refreshReq: make(chan chan struct{}),
initDone: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
log: log,
}
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
}
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
tab.seedRand()
tab.loadSeedNodes()
return tab, nil
}
在上述代码中首先使用传入的参数初始化了一个Table的对象tab,调用setFallbackNodes函数设置初始链接节点(即获得5个nursey节点,后面如果table为空或者数据库中没有节点信息时这些节点将被用于去链接网络),之后通过一个for循环结合函数ValidateComplete来验证节点是否有效。
之后初始化K桶:
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
之后从table.buckets中随机取30个节点加载种子节点到相应的bucket:
tab.seedRand()
tab.loadSeedNodes()
return tab, nil
loadSeedNodes函数的具体实现如下所示(这里的seedCount为table.go中最上方定义的全局变量,值为30):
// filedir: go-ethereum-1.10.2\p2p\discover\table.go L302
func (tab *Table) loadSeedNodes() {
seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.addSeenNode(seed)
}
}
这里的addSeenNode即用于添加节点到bucket,在这里会检查要添加的节点是否已经存在以及bucket是否已满,如果已满则调用tab.addReplacement(b, n)将节点添加到replacement列表中去,之后添加IP,之后更新bucket:
// filedir: go-ethereum-1.10.2\p2p\discover\table.go L458
func (tab *Table) addSeenNode(n *node) {
if n.ID() == tab.self().ID() {
return
}
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(n.ID())
if contains(b.entries, n.ID()) {
// Already in bucket, don't add.
return
}
if len(b.entries) >= bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, n)
return
}
if !tab.addIP(b, n.IP()) {
// Can't add: IP limit reached.
return
}
// Add to end of bucket:
b.entries = append(b.entries, n)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
}
}
loop函数是table.go中的主循环,在函数开头出定义了后续会使用到的局部变量,之后通过deRefresh进行刷新桶操作,在这里的loop循环会每隔30分钟自动刷新一次K桶,每隔10秒钟验证K桶节点是否可以ping通,每30秒将K桶中存在超过5分钟的节点存储本地数据库,视作稳定节点:
// filedir: go-ethereum-1.10.2\p2p\discover\table.go L55
refreshInterval = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval = 30 * time.Second
// filedir: go-ethereum-1.10.2\p2p\discover\table.go L218
// loop schedules runs of doRefresh, doRevalidate and copyLiveNodes.
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
refreshDone = make(chan struct{}) // where doRefresh reports completion
revalidateDone chan struct{} // where doRevalidate reports completion
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
)
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
// Start initial refresh.
go tab.doRefresh(refreshDone)
loop:
for {
select {
case <-refresh.C: //定时刷新k桶事件,refreshInterval=30 min
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case req := <-tab.refreshReq: //刷新k桶的请求事件
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case <-refreshDone: // doRefresh完成
for _, ch := range waiting {
close(ch)
}
waiting, refreshDone = nil, nil
case <-revalidate.C: // 验证k桶节点有效性,10 second
revalidateDone = make(chan struct{})
go tab.doRevalidate(revalidateDone)
case <-revalidateDone: // 验证K桶节点有效性完成
revalidate.Reset(tab.nextRevalidateTime())
revalidateDone = nil
case <-copyNodes.C: // 定时(30秒)将节点存入数据库,如果某个节点在k桶中存在超过5分钟,则认为它是一个稳定的节点
go tab.copyLiveNodes()
case <-tab.closeReq:
break loop
}
}
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
if revalidateDone != nil {
<-revalidateDone
}
close(tab.closed)
}
getNode函数用于根据ID来查找节点,如果不存在则返回nil:
// getNode returns the node with the given ID or nil if it isn't in the table.
func (tab *Table) getNode(id enode.ID) *enode.Node {
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(id)
for _, e := range b.entries {
if e.ID() == id {
return unwrapNode(e)
}
}
return nil
}
getNode函数用于根据ID来查找节点,如果不存在则返回nil:
// getNode returns the node with the given ID or nil if it isn't in the table.
func (tab *Table) getNode(id enode.ID) *enode.Node {
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(id)
for _, e := range b.entries {
if e.ID() == id {
return unwrapNode(e)
}
}
return nil
}
以太坊分布式网络采用了结构化网络模型,其实现方案使用Kademlia协议,下面我们对节点发现进行简单介绍,在以太坊中k值是16,也就是说每个k桶包含16个节点,一共256个k桶,K桶中记录节点的NodeId,Distance,Endpoint,IP等信息,并按照与Target节点的距离排序,节点查找由doRefresh()实现:
// filedir:go-ethereum-1.10.2\p2p\discover\table.go L278
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
tab.loadSeedNodes()
// Run self lookup to discover new neighbor nodes.
tab.net.lookupSelf()
for i := 0; i < 3; i++ {
tab.net.lookupRandom()
}
}
从上述代码中可以看到这里首先调用tab.loadSeedNodes()从数据库中加载节点并将其插入到表中去:
// filedir: go-ethereum-1.10.2\p2p\discover\table.go L302
func (tab *Table) loadSeedNodes() {
seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.addSeenNode(seed)
}
}
// filedir:go-ethereum-1.10.2\p2p\enode\nodedb.go L440
// QuerySeeds retrieves random nodes to be used as potential seed nodes
// for bootstrapping.
func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
var (
now = time.Now()
nodes = make([]*Node, 0, n)
it = db.lvl.NewIterator(nil, nil)
id ID
)
defer it.Release()
seek:
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
ctr := id[0]
rand.Read(id[:])
id[0] = ctr + id[0]%16
it.Seek(nodeKey(id))
n := nextNode(it)
if n == nil {
id[0] = 0
continue seek // iterator exhausted
}
if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
continue seek
}
for i := range nodes {
if nodes[i].ID() == n.ID() {
continue seek // duplicate
}
}
nodes = append(nodes, n)
}
return nodes
}
之后通过lookupSelf来发现新的节点,这里会优先使用当前节点的ID来运行newLookup发现邻居节点:
// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L281
// lookupSelf looks up our own node ID.
// This is needed to satisfy the transport interface.
func (t *UDPv5) lookupSelf() []*enode.Node {
return t.newLookup(t.closeCtx, t.Self().ID()).run()
}
// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L292
func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {
return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
return t.lookupWorker(n, target)
})
}
func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *lookup {
it := &lookup{
tab: tab,
queryfunc: q,
asked: make(map[enode.ID]bool),
seen: make(map[enode.ID]bool),
result: nodesByDistance{target: target},
replyCh: make(chan []*node, alpha),
cancelCh: ctx.Done(),
queries: -1,
}
// Don't query further if we hit ourself.
// Unlikely to happen often in practice.
it.asked[tab.self().ID()] = true
return it
}
最后随机一个target,进行lookup:
// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L275
func (t *UDPv5) lookupRandom() []*enode.Node {
return t.newRandomLookup(t.closeCtx).run()
}
// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L287
func (t *UDPv5) newRandomLookup(ctx context.Context) *lookup {
var target enode.ID
crand.Read(target[:])
return t.newLookup(ctx, target)
}
func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {
return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
return t.lookupWorker(n, target)
})
}
// lookupWorker performs FINDNODE calls against a single node during lookup.
func (t *UDPv5) lookupWorker(destNode *node, target enode.ID) ([]*node, error) {
var (
dists = lookupDistances(target, destNode.ID())
nodes = nodesByDistance{target: target}
err error
)
var r []*enode.Node
r, err = t.findnode(unwrapNode(destNode), dists)
if err == errClosed {
return nil, err
}
for _, n := range r {
if n.ID() != t.Self().ID() {
nodes.push(wrapNode(n), findnodeResultLimit)
}
}
return nodes.entries, err
}
// filedir:go-ethereum-1.10.2\p2p\discover\v5_udp.go L362
// findnode calls FINDNODE on a node and waits for responses.
func (t *UDPv5) findnode(n *enode.Node, distances []uint) ([]*enode.Node, error) {
resp := t.call(n, v5wire.NodesMsg, &v5wire.Findnode{Distances: distances})
return t.waitForNodes(resp, distances)
}
func (t *UDPv5) call(node *enode.Node, responseType byte, packet v5wire.Packet) *callV5 {
c := &callV5{
node: node,
packet: packet,
responseType: responseType,
reqid: make([]byte, 8),
ch: make(chan v5wire.Packet, 1),
err: make(chan error, 1),
}
// Assign request ID.
crand.Read(c.reqid)
packet.SetRequestID(c.reqid)
// Send call to dispatch.
select {
case t.callCh <- c:
case <-t.closeCtx.Done():
c.err <- errClosed
}
return c
}
server端的的数据结构如下所示:
// filedir:go-ethereum-1.10.2\p2p\server.go L160
// Server manages all peer connections.
type Server struct {
// Config fields may not be modified while the server is running.
Config
// Hooks for testing. These are useful because we can inhibit
// the whole protocol stack.
newTransport func(net.Conn, *ecdsa.PublicKey) transport
newPeerHook func(*Peer)
listenFunc func(network, addr string) (net.Listener, error)
lock sync.Mutex // protects running
running bool
listener net.Listener
ourHandshake *protoHandshake
loopWG sync.WaitGroup // loop, listenLoop
peerFeed event.Feed
log log.Logger
nodedb *enode.DB
localnode *enode.LocalNode
ntab *discover.UDPv4
DiscV5 *discover.UDPv5
discmix *enode.FairMix
dialsched *dialScheduler
// Channels into the run loop.
quit chan struct{}
addtrusted chan *enode.Node
removetrusted chan *enode.Node
peerOp chan peerOpFunc
peerOpDone chan struct{}
delpeer chan peerDrop
checkpointPostHandshake chan *conn
checkpointAddPeer chan *conn
// State of run loop and listenLoop.
inboundHistory expHeap
}
Server配置(本地节点秘钥、拨号比率、节点最大链接数、拨号比率、事件记录等):
// Config holds Server options.
type Config struct {
PrivateKey *ecdsa.PrivateKey `toml:"-"`
MaxPeers int
MaxPendingPeers int `toml:",omitempty"`
DialRatio int `toml:",omitempty"`
NoDiscovery bool
DiscoveryV5 bool `toml:",omitempty"`
Name string `toml:"-"`
BootstrapNodes []*enode.Node
BootstrapNodesV5 []*enode.Node `toml:",omitempty"`
StaticNodes []*enode.Node
TrustedNodes []*enode.Node
NetRestrict *netutil.Netlist `toml:",omitempty"`
NodeDatabase string `toml:",omitempty"`
Protocols []Protocol `toml:"-"`
ListenAddr string
NAT nat.Interface `toml:",omitempty"`
Dialer NodeDialer `toml:"-"`
NoDial bool `toml:",omitempty"`
EnableMsgEvents bool
Logger log.Logger `toml:",omitempty"`
clock mclock.Clock
}
AddPeer函数用于新增一个给定的节点,其实现代码如下所示:
// filedir:go-ethereum-1.10.2\p2p\server.go L318
func (srv *Server) AddPeer(node *enode.Node) {
srv.dialsched.addStatic(node)
}
// filedir:go-ethereum-1.10.2\p2p\dial.go L190
// addStatic adds a static dial candidate.
func (d *dialScheduler) addStatic(n *enode.Node) {
select {
case d.addStaticCh <- n:
case <-d.ctx.Done():
}
}
AddTrustedPeer函数用于新增一个可信任节点:
// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
func (srv *Server) AddTrustedPeer(node *enode.Node) {
select {
case srv.addtrusted <- node:
case <-srv.quit:
}
}
RemovePeer函数用于移除节点并断开与节点之间的连接:
// filedir:go-ethereum-1.10.2\p2p\server.go L325
func (srv *Server) RemovePeer(node *enode.Node) {
var (
ch chan *PeerEvent
sub event.Subscription
)
// Disconnect the peer on the main loop.
srv.doPeerOp(func(peers map[enode.ID]*Peer) {
srv.dialsched.removeStatic(node)
if peer := peers[node.ID()]; peer != nil {
ch = make(chan *PeerEvent, 1)
sub = srv.peerFeed.Subscribe(ch)
peer.Disconnect(DiscRequested)
}
})
// Wait for the peer connection to end.
if ch != nil {
defer sub.Unsubscribe()
for ev := range ch {
if ev.Peer == node.ID() && ev.Type == PeerEventTypeDrop {
return
}
}
}
}
// filedir:go-ethereum-1.10.2\p2p\dial.go L198
// removeStatic removes a static dial candidate.
func (d *dialScheduler) removeStatic(n *enode.Node) {
select {
case d.remStaticCh <- n:
case <-d.ctx.Done():
}
}
RemoveTrustedPeer函数用于移除一个可信任节点:
// RemoveTrustedPeer removes the given node from the trusted peer set.
func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
select {
case srv.removetrusted <- node:
case <-srv.quit:
}
}
Stop函数用于终止节点运行,具体代码如下所示:
func (srv *Server) Stop() {
srv.lock.Lock()
if !srv.running {
srv.lock.Unlock()
return
}
srv.running = false
if srv.listener != nil {
// this unblocks listener Accept
srv.listener.Close()
}
close(srv.quit)
srv.lock.Unlock()
srv.loopWG.Wait()
}
位于go-ethereum-1.10.2\p2p\server.go中的start函数用于启动一个P2P节点:
// filedir:go-ethereum-1.10.2\p2p\server.go L433
func (srv *Server) Start() (err error) {
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.Root()
}
if srv.clock == nil {
srv.clock = mclock.System{}
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
// static fields
if srv.PrivateKey == nil {
return errors.New("Server.PrivateKey must be set to a non-nil key")
}
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.listenFunc == nil {
srv.listenFunc = net.Listen
}
srv.quit = make(chan struct{})
srv.delpeer = make(chan peerDrop)
srv.checkpointPostHandshake = make(chan *conn)
srv.checkpointAddPeer = make(chan *conn)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
if err := srv.setupLocalNode(); err != nil {
return err
}
if srv.ListenAddr != "" {
if err := srv.setupListening(); err != nil {
return err
}
}
if err := srv.setupDiscovery(); err != nil {
return err
}
srv.setupDialScheduler()
srv.loopWG.Add(1)
go srv.run()
return nil
}
在这里首先检查当前节点是否处于运行状态,如果是则直接返回并给出错误提示信息,如果不是则将srv.running设置为true,之后进入服务启动流程,之后检查log是否开启等,之后初始化配置P2P服务信息:
// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.Root()
}
if srv.clock == nil {
srv.clock = mclock.System{}
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
// static fields
if srv.PrivateKey == nil {
return errors.New("Server.PrivateKey must be set to a non-nil key")
}
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.listenFunc == nil {
srv.listenFunc = net.Listen
}
srv.quit = make(chan struct{})
srv.delpeer = make(chan peerDrop)
srv.checkpointPostHandshake = make(chan *conn)
srv.checkpointAddPeer = make(chan *conn)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
之后调用setupLocalNode来启动一个本地节点,并建立本地监听,然后配置一个DiscoveryV5网络协议,生成节点路由表。
调用setupDialScheduler启动主动拨号连接过程,然后开一个协程,在其中做peer的维护:
srv.setupDialScheduler()
srv.loopWG.Add(1)
go srv.run()
return nil
}
setupDialScheduler代码如下所示,这里通过newDialScheduler来建立连接,参数discmix确定了进行主动建立连接时的节点集,它是一个迭代器 ,同时将setupConn连接建立函数传入:
func (srv *Server) setupDialScheduler() {
config := dialConfig{
self: srv.localnode.ID(),
maxDialPeers: srv.maxDialedConns(),
maxActiveDials: srv.MaxPendingPeers,
log: srv.Logger,
netRestrict: srv.NetRestrict,
dialer: srv.Dialer,
clock: srv.clock,
}
if srv.ntab != nil {
config.resolver = srv.ntab
}
if config.dialer == nil {
config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
for _, n := range srv.StaticNodes {
srv.dialsched.addStatic(n)
}
}
newDialScheduler函数如下所示,在这里通过d.readNodes(it)从迭代器中取得节点,之后通过通道传入d.loop(it)中进行连接:
// filedir:go-ethereum-1.10.2\p2p\dial.go L162
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
d := &dialScheduler{
dialConfig: config.withDefaults(),
setupFunc: setupFunc,
dialing: make(map[enode.ID]*dialTask),
static: make(map[enode.ID]*dialTask),
peers: make(map[enode.ID]connFlag),
doneCh: make(chan *dialTask),
nodesIn: make(chan *enode.Node),
addStaticCh: make(chan *enode.Node),
remStaticCh: make(chan *enode.Node),
addPeerCh: make(chan *conn),
remPeerCh: make(chan *conn),
}
d.lastStatsLog = d.clock.Now()
d.ctx, d.cancel = context.WithCancel(context.Background())
d.wg.Add(2)
go d.readNodes(it)
go d.loop(it)
return d
}
在上面的服务启动过程中有一个setupListening函数,该函数用于监听事件,具体代码如下所示:
func (srv *Server) setupListening() error {
// Launch the listener.
listener, err := srv.listenFunc("tcp", srv.ListenAddr)
if err != nil {
return err
}
srv.listener = listener
srv.ListenAddr = listener.Addr().String()
// Update the local node record and map the TCP listening port if NAT is configured.
if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
srv.localnode.Set(enr.TCP(tcp.Port))
if !tcp.IP.IsLoopback() && srv.NAT != nil {
srv.loopWG.Add(1)
go func() {
nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
srv.loopWG.Done()
}()
}
}
srv.loopWG.Add(1)
go srv.listenLoop()
return nil
}
在上述代码中又调用了一个srv.listenLoop(),该函数是一个死循环的goroutine,它会监听端口并接收外部的请求:
// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())
// The slots channel limits accepts of new connections.
tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{}
}
// Wait for slots to be returned on exit. This ensures all connection goroutines
// are down before listenLoop returns.
defer srv.loopWG.Done()
defer func() {
for i := 0; i < cap(slots); i++ {
<-slots
}
}()
for {
// Wait for a free slot before accepting.
<-slots
var (
fd net.Conn
err error
lastLog time.Time
)
for {
fd, err = srv.listener.Accept()
if netutil.IsTemporaryError(err) {
if time.Since(lastLog) > 1*time.Second {
srv.log.Debug("Temporary read error", "err", err)
lastLog = time.Now()
}
time.Sleep(time.Millisecond * 200)
continue
} else if err != nil {
srv.log.Debug("Read error", "err", err)
slots <- struct{}{}
return
}
break
}
remoteIP := netutil.AddrIP(fd.RemoteAddr())
if err := srv.checkInboundConn(remoteIP); err != nil {
srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
fd.Close()
slots <- struct{}{}
continue
}
if remoteIP != nil {
var addr *net.TCPAddr
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
addr = tcp
}
fd = newMeteredConn(fd, true, addr)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
}
go func() {
srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}
}
这里的SetupConn主要执行执行握手协议,并尝试把链接创建为一个peer对象:
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
c := &conn{fd: fd, flags: flags, cont: make(chan error)}
if dialDest == nil {
c.transport = srv.newTransport(fd, nil)
} else {
c.transport = srv.newTransport(fd, dialDest.Pubkey())
}
err := srv.setupConn(c, flags, dialDest)
if err != nil {
c.close(err)
}
return err
}
在上述代码中又去调用了srv.setupConn(c, flags, dialDest)函数,该函数用于执行握手协议:
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
srv.lock.Unlock()
if !running {
return errServerStopped
}
// If dialing, figure out the remote public key.
var dialPubkey *ecdsa.PublicKey
if dialDest != nil { // dest=nil 被动连接,dest!=nil主动连接诶
dialPubkey = new(ecdsa.PublicKey)
if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
err = errors.New("dial destination doesn't have a secp256k1 public key")
srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return err
}
}
// Run the RLPx handshake.
remotePubkey, err := c.doEncHandshake(srv.PrivateKey) // 公钥交换,确定共享秘钥RLPx层面的握手一来一去
if err != nil {
srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return err
}
if dialDest != nil {
c.node = dialDest
} else {
c.node = nodeFromConn(remotePubkey, c.fd)
}
clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
err = srv.checkpoint(c, srv.checkpointPostHandshake)
if err != nil {
clog.Trace("Rejected peer", "err", err)
return err
}
// Run the capability negotiation handshake.
phs, err := c.doProtoHandshake(srv.ourHandshake) // 进行协议层面的握手,也即p2p握手,一来一去
if err != nil {
clog.Trace("Failed p2p handshake", "err", err)
return err
}
if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {
clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))
return DiscUnexpectedIdentity
}
c.caps, c.name = phs.Caps, phs.Name
err = srv.checkpoint(c, srv.checkpointAddPeer) // 状态校验
if err != nil {
clog.Trace("Rejected peer", "err", err)
return err
}
return nil
}
秘钥握手通过deEncHandshake函数实现,在函数之中调用了Handshake()函数:
// filedir:go-ethereum-1.10.2\p2p\transport.go L123
func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
t.conn.SetDeadline(time.Now().Add(handshakeTimeout))
return t.conn.Handshake(prv)
}
Handshake代码如下所示,在这里会根据是主动握手还是被动握手来进行执行对应的握手逻辑:
// filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go L253
func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
var (
sec Secrets
err error
)
if c.dialDest != nil { //主动握手
sec, err = initiatorEncHandshake(c.conn, prv, c.dialDest) //主动发起秘钥验证握手结束,确定共享秘钥
} else { // 被动握手
sec, err = receiverEncHandshake(c.conn, prv)
}
if err != nil {
return nil, err
}
c.InitWithSecrets(sec)
return sec.remote, err
}
主动发起握手过程过程如下,在这里会调用makeAuthMsg来生成Auth身份信息,包含签名,随机nonce生成的与签名对应的公钥和版本号,之后调用sealEIP8方法进行rlpx编码,之后发起加密握手,之后接收返回的authResp消息,并验证解密,获取对方公钥,之后生成AES,MAC:
// filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go L477
func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) {
h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}
authMsg, err := h.makeAuthMsg(prv)
if err != nil {
return s, err
}
authPacket, err := sealEIP8(authMsg, h)
if err != nil {
return s, err
}
if _, err = conn.Write(authPacket); err != nil {
return s, err
}
authRespMsg := new(authRespV4)
authRespPacket, err := readHandshakeMsg(authRespMsg, encAuthRespLen, prv, conn)
if err != nil {
return s, err
}
if err := h.handleAuthResp(authRespMsg); err != nil {
return s, err
}
return h.secrets(authPacket, authRespPacket)
}
receiverEncHandshake如下所示,和initiatorEncHandshake相差无几:
func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {
authMsg := new(authMsgV4)
authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)
if err != nil {
return s, err
}
h := new(encHandshake)
if err := h.handleAuthMsg(authMsg, prv); err != nil {
return s, err
}
authRespMsg, err := h.makeAuthResp()
if err != nil {
return s, err
}
var authRespPacket []byte
if authMsg.gotPlain {
authRespPacket, err = authRespMsg.sealPlain(h)
} else {
authRespPacket, err = sealEIP8(authRespMsg, h)
}
if err != nil {
return s, err
}
if _, err = conn.Write(authRespPacket); err != nil {
return s, err
}
return h.secrets(authPacket, authRespPacket)
}
之后通过doProtoHandshake来完成协议握手操作,在这里调用send发送一次握手操作,之后通过readProtocolHandshake来读取返回信息,之后进行检查:
func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
werr := make(chan error, 1)
go func() { werr <- Send(t, handshakeMsg, our) }()
if their, err = readProtocolHandshake(t); err != nil {
<-werr // make sure the write terminates too
return nil, err
}
if err := <-werr; err != nil {
return nil, fmt.Errorf("write error: %v", err)
}
// If the protocol version supports Snappy encoding, upgrade immediately
t.conn.SetSnappy(their.Version >= snappyProtocolVersion)
return their, nil
}
run函数是服务的主循环,监听服务器终止、增加信任节点、移除信任节点、增加检查节点等:
func (srv *Server) run() {
srv.log.Info("Started P2P networking", "self", srv.localnode.Node().URLv4())
defer srv.loopWG.Done()
defer srv.nodedb.Close()
defer srv.discmix.Close()
defer srv.dialsched.stop()
var (
peers = make(map[enode.ID]*Peer)
inboundCount = 0
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
)
for _, n := range srv.TrustedNodes {
trusted[n.ID()] = true
}
running:
for {
select {
case <-srv.quit:
break running
case n := <-srv.addtrusted:
srv.log.Trace("Adding trusted node", "node", n)
trusted[n.ID()] = true
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, true)
}
case n := <-srv.removetrusted:
srv.log.Trace("Removing trusted node", "node", n)
delete(trusted, n.ID())
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, false)
}
case op := <-srv.peerOp:
op(peers)
srv.peerOpDone <- struct{}{}
case c := <-srv.checkpointPostHandshake:
if trusted[c.node.ID()] {
c.flags |= trustedConn
}
c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)
case c := <-srv.checkpointAddPeer:
err := srv.addPeerChecks(peers, inboundCount, c)
if err == nil {
p := srv.launchPeer(c)
peers[c.node.ID()] = p
srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
srv.dialsched.peerAdded(c)
if p.Inbound() {
inboundCount++
}
}
c.cont <- err
case pd := <-srv.delpeer:
// A peer disconnected.
d := common.PrettyDuration(mclock.Now() - pd.created)
delete(peers, pd.ID())
srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
srv.dialsched.peerRemoved(pd.rw)
if pd.Inbound() {
inboundCount--
}
}
}
srv.log.Trace("P2P networking is spinning down")
if srv.ntab != nil {
srv.ntab.Close()
}
if srv.DiscV5 != nil {
srv.DiscV5.Close()
}
for _, p := range peers {
p.Disconnect(DiscQuitting)
}
for len(peers) > 0 {
p := <-srv.delpeer
p.log.Trace("<-delpeer (spindown)")
delete(peers, p.ID())
}
}
NodeInfo用于查看节点信息,PeersInfo用于查看连接的节点信息:
// NodeInfo gathers and returns a collection of metadata known about the host.
func (srv *Server) NodeInfo() *NodeInfo {
node := srv.Self()
info := &NodeInfo{
Name: srv.Name,
Enode: node.URLv4(),
ID: node.ID().String(),
IP: node.IP().String(),
ListenAddr: srv.ListenAddr,
Protocols: make(map[string]interface{}),
}
info.Ports.Discovery = node.UDP()
info.Ports.Listener = node.TCP()
info.ENR = node.String()
for _, proto := range srv.Protocols {
if _, ok := info.Protocols[proto.Name]; !ok {
nodeInfo := interface{}("unknown")
if query := proto.NodeInfo; query != nil {
nodeInfo = proto.NodeInfo()
}
info.Protocols[proto.Name] = nodeInfo
}
}
return info
}
func (srv *Server) PeersInfo() []*PeerInfo {
// Gather all the generic and sub-protocol specific infos
infos := make([]*PeerInfo, 0, srv.PeerCount())
for _, peer := range srv.Peers() {
if peer != nil {
infos = append(infos, peer.Info())
}
}
// Sort the result array alphabetically by node identifier
for i := 0; i < len(infos); i++ {
for j := i + 1; j < len(infos); j++ {
if infos[i].ID > infos[j].ID {
infos[i], infos[j] = infos[j], infos[i]
}
}
}
return infos
}
下面为peer.run函数的代码:
func (p *Peer) run() (remoteRequested bool, err error) {
var (
writeStart = make(chan struct{}, 1)
writeErr = make(chan error, 1)
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
p.wg.Add(2)
go p.readLoop(readErr)
go p.pingLoop()
// Start all protocol handlers.
writeStart <- struct{}{}
p.startProtocols(writeStart, writeErr)
// Wait for an error or disconnect.
loop:
for {
select {
case err = <-writeErr:
// A write finished. Allow the next write to start if
// there was no error.
if err != nil {
reason = DiscNetworkError
break loop
}
writeStart <- struct{}{}
case err = <-readErr:
if r, ok := err.(DiscReason); ok {
remoteRequested = true
reason = r
} else {
reason = DiscNetworkError
}
break loop
case err = <-p.protoErr:
reason = discReasonForError(err)
break loop
case err = <-p.disc:
reason = discReasonForError(err)
break loop
}
}
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err
}
从上述代码中可以看到函数的开头首先定义了一些局部变量,之后启用了两个协程,一个是readLoop,它通过调用ReadMsg()读取msg,之后又通过调用peer.handle(msg)来处理msg
如果msg是pingMsg,则发送一个pong回应,如果msg与下述特殊情况不相匹配则将msg交给proto.in通道,等待protocolManager.handleMsg()从通道中取出。另一个协程是pingLoop,它主要通过调用SendItems(p.rw, pingMsg)来发起ping请求
之后调用starProtocols()函数让协议运行起来:
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
p.wg.Add(len(p.running))
for _, proto := range p.running {
proto := proto
proto.closed = p.closed
proto.wstart = writeStart
proto.werr = writeErr
var rw MsgReadWriter = proto
if p.events != nil {
rw = newMsgEventer(rw, p.events, p.ID(), proto.Name, p.Info().Network.RemoteAddress, p.Info().Network.LocalAddress)
}
p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
go func() {
defer p.wg.Done()
err := proto.Run(p, rw)
if err == nil {
p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
err = errProtocolReturned
} else if err != io.EOF {
p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
}
p.protoErr <- err
}()
}
}
最后通过一个loop循环来处理错误或者断开连接等操作:
// Wait for an error or disconnect.
loop:
for {
select {
case err = <-writeErr:
// A write finished. Allow the next write to start if
// there was no error.
if err != nil {
reason = DiscNetworkError
break loop
}
writeStart <- struct{}{}
case err = <-readErr:
if r, ok := err.(DiscReason); ok {
remoteRequested = true
reason = r
} else {
reason = DiscNetworkError
}
break loop
case err = <-p.protoErr:
reason = discReasonForError(err)
break loop
case err = <-p.disc:
reason = discReasonForError(err)
break loop
}
}
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err创数据库
newPersistentDB函数用于创建一个持久化的数据库用于存储节点信息:
// filedir:go-ethereum-1.10.2\p2p\enode\nodedb.go L95
// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
// also flushing its contents in case of a version mismatch.
func newPersistentDB(path string) (*DB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
db, err = leveldb.RecoverFile(path, nil)
}
if err != nil {
return nil, err
}
currentVer := make([]byte, binary.MaxVarintLen64)
currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
blob, err := db.Get([]byte(dbVersionKey), nil)
switch err {
case leveldb.ErrNotFound:
// Version not found (i.e. empty cache), insert it
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
db.Close()
return nil, err
}
case nil:
// Version present, flush if different
if !bytes.Equal(blob, currentVer) {
db.Close()
if err = os.RemoveAll(path); err != nil {
return nil, err
}
return newPersistentDB(path)
}
}
return &DB{lvl: db, quit: make(chan struct{})}, nil
}
ensureExpirer函数用于检查节点是否超时,具体实现代码如下所示:
func (db *DB) ensureExpirer() {
db.runner.Do(func() { go db.expirer() })
}
func (db *DB) expirer() {
tick := time.NewTicker(dbCleanupCycle)
defer tick.Stop()
for {
select {
case <-tick.C:
db.expireNodes()
case <-db.quit:
return
}
}
}
func (db *DB) expireNodes() {
it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
defer it.Release()
if !it.Next() {
return
}
var (
threshold = time.Now().Add(-dbNodeExpiration).Unix()
youngestPong int64
atEnd = false
)
for !atEnd {
id, ip, field := splitNodeItemKey(it.Key())
if field == dbNodePong {
time, _ := binary.Varint(it.Value())
if time > youngestPong {
youngestPong = time
}
if time < threshold {
// Last pong from this IP older than threshold, remove fields belonging to it.
deleteRange(db.lvl, nodeItemKey(id, ip, ""))
}
}
atEnd = !it.Next()
nextID, _ := splitNodeKey(it.Key())
if atEnd || nextID != id {
if youngestPong > 0 && youngestPong < threshold {
deleteRange(db.lvl, nodeKey(id))
}
youngestPong = 0
}
}
}
下面是一些状态更新函数:
// LastPingReceived retrieves the time of the last ping packet received from
// a remote node.
func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time {
if ip = ip.To16(); ip == nil {
return time.Time{}
}
return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0)
}
// UpdateLastPingReceived updates the last time we tried contacting a remote node.
func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error {
if ip = ip.To16(); ip == nil {
return errInvalidIP
}
return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix())
}
// LastPongReceived retrieves the time of the last successful pong from remote node.
func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time {
if ip = ip.To16(); ip == nil {
return time.Time{}
}
// Launch expirer
db.ensureExpirer()
return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0)
}
// UpdateLastPongReceived updates the last pong time of a node.
func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error {
if ip = ip.To16(); ip == nil {
return errInvalidIP
}
return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())
}
// FindFails retrieves the number of findnode failures since bonding.
func (db *DB) FindFails(id ID, ip net.IP) int {
if ip = ip.To16(); ip == nil {
return 0
}
return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails)))
}
// UpdateFindFails updates the number of findnode failures since bonding.
func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error {
if ip = ip.To16(); ip == nil {
return errInvalidIP
}
return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))
}
// FindFailsV5 retrieves the discv5 findnode failure counter.
func (db *DB) FindFailsV5(id ID, ip net.IP) int {
if ip = ip.To16(); ip == nil {
return 0
}
return int(db.fetchInt64(v5Key(id, ip, dbNodeFindFails)))
}
// UpdateFindFailsV5 stores the discv5 findnode failure counter.
func (db *DB) UpdateFindFailsV5(id ID, ip net.IP, fails int) error {
if ip = ip.To16(); ip == nil {
return errInvalidIP
}
return db.storeInt64(v5Key(id, ip, dbNodeFindFails), int64(fails))
}
QuerySeeds函数用于从数据库里面随机挑选合适种子节点:
// QuerySeeds retrieves random nodes to be used as potential seed nodes
// for bootstrapping.
func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
var (
now = time.Now()
nodes = make([]*Node, 0, n)
it = db.lvl.NewIterator(nil, nil)
id ID
)
defer it.Release()
seek:
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
ctr := id[0]
rand.Read(id[:])
id[0] = ctr + id[0]%16
it.Seek(nodeKey(id))
n := nextNode(it)
if n == nil {
id[0] = 0
continue seek // iterator exhausted
}
if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
continue seek
}
for i := range nodes {
if nodes[i].ID() == n.ID() {
continue seek // duplicate
}
}
nodes = append(nodes, n)
}
return nodes
}
func nextNode(it iterator.Iterator) *Node {
for end := false; !end; end = !it.Next() {
id, rest := splitNodeKey(it.Key())
if string(rest) != dbDiscoverRoot {
continue
}
return mustDecodeNode(id[:], it.Value())
}
return nil
}
P2P网络是区块链分布式网络结构的基础,本篇文章详细介绍了P2P网络的基本原理,包括节点发现机制、分布式哈希表、节点查找、节点新增、节点移除、请求处理等,同时从源码角度对以太坊源码中P2P网络的实现做了较为细致的分析,探索了以太坊P2P网络的工作流程以以及安全设计,而公链安全体系的建设依旧是长路漫漫,有待进一步深入探索
实验室官网:www.knownseclab.com
知道创宇唯一指定存证平台:www.attest.im
联系我们:[email protected]
知道创宇区块链安全实验室导航
微信公众号
@ 创宇区块链安全实验室
官方网站
@ 知道创宇区块链安全实验室
微博
@ 知道创宇区块链实验室
https://weibo.com/BlockchainLab
知乎
@ 知道创宇区块链安全实验室
https://www.zhihu.com/org/zhi-dao-chuang-yu-qu-kuai-lian-an-quan-shi-yan-shi
@KS_Blockchain
https://twitter.com/KSBlockchain