知道创宇区块链安全实验室 | 深入理解以太坊 P2P 网络设计

知道创宇区块链安全实验室
企业专栏
热度: 31240

作为区块链分布式网络结构的基础,以太坊P2P网络是如何发展的呢?

文章前言

在设计公链时,节点与节点之间建立连接需要 P2P 协议,从而实现数据的同步,于此同时上层应用还需要封装一些通信逻辑,比如节点之间的区块同步、交易数据同步等。

本篇文章将对 P2P 网络发展进行简单概述,同时将从源码角度对以太坊中的节点发现机制、分布式哈希表、节点查找、节点新增、节点移除等进行简单介绍,并对其 P2P 网络安全性设计进行简要分析

基础知识


P2P网络

P2P网络不同于传统的 CS 结构,在 P2P 网络中每个节点既可以是客户端也可以是服务端,节点之间的通信协议一般直接通过 Socket 实现,P2P技术发展至今经历了以下四个发展阶段:

  • 集中式:是P2P网络模式中最简单的路由方式,即存在一个中心节点,它保存了其他所有节点的索引信息,索引信息一般包括节点IP、端口、节点资源等,集中式路由的优点是结构简单、实现容易,但缺点也很明显,由于中心节点需要存储所有节点的路由信息,当节点规模不断扩展时,就很容易出现性能瓶颈,而且也存在单节点故障问题。


  • 分布式:是指移除了中心节点,在 P2P 节点之间建立随机网络,在新加入节点与 P2P 网络中的某个节点之间随机建立连接通道,从而形成一个随机拓扑结构,新节点加入该网络时随机选择一个已经存在的节点并建立邻居关系,在节点与邻居节点建立连接后,还需要进行全网广播,让整个网络知道该节点的存在。具体的广播步骤是:该节点首先向邻居节点广播,邻居节点收到广播消息后,在继续向自己的邻居节点广播,以此类推,这种广播方式也被称之为"泛洪机制",而泛洪机制的问题在于可控性差,其主要包括两个较大的问题:一个是容易形成泛洪循环,比如节点A给发出的消息结果节点 B 到节点C,节点C再广播到节点 A,这就形成了一个循环,另一个问题是消息响应分包,比如节点 A 想要请求的资源被很多节点所拥有,那么在短时间内,会出现大量节点同时向 A 节点发送响应消息,这就很可能让节点 A 瞬间奔溃。而消除泛洪循环的方法可以借鉴IP网络路由协议中有关泛洪广播的控制,一种方法是对每一个查询消息设置 TTL 值,泛洪消息每被转发一次,TTL 值减1,当节点接受的TTL 为0时,不再转发消息,这样可以避免查询消息在网络中产生死循环,还可以为泛洪消息设置唯一的标志,对接收到的重复消息不再进行转发从而规避死循环,解决响应风暴的方法可以在数据链路层进行网络分段,减少消息跨段广播。


  • 混合式:混合式其实就是混合集中式和分布式结构,网络中存在很多超级节点组成的分布式网络,而每个超级节点有多个普通节点与它组成局部集中网络,一个新的普通节点加入是可以先选择一个超级节点进行通信,该超级节点再推送其他超级节点列表给新加入节点,加入节点根据列表中的超级节点状态决定选择那个具体的超级节点作为父节点,这种结构的泛洪广播只是发生在超级节点之间,因此可以避免大规模泛洪问题,在实际应用中,混合式结构是相对灵活且比较有效的组网架构,实现难度也相对较小,因此目前较多系统基于混合式结构进行开发实现。


  • 结构化:结构化 P2P 网络是一种分布式网络结构,与上面所讲的分布式结构不同,分布式网络就是一个随机网络,而结构化网络则将所有节点按照某种结构进行有序组织,比如形成一个环状网络或树状网络,结构化网络在具体实现上普遍基于分布式哈希表( Distributed Hash Table,DHI )算法,具体的实现方案有 Chord、Pasty、CAN、Kademlia 等算法


     四种网络结构对比如下:

网络


节点发现

节点发现是任何节点接入 P2P 网络的第一步,节点发现可以分为两种:

  • 初始节点发现:指节点是一个全新的、从未运行的节点,该节点没有网络中的其他节点的任何数据,此时节点发现只能依靠节点中的硬编码的种子节点获得P2P 网络的信息
  • 已知节点发现:节点之前运行过,节点数据库中保存着网络中的其他节点信息,此时节点发现可以依靠节点数据库汇总的节点获取 P2P 网络的信息,从而构建自己的网络拓扑


种子节点

在 P2P 网络中,初始节点在启动时会通过一些长期稳定运行的节点快速发现网络中的其他节点,这些节点被称为"种子节点"(一般代码中会硬编码种子节点信息),一般情况下种子节点可以分为两种:

  • DNS-Seed:也被称之为" DNS 种子节点",DNS是互联网提供的一种域名查询服务,它将域名和 IP 地址相互映射保存在一个分布式的数据库中,当我们访问 DNS 服务器时,给它提供一个域名,DNS服务器会将该域名对应的 IP 地址返回 
  • IP-Seed:即将种子节点的 IP 地址硬编码到代码中去,硬编码的这些节点的地址被称为种子节点


KDA算法

Kademlia 是一种分布式哈希表( DHT )技术,与其他 DHT 技术相比,KDA 算法使用异或算法计算节点之间的距离,进而建立了全新的 DHT 拓扑结构,这种算法可以极大地提高路由的查询速度。

HashTable

哈希表是用于存储键值对的一种容器,键值对有被称为 Key/Value 对,哈希表数据结构中包含 N 个 bucket (桶),对于某个具体的哈希表,N (桶的数量)通常是固定不变的,于是可以对每个桶编号,0~N-1,桶是用来存储键值对的,可以简单的将其理解为一个动态数组,里面存放多个键值对。

下图展示了哈希表的查找原理,我们可以方便快速地通过 Key 来获取 value,当使用某个 key 进行查找时,先用某个哈希函数计算这个key 的哈希值,得到的哈希值通常是一个整数,之后使用哈希值对 N (桶数)进行取模运算(除法求余数),就可以算出对应的桶编号。

网络

HashCollision

说到哈希表不得不提一下哈希表碰撞,当两个不同的 Key 进行哈希计算得到相同的哈希值时,就是所谓的哈希函数碰撞,一旦出现这种情况,这两个 key 对应的两个键值对就会被存在在同一个桶中( bucket )中,另一中散列碰撞是虽然计算出来的哈希值不同,但经过取模运算之后得到相同的桶编号,这时候也会将两个键值对存储在一个桶中,哈希碰撞原理如下图所示:

网络

如果某个哈希表在存储数据时完全没有碰撞,那么每个桶里都只有0个或1个键值对,这样查找起来就非常快,反之,如果某个哈希表在存储数据时出现严重碰撞,那么就会导致某些桶里存储了很多键值对,那么在查找key的时候需要在这个桶里面逐一对比key是否相同,查找效率会变得很低~

