以太坊之区块交易的广播

问题描述

在区块链的p2p网络中,当自己产生或者收到一个新区块的消息时,在自己校验了区块之后应该会广播出去,这里就会思考一个问题,如果每收到一个块都广播给周围的所有人,会不会形成一次广播的风暴?因此我们开始来观察在以太坊中是如何处理广播区块这个问题的。

以太坊的区块广播

首先,我们确定有哪些情况会导致区块广播,进行怎么样的广播,这里我们可以从protocalManger入手,因为在以太坊的devp2p通信中,所有的信息都是通过protocal传输的,从protocalManger的start方法中可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers

// 广播交易
pm.txCh = make(chan core.TxPreEvent, txChanSize)
pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()

// 广播挖出的区块
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()

// 同步处理
go pm.syncer()
go pm.txsyncLoop()
}

广播交易

广播交易这个做法很干脆,订阅了TxPreEvent之后,就开始等待事件的到来,一旦收到交易,就开始进行如下的广播

1
2
3
4
5
6
7
8
9
10
11
12
// 找到周围没有这条交易的peer,对这条交易进行广播
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {

// 找到周围的没有这条hash的peer
peers := pm.peers.PeersWithoutTx(hash)

//向这些peer发送这条交易,并且在自己的peer集中增加这条交易的记录
for _, peer := range peers {
peer.SendTransactions(types.Transactions{tx})
}
log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}

这里就留下了一个有趣的问题,如何知道周围节点有哪些交易的

对这个p.knownTxs的add操作一共有两处地方,一处是在

1
2
3
4
5
6
func (p *peer) SendTransactions(txs types.Transactions) error {
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
}
return p2p.Send(p.rw, TxMsg, txs)
}

可以看到,这里是在发送交易前就主动在自己的peerSet中对需要被发送交易的peer进行了记录有哪些交易已经被发送了

另外一处是在处理协议消息处

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// /eth/handler.go:657
case msg.Code == TxMsg:
// 交易消息到达时,确保目前是处于一个可以处理交易的状态
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
break
}
//处理交易,并且将其发送到交易池中
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
for i, tx := range txs {
if tx == nil {
return errResp(ErrDecode, "transaction %d is nil", i)
}
// 标记该节点的交易有效
p.MarkTransaction(tx.Hash())
}
pm.txpool.AddRemotes(txs)

这里是在协议收到了消息后的处理,可以看到,对于A,B对tx1交易的广播,A在向B广播时,A自己先会标记B已经有了tx1,B在收到A广播时,会标记A有了tx1,这样就不会重复广播了。

区块产生广播

与交易广播类似,区块产生广播也是订阅了挖矿事件,之后开始对挖矿事件进行监听,然后就进入了如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

//向周围广播区块
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {

// 找到周围节点中没有该区块hash的节点
hash := block.Hash()
peers := pm.peers.PeersWithoutBlock(hash)

// 根据标志位进行处理
if propagate {
// 计算区块的TD
var td *big.Int
if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
} else {
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
return
}
// 发送区块到peer的子集中
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range transfer {
peer.SendNewBlock(block, td)
}
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return
}
// 如果块确实已经在自己的链中了,广播Hash
if pm.blockchain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers {
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
}
log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
}
}

可以看到区块广播和交易的广播很类似,只是区别在于多了一个标志位,当propagate标志位为false时,这个广播只向周围区块广播区块的hash,同样的会在knownBlocks中加上标记,这里和交易广播完全一样。
当propagate标志位为true时,会广播整个区块,不过广播方式会有所区别:
首先需要计算这个区块的TD

TD:total difficulty,所有父区块的复杂度的和,这个不会存在区块中,但是会存在数据库中的一个字段

计算之后广播给当前区块集的子集,子集的算法是总peer数量的开方(暂时不知道为什么用这种方法)

同步区块处理

同步区块协程会周期性的从网络同步hash以及区块数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (pm *ProtocolManager) syncer() {
// 启动同步进程,对各种同步数据进行处理
pm.fetcher.Start()
defer pm.fetcher.Stop()
defer pm.downloader.Terminate()

// 启动定时器强行同步
forceSync := time.NewTicker(forceSyncCycle)
defer forceSync.Stop()

for {
select {
case <-pm.newPeerCh:

if pm.peers.Len() < minDesiredPeerCount {
break
}
go pm.synchronise(pm.peers.BestPeer())

case <-forceSync.C:
// 强制从最佳节点上同步数据
go pm.synchronise(pm.peers.BestPeer())

case <-pm.noMorePeers:
return
}
}
}

