以太坊的网络层详细实现

动机

在了解了底层的kad网络后,明确了基础的网络存储方案以及发现方法,但是我们还是不清楚在以太坊中底层的网络层是如何运作的,毕竟上层应用其实根本不关注下层网络层的运转,而是直接调用各种节点信息进行点对点通信。

基础结构

首先我们关注网络层的基础结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type Server struct {

Config // 网络层的配置


newTransport func(net.Conn) transport // 测试使用的协议钩子
newPeerHook func(*Peer) // 测试使用的钩子

lock sync.Mutex // 服务锁
running bool // 是否运行的标志位

ntab discoverTable // 发现表接口
listener net.Listener // 网络监听接口
ourHandshake *protoHandshake // 握手协议
lastLookup time.Time // 上一次执行发现任务的时间
DiscV5 *discv5.Network // discv5版本的发现网络

peerOp chan peerOpFunc // 传入相关操作peer的函数
peerOpDone chan struct{} // peer操作完成的标志位

quit chan struct{} // 退出标志位
addstatic chan *discover.Node //增加静态节点操作
removestatic chan *discover.Node //移除静态节点操作
posthandshake chan *conn //发出握手连接操作
addpeer chan *conn //增加节点操作
delpeer chan peerDrop //删除节点操作
loopWG sync.WaitGroup // 用来等待协程完成
peerFeed event.Feed // 发送事件通知
log log.Logger // 日志
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
type Config struct {
PrivateKey *ecdsa.PrivateKey `toml:"-"` // 私钥

MaxPeers int // 允许的最大连接数量的peer

MaxPendingPeers int `toml:",omitempty"` // 允许的最大在握手等待阶段的节点数量

DialRatio int `toml:",omitempty"` // 允许的正在连接的比例

NoDiscovery bool // 不发现标志位

DiscoveryV5 bool `toml:",omitempty"` // 使用v5版本发现模式


Name string `toml:"-"` // 当前节点的名称


BootstrapNodes []*discover.Node // 初始节点列表

BootstrapNodesV5 []*discv5.Node `toml:",omitempty"` // v5版本初始节点列表

StaticNodes []*discover.Node // 预配置的静态节点列表,长期保持连接

TrustedNodes []*discover.Node // 预先配置的信任节点列表,长期保持连接

NetRestrict *netutil.Netlist `toml:",omitempty"` // 限制访问ip列表

NodeDatabase string `toml:",omitempty"` // 节点数据库路径

Protocols []Protocol `toml:"-"` // 支持协议列表

ListenAddr string // 监听地址

NAT nat.Interface `toml:",omitempty"` // NAT映射端口


Dialer NodeDialer `toml:"-"` // 节点连接维护

NoDial bool `toml:",omitempty"` // 不与任何节点连接标志位

// If EnableMsgEvents is set then the server will emit PeerEvents
// whenever a message is sent to or received from a peer
EnableMsgEvents bool // 允许节点的事件通知

Logger log.Logger `toml:",omitempty"` // log
}

从基础结构可以看出来,整个网络层核心做的事情主要有三件

  • 对网络发现表(K桶)的管理
  • 与节点连接进行管理
  • 对底层网络消息协议进行处理,并且执行对应操作

服务启动流程

接下来我们关注一下整个server在启动时会做哪些事情,server的启动是通过server.Start()方法进行启动的,在这部分我们将对此进行分析

签名与验签

接下来我们就开始一个一个分析每部分做了啥

初始化数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 初始化
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.New()
}
srv.log.Info("Starting P2P networking")

// static fields
if srv.PrivateKey == nil {
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *discover.Node)
srv.removestatic = make(chan *discover.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})

可以看到初始化的时期主要是对协议和初始连接进行了配置,设置默认协议为newRLPX,连接为net.Dialer