分布式哈希表

分布式哈希表在概念上类似于传统的哈希表,差异在于传统的哈希表主要用于单机上的某个软件中,分布式哈希表主要用于分布式系统(此时,分布式系统的节点可以通俗的理解为 hash 表中的 bucket ),分布式哈希表主要用于存储大量(甚至海量)的数据,分布式哈希表的原理如下图所示:

网络

源码分析

以太坊底层的P2PServer大致可以分为以下三层:

  • 顶层:以太坊中各个协议的具体实现
  • 中层:以太坊中的p2p通信链路层,负责启动监听、处理新加入连接或维护连接
  • 底层:以太坊中的数据通信网络IO层,主要负责路由表的管理以及数据库的读写操作


网络

表的结构

表数据结构如下所示:

// filedir:go-ethereum-1.10.2\p2p\discover\table.go L40

const (

  alpha       = 3  // Kademlia concurrency factor

  bucketSize    = 16 // Kademlia bucket size

  maxReplacements = 10 // Size of per-bucket replacement list

  // We keep buckets for the upper 1/15 of distances because

  // it's very unlikely we'll ever encounter a node that's closer.

  hashBits      = len(common.Hash{}) * 8

  nBuckets      = hashBits / 15     // Number of buckets

  bucketMinDistance = hashBits - nBuckets // Log distance of closest bucket

  // IP address limits.

  bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24

  tableIPLimit, tableSubnet   = 10, 24

  refreshInterval   = 30 * time.Minute

  revalidateInterval = 10 * time.Second

  copyNodesInterval  = 30 * time.Second

  seedMinTableTime   = 5 * time.Minute

  seedCount      = 30

  seedMaxAge      = 5 * 24 * time.Hour

)

type Table struct {

  mutex   sync.Mutex     // protects buckets, bucket content, nursery, rand

  buckets [nBuckets]*bucket // index of known nodes by distance

  nursery []*node       // bootstrap nodes

  rand   *mrand.Rand     // source of randomness, periodically reseeded

  ips    netutil.DistinctNetSet

  log     log.Logger

  db      *enode.DB // database of known nodes

  net     transport

  refreshReq chan chan struct{}

  initDone   chan struct{}

  closeReq   chan struct{}

  closed    chan struct{}

  nodeAddedHook func(*node) // for testing

}

type bucket struct {

  entries   []*node // live entries, sorted by time of last contact

  replacements []*node // recently seen nodes to be used if revalidation fails

  ips      netutil.DistinctNetSet

}

关键的几个变量

  • buckets:K桶,每个K桶包含节点(依据最近活跃情况进行降序排列),用于按距离列出已知节点索引
  • nursery:种子节点,一个节点启动的时候最多能够链接35个种子节点,其中有五个是以太坊官方指定的,另外30个从数据库里面提取
  • db:用于存储P2P节点的数据库(以太坊中有两个,另一个用于存储链上数据)
  • refreshReq:刷新K-桶事件的管道


表的创建

newTable函数用于创建新的表:

// filedir:go-ethereum-1.10.2\p2p\discover\table.go L102

func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {

  tab := &Table{

    net:    t,

    db:     db,

    refreshReq: make(chan chan struct{}),

    initDone:  make(chan struct{}),

    closeReq:  make(chan struct{}),

    closed:   make(chan struct{}),

    rand:    mrand.New(mrand.NewSource(0)),

    ips:    netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},

    log:    log,

 }

  if err := tab.setFallbackNodes(bootnodes); err != nil {

    return nil, err

 }

  for i := range tab.buckets {

    tab.buckets[i] = &bucket{

      ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},

   }

 }

  tab.seedRand()

  tab.loadSeedNodes()

  return tab, nil

}


在上述代码中首先使用传入的参数初始化了一个Table的对象tab,调用setFallbackNodes函数设置初始链接节点(即获得5个nursey节点,后面如果table为空或者数据库中没有节点信息时这些节点将被用于去链接网络),之后通过一个for循环结合函数ValidateComplete来验证节点是否有效。


之后初始化K桶:

  for i := range tab.buckets {

    tab.buckets[i] = &bucket{

      ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},

   }

 }


之后从table.buckets中随机取30个节点加载种子节点到相应的bucket:

  tab.seedRand()

  tab.loadSeedNodes()

  return tab, nil


loadSeedNodes函数的具体实现如下所示(这里的seedCount为table.go中最上方定义的全局变量,值为30):

// filedir: go-ethereum-1.10.2\p2p\discover\table.go L302

func (tab *Table) loadSeedNodes() {

  seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))

  seeds = append(seeds, tab.nursery...)

  for i := range seeds {

    seed := seeds[i]

    age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}

    tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)

    tab.addSeenNode(seed)

 }

}


这里的addSeenNode即用于添加节点到bucket,在这里会检查要添加的节点是否已经存在以及bucket是否已满,如果已满则调用tab.addReplacement(b, n)将节点添加到replacement列表中去,之后添加IP,之后更新bucket:

// filedir: go-ethereum-1.10.2\p2p\discover\table.go L458

func (tab *Table) addSeenNode(n *node) {

  if n.ID() == tab.self().ID() {

    return

 }

  tab.mutex.Lock()

  defer tab.mutex.Unlock()

  b := tab.bucket(n.ID())

  if contains(b.entries, n.ID()) {

    // Already in bucket, don't add.

    return

 }

  if len(b.entries) >= bucketSize {

    // Bucket full, maybe add as replacement.

    tab.addReplacement(b, n)

    return

 }

  if !tab.addIP(b, n.IP()) {

    // Can't add: IP limit reached.

    return

 }

  // Add to end of bucket:

  b.entries = append(b.entries, n)

  b.replacements = deleteNode(b.replacements, n)

  n.addedAt = time.Now()

  if tab.nodeAddedHook != nil {

    tab.nodeAddedHook(n)

 }

}


事件监听

loop函数是table.go中的主循环,在函数开头出定义了后续会使用到的局部变量,之后通过deRefresh进行刷新桶操作,在这里的loop循环会每隔30分钟自动刷新一次K桶,每隔10秒钟验证K桶节点是否可以ping通,每30秒将K桶中存在超过5分钟的节点存储本地数据库,视作稳定节点:

// filedir: go-ethereum-1.10.2\p2p\discover\table.go L55

  refreshInterval   = 30 * time.Minute

  revalidateInterval = 10 * time.Second

  copyNodesInterval  = 30 * time.Second

// filedir: go-ethereum-1.10.2\p2p\discover\table.go L218

// loop schedules runs of doRefresh, doRevalidate and copyLiveNodes.