我们还是从发送方进行分析,首先需要选择一个peer进行主动同步,也即是BestPeer,简单来说,就是当前时刻td最大的peer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 获取最大td的peer
func (ps *peerSet) BestPeer() *peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

var (
bestPeer *peer
bestTd *big.Int
)
for _, p := range ps.peers {
if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
bestPeer, bestTd = p, td
}
}
return bestPeer
}

接下来,已经找到了最佳的peer,我们就开始关注同步区块的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (pm *ProtocolManager) synchronise(peer *peer) {
if peer == nil {
return
}
// 对自己目前的Td进行计算,确保远程peer的td是大于本地td的,如果大才开始进行同步操作
currentBlock := pm.blockchain.CurrentBlock()
td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())

pHead, pTd := peer.Head()
if pTd.Cmp(td) <= 0 {
return
}
// 下载模式选择
mode := downloader.FullSync
if atomic.LoadUint32(&pm.fastSync) == 1 {
// 如果显式要求是fastSync模式,则设置为fastSync模式
mode = downloader.FastSync
} else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
atomic.StoreUint32(&pm.fastSync, 1)
mode = downloader.FastSync
}

if mode == downloader.FastSync {
// 同样确保快速同步模式下的td也是更高的
if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 {
return
}
}

// 运行同步方法
if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
return
}
if atomic.LoadUint32(&pm.fastSync) == 1 {
log.Info("Fast sync complete, auto disabling")
atomic.StoreUint32(&pm.fastSync, 0)
}
atomic.StoreUint32(&pm.acceptTxs, 1)
if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
// 对当前同步到的区块进行广播
go pm.BroadcastBlock(head, false)
}
}

可以看到,主动从远程peer同步区块的核心还是在于td的比较,一切都要远程peer的td比自己大,然后才开始进行区块同步,具体同步的方法是downloader.Synchronise方法,接下来我们细细分析一下这个方法(等待细化)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
if err != nil {
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
}
}()
if p.version < 62 {
return errTooOld
}

log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
defer func(start time.Time) {
log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
}(time.Now())

// Look up the sync boundaries: the common ancestor and the target block
latest, err := d.fetchHeight(p)
if err != nil {
return err
}
height := latest.Number.Uint64()

origin, err := d.findAncestor(p, height)
if err != nil {
return err
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin
}
d.syncStatsChainHeight = height
d.syncStatsLock.Unlock()

// Ensure our origin point is below any fast sync pivot point
pivot := uint64(0)
if d.mode == FastSync {
if height <= uint64(fsMinFullBlocks) {
origin = 0
} else {
pivot = height - uint64(fsMinFullBlocks)
if pivot <= origin {
origin = pivot - 1
}
}
}
d.committed = 1
if d.mode == FastSync && pivot != 0 {
d.committed = 0
}
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, d.mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}

fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, pivot, td) },
}
if d.mode == FastSync {
fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
} else if d.mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
return d.spawnSync(fetchers)
}

同步交易处理

当有节点到来,我们会将现有的等待打包的交易发送给一个新节点,为了利用带宽,我们会打包交易并且一次性发送给一个peer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// /eth/sync.go:65
func (pm *ProtocolManager) txsyncLoop() {
var (
pending = make(map[discover.NodeID]*txsync)
sending = false // whether a send is active
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
)

// 发送一个包到远程节点,并且从pending中移除已经发送的交易
send := func(s *txsync) {
// Fill pack with transactions up to the target size.
size := common.StorageSize(0)
pack.p = s.p
pack.txs = pack.txs[:0]
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
pack.txs = append(pack.txs, s.txs[i])
size += s.txs[i].Size()
}

s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
if len(s.txs) == 0 {
delete(pending, s.p.ID())
}
s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
sending = true
go func() { done <- pack.p.SendTransactions(pack.txs) }()
}

// 随机选择下一个等待同步的peer去发送
pick := func() *txsync {
if len(pending) == 0 {
return nil
}
n := rand.Intn(len(pending)) + 1
for _, s := range pending {
if n--; n == 0 {
return s
}
}
return nil
}

for {
select {
case s := <-pm.txsyncCh:
pending[s.p.ID()] = s
if !sending {
send(s)
}
case err := <-done:
sending = false

if err != nil {
pack.p.Log().Debug("Transaction send failed", "err", err)
delete(pending, pack.p.ID())
}
开始调度下一个peer发送情况
if s := pick(); s != nil {
send(s)
}
case <-pm.quitSync:
return
}
}
}