发现机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 发现机制
if !srv.NoDiscovery || srv.DiscoveryV5 {
addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
if err != nil {
return err
}
conn, err = net.ListenUDP("udp", addr)
if err != nil {
return err
}
realaddr = conn.LocalAddr().(*net.UDPAddr)
if srv.NAT != nil {
if !realaddr.IP.IsLoopback() {
go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
}
// TODO: react to external IP changes over time.
if ext, err := srv.NAT.ExternalIP(); err == nil {
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}
}

if !srv.NoDiscovery && srv.DiscoveryV5 {
unhandled = make(chan discover.ReadPacket, 100)
sconn = &sharedUDPConn{conn, unhandled}
}

// node table
if !srv.NoDiscovery {
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
AnnounceAddr: realaddr,
NodeDBPath: srv.NodeDatabase,
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
}
ntab, err := discover.ListenUDP(conn, cfg)
if err != nil {
return err
}
srv.ntab = ntab
}

if srv.DiscoveryV5 {
var (
ntab *discv5.Network
err error
)
if sconn != nil {
ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
} else {
ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
}
if err != nil {
return err
}
if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
return err
}
srv.DiscV5 = ntab
}

发现机制在这个版本的以太坊中实现了两种,一种是普通版本的发现机制,另外一种是v5版本的发现机制,这个发现机制就是严格的按照KAD网络实现的一个基础K桶操作,具体内容可以看之前的kad网络实现机制

监听协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))

tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{}
}

for {
// Wait for a handshake slot before accepting.
<-slots

var (
fd net.Conn
err error
)
for {
fd, err = srv.listener.Accept()
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
srv.log.Debug("Temporary read error", "err", err)
continue
} else if err != nil {
srv.log.Debug("Read error", "err", err)
return
}
break
}

// Reject connections that do not match NetRestrict.
if srv.NetRestrict != nil {
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
fd.Close()
slots <- struct{}{}
continue
}
}

fd = newMeteredConn(fd, true)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
go func() {
srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}
}

监听协程其实本质上是给一些连接比较慢的节点一些连接机会,容忍一些普通的错误,并且对于限制名单进行处理,如果不满足限制名单则关闭连接。对于这个连接机会同样开启了一个数量限制,等待连接中的连接数不能超过太多。当连接能够被允许时,则运行srv.SetupConn来接受连接,开始连接流程。

简单流程图如下:

签名与验签

主协程

主协程做的事情稍微多一些,以下是简单的流程图

签名与验签

调度任务的执行是比较简单的

1
2
3
4
5
6
7
8
9
10
// 调度任务
scheduleTasks := func() {
// Start from queue first.
queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
// Query dialer for new tasks and start as many as possible now.
if len(runningTasks) < maxActiveDialTasks {
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
queuedTasks = append(queuedTasks, startTasks(nt)...)
}
}

当任务队列数量不满时,就会生成一个newTasks并且执行之,保证整个主协程一直有任务在执行

协议分类执行的分支相对较多,以下是具体的分类协议

  • 服务退出
  • 增加静态节点
  • 移除静态节点
  • 节点操作
  • 任务完成
  • 握手连接
  • 增加节点
  • 删除节点

节点管理

接下来我们关注整个流程中如何来管理节点的

节点管理整体上来看是两个部分,扩充预备的节点数量以及对已经发现的节点进行存储

扩充节点数量

整体的服务中与节点相关的主要是如下接口

1
2
3
4
5
6
7
type discoverTable interface {
Self() *discover.Node
Close()
Resolve(target discover.NodeID) *discover.Node
Lookup(target discover.NodeID) []*discover.Node
ReadRandomNodes([]*discover.Node) int
}