func (tab *Table) loop() {

  var (

    revalidate    = time.NewTimer(tab.nextRevalidateTime())

    refresh     = time.NewTicker(refreshInterval)

    copyNodes    = time.NewTicker(copyNodesInterval)

    refreshDone   = make(chan struct{})      // where doRefresh reports completion

    revalidateDone chan struct{}          // where doRevalidate reports completion

    waiting     = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs

 )

  defer refresh.Stop()

  defer revalidate.Stop()

  defer copyNodes.Stop()

  // Start initial refresh.

  go tab.doRefresh(refreshDone)

loop:

  for {

    select {

    case <-refresh.C:      //定时刷新k桶事件,refreshInterval=30 min

      tab.seedRand()

      if refreshDone == nil {

        refreshDone = make(chan struct{})

        go tab.doRefresh(refreshDone)

     }

    case req := <-tab.refreshReq:  //刷新k桶的请求事件

      waiting = append(waiting, req)

      if refreshDone == nil {

        refreshDone = make(chan struct{})

        go tab.doRefresh(refreshDone)

     }

    case <-refreshDone:     // doRefresh完成

      for _, ch := range waiting {

        close(ch)

     }

      waiting, refreshDone = nil, nil

    case <-revalidate.C:    // 验证k桶节点有效性,10 second

      revalidateDone = make(chan struct{})

      go tab.doRevalidate(revalidateDone)

    case <-revalidateDone:  // 验证K桶节点有效性完成

      revalidate.Reset(tab.nextRevalidateTime())

      revalidateDone = nil

      case <-copyNodes.C:  // 定时(30秒)将节点存入数据库,如果某个节点在k桶中存在超过5分钟,则认为它是一个稳定的节点

      go tab.copyLiveNodes()

    case <-tab.closeReq:

      break loop

   }

 }

  if refreshDone != nil {

    <-refreshDone

 }

  for _, ch := range waiting {

    close(ch)

 }

  if revalidateDone != nil {

    <-revalidateDone

 }

  close(tab.closed)

}


节点查找

getNode函数用于根据ID来查找节点,如果不存在则返回nil:

// getNode returns the node with the given ID or nil if it isn't in the table.

func (tab *Table) getNode(id enode.ID) *enode.Node {

  tab.mutex.Lock()

  defer tab.mutex.Unlock()

  b := tab.bucket(id)

  for _, e := range b.entries {

    if e.ID() == id {

      return unwrapNode(e)

   }

 }

  return nil

}


节点发现

getNode函数用于根据ID来查找节点,如果不存在则返回nil:

// getNode returns the node with the given ID or nil if it isn't in the table.

func (tab *Table) getNode(id enode.ID) *enode.Node {

  tab.mutex.Lock()

  defer tab.mutex.Unlock()

  b := tab.bucket(id)

  for _, e := range b.entries {

    if e.ID() == id {

      return unwrapNode(e)

   }

 }

  return nil

}


节点发现

以太坊分布式网络采用了结构化网络模型,其实现方案使用Kademlia协议,下面我们对节点发现进行简单介绍,在以太坊中k值是16,也就是说每个k桶包含16个节点,一共256个k桶,K桶中记录节点的NodeId,Distance,Endpoint,IP等信息,并按照与Target节点的距离排序,节点查找由doRefresh()实现:

// filedir:go-ethereum-1.10.2\p2p\discover\table.go L278

func (tab *Table) doRefresh(done chan struct{}) {

  defer close(done)

  tab.loadSeedNodes()

  // Run self lookup to discover new neighbor nodes.

  tab.net.lookupSelf()

  for i := 0; i < 3; i++ {

    tab.net.lookupRandom()

 }

}


从上述代码中可以看到这里首先调用tab.loadSeedNodes()从数据库中加载节点并将其插入到表中去:

// filedir: go-ethereum-1.10.2\p2p\discover\table.go L302

func (tab *Table) loadSeedNodes() {

  seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))

  seeds = append(seeds, tab.nursery...)

  for i := range seeds {

    seed := seeds[i]

    age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}

    tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)

    tab.addSeenNode(seed)

 }

}

// filedir:go-ethereum-1.10.2\p2p\enode\nodedb.go L440

// QuerySeeds retrieves random nodes to be used as potential seed nodes

// for bootstrapping.

func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {

  var (

    now   = time.Now()

    nodes = make([]*Node, 0, n)

    it   = db.lvl.NewIterator(nil, nil)

    id   ID

 )

  defer it.Release()

seek:

  for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {

    ctr := id[0]

    rand.Read(id[:])

    id[0] = ctr + id[0]%16

    it.Seek(nodeKey(id))

    n := nextNode(it)

    if n == nil {

      id[0] = 0

      continue seek // iterator exhausted

   }

    if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {

      continue seek

   }

    for i := range nodes {

      if nodes[i].ID() == n.ID() {

        continue seek // duplicate

     }

   }

    nodes = append(nodes, n)

 }

  return nodes

}


之后通过lookupSelf来发现新的节点,这里会优先使用当前节点的ID来运行newLookup发现邻居节点:

// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L281

// lookupSelf looks up our own node ID.

// This is needed to satisfy the transport interface.

func (t *UDPv5) lookupSelf() []*enode.Node {

  return t.newLookup(t.closeCtx, t.Self().ID()).run()

}

// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L292

func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {

  return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {

    return t.lookupWorker(n, target)

 })

}

func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *lookup {

  it := &lookup{

    tab:    tab,

    queryfunc: q,

    asked:   make(map[enode.ID]bool),

    seen:   make(map[enode.ID]bool),

    result:  nodesByDistance{target: target},

    replyCh:  make(chan []*node, alpha),

    cancelCh: ctx.Done(),

    queries:  -1,

 }

  // Don't query further if we hit ourself.

  // Unlikely to happen often in practice.

  it.asked[tab.self().ID()] = true

  return it

}


最后随机一个target,进行lookup:

// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L275

func (t *UDPv5) lookupRandom() []*enode.Node {

  return t.newRandomLookup(t.closeCtx).run()

}

// filedir: go-ethereum-1.10.2\p2p\discover\v5_udp.go L287

func (t *UDPv5) newRandomLookup(ctx context.Context) *lookup {

  var target enode.ID

  crand.Read(target[:])

  return t.newLookup(ctx, target)

}

func (t *UDPv5) newLookup(ctx context.Context, target enode.ID) *lookup {

  return newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {

    return t.lookupWorker(n, target)

 })

}

// lookupWorker performs FINDNODE calls against a single node during lookup.

func (t *UDPv5) lookupWorker(destNode *node, target enode.ID) ([]*node, error) {

  var (

    dists = lookupDistances(target, destNode.ID())

    nodes = nodesByDistance{target: target}

    err   error

 )

  var r []*enode.Node

  r, err = t.findnode(unwrapNode(destNode), dists)

  if err == errClosed {

    return nil, err

 }

  for _, n := range r {

    if n.ID() != t.Self().ID() {

      nodes.push(wrapNode(n), findnodeResultLimit)

   }

 }

  return nodes.entries, err

}

// filedir:go-ethereum-1.10.2\p2p\discover\v5_udp.go L362

// findnode calls FINDNODE on a node and waits for responses.