整体流程还是通过发现任务和KAD基础网络,在添加节点上最核心的方法在于lookup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
var (
target = crypto.Keccak256Hash(targetID[:])
asked = make(map[NodeID]bool)
seen = make(map[NodeID]bool)
reply = make(chan []*Node, alpha)
pendingQueries = 0
result *nodesByDistance
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
asked[tab.self.ID] = true

for {
tab.mutex.Lock()
// generate initial result set
result = tab.closest(target, bucketSize)
tab.mutex.Unlock()
if len(result.entries) > 0 || !refreshIfEmpty {
break
}
// The result set is empty, all nodes were dropped, refresh.
// We actually wait for the refresh to complete here. The very
// first query will hit this case and run the bootstrapping
// logic.
<-tab.refresh()
refreshIfEmpty = false
}

for {
// ask the alpha closest nodes that we haven't asked yet
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] {
asked[n.ID] = true
pendingQueries++
go func() {
// Find potential neighbors to bond with
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil {
// Bump the failure counter to detect and evacuate non-bonded entries
fails := tab.db.findFails(n.ID) + 1
tab.db.updateFindFails(n.ID, fails)
log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)

if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n)
}
}
reply <- tab.bondall(r)
}()
}
}
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
// wait for the next reply
for _, n := range <-reply {
if n != nil && !seen[n.ID] {
seen[n.ID] = true
result.push(n, bucketSize)
}
}
pendingQueries--
}
return result.entries
}

整体流程图如下

签名与验签

调用lookup的地方我们也能看到,是在发现任务时随机生成一个目标节点,然后去发现这个目标节点,通过找到周边节点来扩充节点的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
func (t *discoverTask) Do(srv *Server) {
// newTasks generates a lookup task whenever dynamic dials are
// necessary. Lookups need to take some time, otherwise the
// event loop spins too fast.
next := srv.lastLookup.Add(lookupInterval)
if now := time.Now(); now.Before(next) {
time.Sleep(next.Sub(now))
}
srv.lastLookup = time.Now()
var target discover.NodeID
rand.Read(target[:])
t.results = srv.ntab.Lookup(target)
}

节点存储

当在内存中的发现节点连接时间超过最小连接时间后,会触发copyBondedNodes方法,将内存中的发现数据存储到leveldb中。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (tab *Table) copyBondedNodes() {
tab.mutex.Lock()
defer tab.mutex.Unlock()

now := time.Now()
for _, b := range tab.buckets {
for _, n := range b.entries {
if now.Sub(n.addedAt) >= seedMinTableTime {
tab.db.updateNode(n)
}
}
}
}

最终的存储结构如下代码所示

1
2
3
4
5
6
7
func (db *nodeDB) updateNode(node *Node) error {
blob, err := rlp.EncodeToBytes(node)
if err != nil {
return err
}
return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil)
}

可以看到,这个存储是完全周期性的将内存中满足条件(主要是连接时间)的节点存储到数据库中的。

协议处理

协议处理分为两种类型,一种是初始创建连接时,这里的代表就是addpeer,另一种就是连接已经通顺之后的数据通信,例如KAD网络的基础操作PING,FINDNODE等

add peer

之前在服务启动的时候提到,当一个连接到来之后满足要求后会触发SetupConn方法,add peer就是在这个方法中完成的。
整体流程如下

签名与验签

可以看到,add peer前期主要是做一些握手检查,然后在确认可以连接之后启动了peer协程,通过readLoop协程一直接收来自该peer的信息。

普通操作

这里我们以findnode为例

发包

发包的核心其实就在于p2p.Send方法

收取包

readloop可以看到,这里调用的是协议transport中的ReadMsg方法,从这里可以辗转得到实际ReadMsg的方法为func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error)

这里就不展开了,通过ReadMsg的解包后进入了handle方法,如果是ping方法,就在这一层进行了处理,如果是子协议,就开始在更高层的protocal.Run方法进行处理,这里有兴趣可以关注func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error)
这里所写的各种协议

总结

这里我们可以看到整体来说p2p的server层结构还是很清晰的,Server.go来管理整体的服务,Peer.go是对每个连接上的节点数据进行管理和处理,Message.goy终于处理整体的消息结构体,rlpx是最底层的协议,dail.go维护任务执行,discover模块实现了KAD网络。总的来说就是基于KAD网络的一个基础模块