func (t *UDPv5) findnode(n *enode.Node, distances []uint) ([]*enode.Node, error) {

  resp := t.call(n, v5wire.NodesMsg, &v5wire.Findnode{Distances: distances})

  return t.waitForNodes(resp, distances)

}

func (t *UDPv5) call(node *enode.Node, responseType byte, packet v5wire.Packet) *callV5 {

  c := &callV5{

    node:     node,

    packet:    packet,

    responseType: responseType,

    reqid:    make([]byte, 8),

    ch:      make(chan v5wire.Packet, 1),

    err:     make(chan error, 1),

 }

  // Assign request ID.

  crand.Read(c.reqid)

  packet.SetRequestID(c.reqid)

  // Send call to dispatch.

  select {

  case t.callCh <- c:

  case <-t.closeCtx.Done():

    c.err <- errClosed

 }

  return c

}


服务结构

server端的的数据结构如下所示:

// filedir:go-ethereum-1.10.2\p2p\server.go L160

// Server manages all peer connections.

type Server struct {

  // Config fields may not be modified while the server is running.

  Config

  // Hooks for testing. These are useful because we can inhibit

  // the whole protocol stack.

  newTransport func(net.Conn, *ecdsa.PublicKey) transport

  newPeerHook  func(*Peer)

  listenFunc   func(network, addr string) (net.Listener, error)

  lock   sync.Mutex // protects running

  running bool

  listener    net.Listener

  ourHandshake *protoHandshake

  loopWG     sync.WaitGroup // loop, listenLoop

  peerFeed    event.Feed

  log      log.Logger

  nodedb   *enode.DB

  localnode *enode.LocalNode

  ntab    *discover.UDPv4

  DiscV5   *discover.UDPv5

  discmix   *enode.FairMix

  dialsched *dialScheduler

  // Channels into the run loop.

  quit           chan struct{}

  addtrusted        chan *enode.Node

  removetrusted       chan *enode.Node

  peerOp          chan peerOpFunc

  peerOpDone        chan struct{}

  delpeer          chan peerDrop

  checkpointPostHandshake chan *conn

  checkpointAddPeer     chan *conn

  // State of run loop and listenLoop.

  inboundHistory expHeap

}


Server配置(本地节点秘钥、拨号比率、节点最大链接数、拨号比率、事件记录等):

// Config holds Server options.

type Config struct {

  PrivateKey *ecdsa.PrivateKey `toml:"-"`

  MaxPeers int

  MaxPendingPeers int `toml:",omitempty"`

  DialRatio int `toml:",omitempty"`

  NoDiscovery bool

  DiscoveryV5 bool `toml:",omitempty"`

  Name string `toml:"-"`

  BootstrapNodes []*enode.Node

  BootstrapNodesV5 []*enode.Node `toml:",omitempty"`

  StaticNodes []*enode.Node

  TrustedNodes []*enode.Node

  NetRestrict *netutil.Netlist `toml:",omitempty"`

  NodeDatabase string `toml:",omitempty"`

  Protocols []Protocol `toml:"-"`

  ListenAddr string

  NAT nat.Interface `toml:",omitempty"`

  Dialer NodeDialer `toml:"-"`

  NoDial bool `toml:",omitempty"`

  EnableMsgEvents bool

  Logger log.Logger `toml:",omitempty"`

  clock mclock.Clock

}


新增节点

AddPeer函数用于新增一个给定的节点,其实现代码如下所示:

// filedir:go-ethereum-1.10.2\p2p\server.go L318

func (srv *Server) AddPeer(node *enode.Node) {

  srv.dialsched.addStatic(node)

}

// filedir:go-ethereum-1.10.2\p2p\dial.go L190

// addStatic adds a static dial candidate.

func (d *dialScheduler) addStatic(n *enode.Node) {

  select {

  case d.addStaticCh <- n:

  case <-d.ctx.Done():

 }

}


AddTrustedPeer函数用于新增一个可信任节点:

// AddTrustedPeer adds the given node to a reserved whitelist which allows the

// node to always connect, even if the slot are full.

func (srv *Server) AddTrustedPeer(node *enode.Node) {

  select {

  case srv.addtrusted <- node:

  case <-srv.quit:

 }

}


移除节点

RemovePeer函数用于移除节点并断开与节点之间的连接:

// filedir:go-ethereum-1.10.2\p2p\server.go L325

func (srv *Server) RemovePeer(node *enode.Node) {

  var (

    ch  chan *PeerEvent

    sub event.Subscription

 )

  // Disconnect the peer on the main loop.

  srv.doPeerOp(func(peers map[enode.ID]*Peer) {

    srv.dialsched.removeStatic(node)

    if peer := peers[node.ID()]; peer != nil {

      ch = make(chan *PeerEvent, 1)

      sub = srv.peerFeed.Subscribe(ch)

      peer.Disconnect(DiscRequested)

   }

 })

  // Wait for the peer connection to end.

  if ch != nil {

    defer sub.Unsubscribe()

    for ev := range ch {

      if ev.Peer == node.ID() && ev.Type == PeerEventTypeDrop {

        return

     }

   }

 }

}

// filedir:go-ethereum-1.10.2\p2p\dial.go L198

// removeStatic removes a static dial candidate.

func (d *dialScheduler) removeStatic(n *enode.Node) {

  select {

  case d.remStaticCh <- n:

  case <-d.ctx.Done():

 }

}


RemoveTrustedPeer函数用于移除一个可信任节点:

// RemoveTrustedPeer removes the given node from the trusted peer set.

func (srv *Server) RemoveTrustedPeer(node *enode.Node) {

  select {

  case srv.removetrusted <- node:

  case <-srv.quit:

 }

}


终止服务

Stop函数用于终止节点运行,具体代码如下所示:

func (srv *Server) Stop() {

  srv.lock.Lock()

  if !srv.running {

    srv.lock.Unlock()

    return

 }

  srv.running = false

  if srv.listener != nil {

    // this unblocks listener Accept

    srv.listener.Close()

 }

  close(srv.quit)

  srv.lock.Unlock()

  srv.loopWG.Wait()

}


服务启动

位于go-ethereum-1.10.2\p2p\server.go中的start函数用于启动一个P2P节点:

// filedir:go-ethereum-1.10.2\p2p\server.go L433

func (srv *Server) Start() (err error) {

  srv.lock.Lock()

  defer srv.lock.Unlock()

  if srv.running {

    return errors.New("server already running")

 }

  srv.running = true

  srv.log = srv.Config.Logger

  if srv.log == nil {

    srv.log = log.Root()

 }

  if srv.clock == nil {

    srv.clock = mclock.System{}

 }

  if srv.NoDial && srv.ListenAddr == "" {

    srv.log.Warn("P2P server will be useless, neither dialing nor listening")

 }

  // static fields

  if srv.PrivateKey == nil {

    return errors.New("Server.PrivateKey must be set to a non-nil key")

 }

  if srv.newTransport == nil {

    srv.newTransport = newRLPX

 }

  if srv.listenFunc == nil {

    srv.listenFunc = net.Listen

 }

  srv.quit = make(chan struct{})

  srv.delpeer = make(chan peerDrop)

  srv.checkpointPostHandshake = make(chan *conn)

  srv.checkpointAddPeer = make(chan *conn)

  srv.addtrusted = make(chan *enode.Node)

  srv.removetrusted = make(chan *enode.Node)

  srv.peerOp = make(chan peerOpFunc)

  srv.peerOpDone = make(chan struct{})

  if err := srv.setupLocalNode(); err != nil {

    return err

 }

  if srv.ListenAddr != "" {

    if err := srv.setupListening(); err != nil {

      return err

   }

 }

  if err := srv.setupDiscovery(); err != nil {

    return err

 }

  srv.setupDialScheduler()

  srv.loopWG.Add(1)

  go srv.run()

  return nil

}


在这里首先检查当前节点是否处于运行状态,如果是则直接返回并给出错误提示信息,如果不是则将srv.running设置为true,之后进入服务启动流程,之后检查log是否开启等,之后初始化配置P2P服务信息:

// Start starts running the server.

// Servers can not be re-used after stopping.

func (srv *Server) Start() (err error) {

  srv.lock.Lock()

  defer srv.lock.Unlock()

  if srv.running {

    return errors.New("server already running")

 }

  srv.running = true

  srv.log = srv.Config.Logger

  if srv.log == nil {

    srv.log = log.Root()

 }

  if srv.clock == nil {

    srv.clock = mclock.System{}

 }

  if srv.NoDial && srv.ListenAddr == "" {

    srv.log.Warn("P2P server will be useless, neither dialing nor listening")

 }

  // static fields

  if srv.PrivateKey == nil {

    return errors.New("Server.PrivateKey must be set to a non-nil key")

 }

  if srv.newTransport == nil {

    srv.newTransport = newRLPX

 }

  if srv.listenFunc == nil {

    srv.listenFunc = net.Listen

 }

  srv.quit = make(chan struct{})

  srv.delpeer = make(chan peerDrop)

  srv.checkpointPostHandshake = make(chan *conn)

  srv.checkpointAddPeer = make(chan *conn)

  srv.addtrusted = make(chan *enode.Node)

  srv.removetrusted = make(chan *enode.Node)

  srv.peerOp = make(chan peerOpFunc)

  srv.peerOpDone = make(chan struct{})


之后调用setupLocalNode来启动一个本地节点,并建立本地监听,然后配置一个DiscoveryV5网络协议,生成节点路由表。

调用setupDialScheduler启动主动拨号连接过程,然后开一个协程,在其中做peer的维护:

  srv.setupDialScheduler()

  srv.loopWG.Add(1)

  go srv.run()

  return nil

}


setupDialScheduler代码如下所示,这里通过newDialScheduler来建立连接,参数discmix确定了进行主动建立连接时的节点集,它是一个迭代器 ,同时将setupConn连接建立函数传入:

func (srv *Server) setupDialScheduler() {

  config := dialConfig{

    self:      srv.localnode.ID(),

    maxDialPeers:  srv.maxDialedConns(),

    maxActiveDials: srv.MaxPendingPeers,

    log:      srv.Logger,

    netRestrict:  srv.NetRestrict,

    dialer:     srv.Dialer,

    clock:     srv.clock,

 }

  if srv.ntab != nil {

    config.resolver = srv.ntab

 }

  if config.dialer == nil {

    config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}

 }

  srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)

  for _, n := range srv.StaticNodes {

    srv.dialsched.addStatic(n)

 }

}


newDialScheduler函数如下所示,在这里通过d.readNodes(it)从迭代器中取得节点,之后通过通道传入d.loop(it)中进行连接:

// filedir:go-ethereum-1.10.2\p2p\dial.go L162

func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {

  d := &dialScheduler{

    dialConfig: config.withDefaults(),

    setupFunc:  setupFunc,

    dialing:   make(map[enode.ID]*dialTask),

    static:   make(map[enode.ID]*dialTask),

    peers:    make(map[enode.ID]connFlag),

    doneCh:   make(chan *dialTask),

    nodesIn:   make(chan *enode.Node),

    addStaticCh: make(chan *enode.Node),

    remStaticCh: make(chan *enode.Node),

    addPeerCh:  make(chan *conn),

    remPeerCh:  make(chan *conn),

 }

  d.lastStatsLog = d.clock.Now()

  d.ctx, d.cancel = context.WithCancel(context.Background())

  d.wg.Add(2)

  go d.readNodes(it)

  go d.loop(it)

  return d

}


服务监听

在上面的服务启动过程中有一个setupListening函数,该函数用于监听事件,具体代码如下所示:

func (srv *Server) setupListening() error {

  // Launch the listener.

  listener, err := srv.listenFunc("tcp", srv.ListenAddr)

  if err != nil {

    return err

 }

  srv.listener = listener

  srv.ListenAddr = listener.Addr().String()

  // Update the local node record and map the TCP listening port if NAT is configured.

  if tcp, ok := listener.Addr().(*net.TCPAddr); ok {

    srv.localnode.Set(enr.TCP(tcp.Port))

    if !tcp.IP.IsLoopback() && srv.NAT != nil {

      srv.loopWG.Add(1)

      go func() {

        nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")

        srv.loopWG.Done()

     }()

   }

 }

  srv.loopWG.Add(1)

  go srv.listenLoop()

  return nil

}


在上述代码中又调用了一个srv.listenLoop(),该函数是一个死循环的goroutine,它会监听端口并接收外部的请求:

// listenLoop runs in its own goroutine and accepts

// inbound connections.

func (srv *Server) listenLoop() {

  srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())

  // The slots channel limits accepts of new connections.

  tokens := defaultMaxPendingPeers

  if srv.MaxPendingPeers > 0 {

    tokens = srv.MaxPendingPeers

 }

  slots := make(chan struct{}, tokens)

  for i := 0; i < tokens; i++ {

    slots <- struct{}{}

 }

  // Wait for slots to be returned on exit. This ensures all connection goroutines

  // are down before listenLoop returns.

  defer srv.loopWG.Done()

  defer func() {

    for i := 0; i < cap(slots); i++ {

      <-slots

   }

 }()

  for {

    // Wait for a free slot before accepting.

    <-slots

    var (

      fd    net.Conn

      err    error

      lastLog time.Time

   )

    for {

      fd, err = srv.listener.Accept()

      if netutil.IsTemporaryError(err) {

        if time.Since(lastLog) > 1*time.Second {

          srv.log.Debug("Temporary read error", "err", err)

          lastLog = time.Now()

       }

        time.Sleep(time.Millisecond * 200)

        continue

     } else if err != nil {

        srv.log.Debug("Read error", "err", err)

        slots <- struct{}{}

        return

     }

      break

   }

    remoteIP := netutil.AddrIP(fd.RemoteAddr())

    if err := srv.checkInboundConn(remoteIP); err != nil {

      srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)

      fd.Close()

      slots <- struct{}{}

      continue

   }

    if remoteIP != nil {

      var addr *net.TCPAddr

      if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {

        addr = tcp

     }

      fd = newMeteredConn(fd, true, addr)

      srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())

   }

    go func() {

      srv.SetupConn(fd, inboundConn, nil)

      slots <- struct{}{}

   }()

 }

}


这里的SetupConn主要执行执行握手协议,并尝试把链接创建为一个peer对象:

// SetupConn runs the handshakes and attempts to add the connection

// as a peer. It returns when the connection has been added as a peer

// or the handshakes have failed.

func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {

  c := &conn{fd: fd, flags: flags, cont: make(chan error)}

  if dialDest == nil {

    c.transport = srv.newTransport(fd, nil)

 } else {

    c.transport = srv.newTransport(fd, dialDest.Pubkey())

 }

  err := srv.setupConn(c, flags, dialDest)

  if err != nil {

    c.close(err)

 }

  return err

}


在上述代码中又去调用了srv.setupConn(c, flags, dialDest)函数,该函数用于执行握手协议:

func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {

  // Prevent leftover pending conns from entering the handshake.

  srv.lock.Lock()

  running := srv.running

  srv.lock.Unlock()

  if !running {

    return errServerStopped

 }

  // If dialing, figure out the remote public key.

  var dialPubkey *ecdsa.PublicKey

  if dialDest != nil {  // dest=nil 被动连接,dest!=nil主动连接诶

    dialPubkey = new(ecdsa.PublicKey)

    if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {

      err = errors.New("dial destination doesn't have a secp256k1 public key")

      srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)

      return err

   }

 }

  // Run the RLPx handshake.

  remotePubkey, err := c.doEncHandshake(srv.PrivateKey)  // 公钥交换,确定共享秘钥RLPx层面的握手一来一去

  if err != nil {

    srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)

    return err

 }

  if dialDest != nil {

    c.node = dialDest

 } else {

    c.node = nodeFromConn(remotePubkey, c.fd)

 }

  clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)

  err = srv.checkpoint(c, srv.checkpointPostHandshake)

  if err != nil {

    clog.Trace("Rejected peer", "err", err)

    return err

 }

  // Run the capability negotiation handshake.

  phs, err := c.doProtoHandshake(srv.ourHandshake) // 进行协议层面的握手,也即p2p握手,一来一去

  if err != nil {

    clog.Trace("Failed p2p handshake", "err", err)

    return err

 }

  if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {

    clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))

    return DiscUnexpectedIdentity

 }

  c.caps, c.name = phs.Caps, phs.Name

  err = srv.checkpoint(c, srv.checkpointAddPeer) // 状态校验

  if err != nil {

    clog.Trace("Rejected peer", "err", err)

    return err

 }

  return nil

}


秘钥握手通过deEncHandshake函数实现,在函数之中调用了Handshake()函数:

// filedir:go-ethereum-1.10.2\p2p\transport.go L123

func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {

  t.conn.SetDeadline(time.Now().Add(handshakeTimeout))

  return t.conn.Handshake(prv)

}


Handshake代码如下所示,在这里会根据是主动握手还是被动握手来进行执行对应的握手逻辑:

// filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go L253

func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {

  var (

    sec Secrets

    err error

 )

  if c.dialDest != nil {  //主动握手

    sec, err = initiatorEncHandshake(c.conn, prv, c.dialDest) //主动发起秘钥验证握手结束,确定共享秘钥

 } else { // 被动握手

    sec, err = receiverEncHandshake(c.conn, prv)

 }

  if err != nil {

    return nil, err

 }

  c.InitWithSecrets(sec)

  return sec.remote, err

}


主动发起握手过程过程如下,在这里会调用makeAuthMsg来生成Auth身份信息,包含签名,随机nonce生成的与签名对应的公钥和版本号,之后调用sealEIP8方法进行rlpx编码,之后发起加密握手,之后接收返回的authResp消息,并验证解密,获取对方公钥,之后生成AES,MAC:

// filedir:go-ethereum-1.10.2\p2p\rlpx\rlpx.go L477

func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) {

  h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}

  authMsg, err := h.makeAuthMsg(prv)

  if err != nil {

    return s, err

 }

  authPacket, err := sealEIP8(authMsg, h)

  if err != nil {

    return s, err

 }

  if _, err = conn.Write(authPacket); err != nil {

    return s, err

 }

  authRespMsg := new(authRespV4)

  authRespPacket, err := readHandshakeMsg(authRespMsg, encAuthRespLen, prv, conn)

  if err != nil {

    return s, err

 }

  if err := h.handleAuthResp(authRespMsg); err != nil {

    return s, err

 }

  return h.secrets(authPacket, authRespPacket)

}

receiverEncHandshake如下所示,和initiatorEncHandshake相差无几:

func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {

  authMsg := new(authMsgV4)

  authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)

  if err != nil {

    return s, err

 }

  h := new(encHandshake)

  if err := h.handleAuthMsg(authMsg, prv); err != nil {

    return s, err

 }

  authRespMsg, err := h.makeAuthResp()

  if err != nil {

    return s, err

 }

  var authRespPacket []byte

  if authMsg.gotPlain {

    authRespPacket, err = authRespMsg.sealPlain(h)

 } else {

    authRespPacket, err = sealEIP8(authRespMsg, h)

 }

  if err != nil {

    return s, err

 }

  if _, err = conn.Write(authRespPacket); err != nil {

    return s, err

 }

  return h.secrets(authPacket, authRespPacket)

}


之后通过doProtoHandshake来完成协议握手操作,在这里调用send发送一次握手操作,之后通过readProtocolHandshake来读取返回信息,之后进行检查:

func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {

  werr := make(chan error, 1)

  go func() { werr <- Send(t, handshakeMsg, our) }()  

  if their, err = readProtocolHandshake(t); err != nil {

    <-werr // make sure the write terminates too

    return nil, err

 }

  if err := <-werr; err != nil {

    return nil, fmt.Errorf("write error: %v", err)

 }

  // If the protocol version supports Snappy encoding, upgrade immediately

  t.conn.SetSnappy(their.Version >= snappyProtocolVersion)

  return their, nil

}


服务循环

run函数是服务的主循环,监听服务器终止、增加信任节点、移除信任节点、增加检查节点等:

func (srv *Server) run() {

  srv.log.Info("Started P2P networking", "self", srv.localnode.Node().URLv4())

  defer srv.loopWG.Done()

  defer srv.nodedb.Close()

  defer srv.discmix.Close()

  defer srv.dialsched.stop()

  var (

    peers     = make(map[enode.ID]*Peer)

    inboundCount = 0

    trusted    = make(map[enode.ID]bool, len(srv.TrustedNodes))

 )

  for _, n := range srv.TrustedNodes {

    trusted[n.ID()] = true

 }

running:

  for {

    select {

    case <-srv.quit:

      break running

    case n := <-srv.addtrusted:

      srv.log.Trace("Adding trusted node", "node", n)

      trusted[n.ID()] = true

      if p, ok := peers[n.ID()]; ok {

        p.rw.set(trustedConn, true)

     }

    case n := <-srv.removetrusted:

      srv.log.Trace("Removing trusted node", "node", n)

      delete(trusted, n.ID())

      if p, ok := peers[n.ID()]; ok {

        p.rw.set(trustedConn, false)

     }

    case op := <-srv.peerOp:

      op(peers)

      srv.peerOpDone <- struct{}{}

    case c := <-srv.checkpointPostHandshake:

      if trusted[c.node.ID()] {

        c.flags |= trustedConn

     }

      c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)

    case c := <-srv.checkpointAddPeer:

      err := srv.addPeerChecks(peers, inboundCount, c)

      if err == nil {

        p := srv.launchPeer(c)

        peers[c.node.ID()] = p

        srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())

        srv.dialsched.peerAdded(c)

        if p.Inbound() {

          inboundCount++

       }

     }

      c.cont <- err

    case pd := <-srv.delpeer:

      // A peer disconnected.

      d := common.PrettyDuration(mclock.Now() - pd.created)

      delete(peers, pd.ID())

      srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)

      srv.dialsched.peerRemoved(pd.rw)

      if pd.Inbound() {

        inboundCount--

     }

   }

 }

  srv.log.Trace("P2P networking is spinning down")

  if srv.ntab != nil {

    srv.ntab.Close()

 }

  if srv.DiscV5 != nil {

    srv.DiscV5.Close()

 }

  for _, p := range peers {

    p.Disconnect(DiscQuitting)

 }

  for len(peers) > 0 {

    p := <-srv.delpeer

    p.log.Trace("<-delpeer (spindown)")

    delete(peers, p.ID())

 }

}


节点信息

NodeInfo用于查看节点信息,PeersInfo用于查看连接的节点信息:

// NodeInfo gathers and returns a collection of metadata known about the host.

func (srv *Server) NodeInfo() *NodeInfo {

  node := srv.Self()

  info := &NodeInfo{

    Name:    srv.Name,

    Enode:   node.URLv4(),

    ID:     node.ID().String(),

    IP:     node.IP().String(),

    ListenAddr: srv.ListenAddr,

    Protocols: make(map[string]interface{}),

 }

  info.Ports.Discovery = node.UDP()

  info.Ports.Listener = node.TCP()

  info.ENR = node.String()

  for _, proto := range srv.Protocols {

    if _, ok := info.Protocols[proto.Name]; !ok {

      nodeInfo := interface{}("unknown")

      if query := proto.NodeInfo; query != nil {

        nodeInfo = proto.NodeInfo()

     }

      info.Protocols[proto.Name] = nodeInfo

   }

 }

  return info

}

func (srv *Server) PeersInfo() []*PeerInfo {

  // Gather all the generic and sub-protocol specific infos

  infos := make([]*PeerInfo, 0, srv.PeerCount())

  for _, peer := range srv.Peers() {

    if peer != nil {

      infos = append(infos, peer.Info())

   }

 }

  // Sort the result array alphabetically by node identifier

  for i := 0; i < len(infos); i++ {

    for j := i + 1; j < len(infos); j++ {

      if infos[i].ID > infos[j].ID {

        infos[i], infos[j] = infos[j], infos[i]

     }

   }

 }

  return infos

}


请求处理

下面为peer.run函数的代码:

func (p *Peer) run() (remoteRequested bool, err error) {

  var (

    writeStart = make(chan struct{}, 1)

    writeErr   = make(chan error, 1)

    readErr   = make(chan error, 1)

    reason    DiscReason // sent to the peer

 )

  p.wg.Add(2)

  go p.readLoop(readErr)

  go p.pingLoop()

  // Start all protocol handlers.

  writeStart <- struct{}{}

  p.startProtocols(writeStart, writeErr)

  // Wait for an error or disconnect.

loop:

  for {

    select {

    case err = <-writeErr:

      // A write finished. Allow the next write to start if

      // there was no error.

      if err != nil {

        reason = DiscNetworkError

        break loop

     }

      writeStart <- struct{}{}

    case err = <-readErr:

      if r, ok := err.(DiscReason); ok {

        remoteRequested = true

        reason = r

     } else {

        reason = DiscNetworkError

     }

      break loop

    case err = <-p.protoErr:

      reason = discReasonForError(err)

      break loop

    case err = <-p.disc:

      reason = discReasonForError(err)

      break loop

   }

 }

  close(p.closed)

  p.rw.close(reason)

  p.wg.Wait()

  return remoteRequested, err

}


从上述代码中可以看到函数的开头首先定义了一些局部变量,之后启用了两个协程,一个是readLoop,它通过调用ReadMsg()读取msg,之后又通过调用peer.handle(msg)来处理msg

如果msg是pingMsg,则发送一个pong回应,如果msg与下述特殊情况不相匹配则将msg交给proto.in通道,等待protocolManager.handleMsg()从通道中取出。另一个协程是pingLoop,它主要通过调用SendItems(p.rw, pingMsg)来发起ping请求

之后调用starProtocols()函数让协议运行起来:

func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {

  p.wg.Add(len(p.running))

  for _, proto := range p.running {

    proto := proto

    proto.closed = p.closed

    proto.wstart = writeStart

    proto.werr = writeErr

    var rw MsgReadWriter = proto

    if p.events != nil {

      rw = newMsgEventer(rw, p.events, p.ID(), proto.Name, p.Info().Network.RemoteAddress, p.Info().Network.LocalAddress)

   }

    p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))

    go func() {

      defer p.wg.Done()

      err := proto.Run(p, rw)

      if err == nil {

        p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))

        err = errProtocolReturned

     } else if err != io.EOF {

        p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)

     }

      p.protoErr <- err

   }()

 }

}


最后通过一个loop循环来处理错误或者断开连接等操作:

  // Wait for an error or disconnect.

loop:

  for {

    select {

    case err = <-writeErr:

      // A write finished. Allow the next write to start if

      // there was no error.

      if err != nil {

        reason = DiscNetworkError

        break loop

     }

      writeStart <- struct{}{}

    case err = <-readErr:

      if r, ok := err.(DiscReason); ok {

        remoteRequested = true

        reason = r

     } else {

        reason = DiscNetworkError

     }

      break loop

    case err = <-p.protoErr:

      reason = discReasonForError(err)

      break loop

    case err = <-p.disc:

      reason = discReasonForError(err)

      break loop

   }

 }

  close(p.closed)

  p.rw.close(reason)

  p.wg.Wait()

  return remoteRequested, err创数据库


newPersistentDB函数用于创建一个持久化的数据库用于存储节点信息:

// filedir:go-ethereum-1.10.2\p2p\enode\nodedb.go L95

// newPersistentNodeDB creates/opens a leveldb backed persistent node database,

// also flushing its contents in case of a version mismatch.

func newPersistentDB(path string) (*DB, error) {

  opts := &opt.Options{OpenFilesCacheCapacity: 5}

  db, err := leveldb.OpenFile(path, opts)

  if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {

    db, err = leveldb.RecoverFile(path, nil)

 }

  if err != nil {

    return nil, err

 }

  currentVer := make([]byte, binary.MaxVarintLen64)

  currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]

  blob, err := db.Get([]byte(dbVersionKey), nil)

  switch err {

  case leveldb.ErrNotFound:

    // Version not found (i.e. empty cache), insert it

    if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {

      db.Close()

      return nil, err

   }

  case nil:

    // Version present, flush if different

    if !bytes.Equal(blob, currentVer) {

      db.Close()

      if err = os.RemoveAll(path); err != nil {

        return nil, err

     }

      return newPersistentDB(path)

   }

 }

  return &DB{lvl: db, quit: make(chan struct{})}, nil

}


节点超时

ensureExpirer函数用于检查节点是否超时,具体实现代码如下所示:

func (db *DB) ensureExpirer() {

  db.runner.Do(func() { go db.expirer() })

}

func (db *DB) expirer() {

  tick := time.NewTicker(dbCleanupCycle)

  defer tick.Stop()

  for {

    select {

    case <-tick.C:

      db.expireNodes()

    case <-db.quit:

      return

   }

 }

}

func (db *DB) expireNodes() {

  it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)

  defer it.Release()

  if !it.Next() {

    return

 }

  var (

    threshold   = time.Now().Add(-dbNodeExpiration).Unix()

    youngestPong int64

    atEnd     = false

 )

  for !atEnd {

    id, ip, field := splitNodeItemKey(it.Key())

    if field == dbNodePong {

      time, _ := binary.Varint(it.Value())

      if time > youngestPong {

        youngestPong = time

     }

      if time < threshold {

        // Last pong from this IP older than threshold, remove fields belonging to it.

        deleteRange(db.lvl, nodeItemKey(id, ip, ""))

     }

   }

    atEnd = !it.Next()

    nextID, _ := splitNodeKey(it.Key())

    if atEnd || nextID != id {

      if youngestPong > 0 && youngestPong < threshold {

        deleteRange(db.lvl, nodeKey(id))

     }

      youngestPong = 0

   }

 }

}


状态更新

下面是一些状态更新函数:

// LastPingReceived retrieves the time of the last ping packet received from

// a remote node.

func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time {

  if ip = ip.To16(); ip == nil {

    return time.Time{}

 }

  return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0)

}

// UpdateLastPingReceived updates the last time we tried contacting a remote node.

func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error {

  if ip = ip.To16(); ip == nil {

    return errInvalidIP

 }

  return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix())

}

// LastPongReceived retrieves the time of the last successful pong from remote node.

func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time {

  if ip = ip.To16(); ip == nil {

    return time.Time{}

 }

  // Launch expirer

  db.ensureExpirer()

  return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0)

}

// UpdateLastPongReceived updates the last pong time of a node.

func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error {

  if ip = ip.To16(); ip == nil {

    return errInvalidIP

 }

  return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())

}

// FindFails retrieves the number of findnode failures since bonding.

func (db *DB) FindFails(id ID, ip net.IP) int {

  if ip = ip.To16(); ip == nil {

    return 0

 }

  return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails)))

}

// UpdateFindFails updates the number of findnode failures since bonding.

func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error {

  if ip = ip.To16(); ip == nil {

    return errInvalidIP

 }

  return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))

}

// FindFailsV5 retrieves the discv5 findnode failure counter.

func (db *DB) FindFailsV5(id ID, ip net.IP) int {

  if ip = ip.To16(); ip == nil {

    return 0

 }

  return int(db.fetchInt64(v5Key(id, ip, dbNodeFindFails)))

}

// UpdateFindFailsV5 stores the discv5 findnode failure counter.

func (db *DB) UpdateFindFailsV5(id ID, ip net.IP, fails int) error {

  if ip = ip.To16(); ip == nil {

    return errInvalidIP

 }

  return db.storeInt64(v5Key(id, ip, dbNodeFindFails), int64(fails))

}


节点挑选

QuerySeeds函数用于从数据库里面随机挑选合适种子节点:

// QuerySeeds retrieves random nodes to be used as potential seed nodes

// for bootstrapping.

func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {

  var (

    now   = time.Now()

    nodes = make([]*Node, 0, n)

    it   = db.lvl.NewIterator(nil, nil)

    id   ID

 )

  defer it.Release()

seek:

  for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {

    ctr := id[0]

    rand.Read(id[:])

    id[0] = ctr + id[0]%16

    it.Seek(nodeKey(id))

    n := nextNode(it)

    if n == nil {

      id[0] = 0

      continue seek // iterator exhausted

   }

    if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {

      continue seek

   }

    for i := range nodes {

      if nodes[i].ID() == n.ID() {

        continue seek // duplicate

     }

   }

    nodes = append(nodes, n)

 }

  return nodes

}

func nextNode(it iterator.Iterator) *Node {

  for end := false; !end; end = !it.Next() {

    id, rest := splitNodeKey(it.Key())

    if string(rest) != dbDiscoverRoot {

      continue

   }

    return mustDecodeNode(id[:], it.Value())

 }

  return nil

}


总结

P2P网络是区块链分布式网络结构的基础,本篇文章详细介绍了P2P网络的基本原理,包括节点发现机制、分布式哈希表、节点查找、节点新增、节点移除、请求处理等,同时从源码角度对以太坊源码中P2P网络的实现做了较为细致的分析,探索了以太坊P2P网络的工作流程以以及安全设计,而公链安全体系的建设依旧是长路漫漫,有待进一步深入探索


实验室官网:www.knownseclab.com

知道创宇唯一指定存证平台www.attest.im

联系我们:[email protected]


知道创宇区块链安全实验室导航


微信公众号

@ 创宇区块链安全实验室

网络

官方网站

@ 知道创宇区块链安全实验室

网络

微博

@ 知道创宇区块链实验室

https://weibo.com/BlockchainLab


知乎

@ 知道创宇区块链安全实验室

https://www.zhihu.com/org/zhi-dao-chuang-yu-qu-kuai-lian-an-quan-shi-yan-shi


Twitter

@KS_Blockchain

https://twitter.com/KSBlockchain


网络

声明:本文为入驻“MarsBit 专栏”作者作品,不代表MarsBit官方立场。
转载请联系网页底部:内容合作栏目,邮件进行授权。授权后转载时请注明出处、作者和本文链接。未经许可擅自转载本站文章,将追究相关法律责任,侵权必究。
提示:投资有风险,入市须谨慎,本资讯不作为投资理财建议。
免责声明:本文不构成投资建议,用户应考虑本文中的任何意见、观点或结论是否符合其特定状况,及遵守所在国家和地区的相关法律法规。