[Original] HPB Fast Sync, Part 3: Body Synchronization and Data Processing Source Code Analysis


In the previous article, header processing triggered body synchronization: after InsertHeaderChain succeeds, a signal is sent on bodyWakeCh. This article looks at how bodies are synchronized and processed. The entry point is fetchBodies; note the several callback functions defined inside it.

func (this *fastSync) fetchBodies(from uint64) error {
	log.Debug("fast syncing block bodies", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*bodyPack)
			return this.syncer.sch.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
		}
		expire   = func() map[string]int { return this.syncer.sch.ExpireBodies(this.syncer.requestTTL()) }
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peerConnection) int { return p.BlockCapacity(this.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
	)
	err := this.fetchParts(errCancelBodyFetch, this.bodyCh, deliver, this.bodyWakeCh, expire,
		this.syncer.sch.PendingBlocks, this.syncer.sch.InFlightBlocks, this.syncer.sch.ShouldThrottleBlocks, this.syncer.sch.ReserveBodies,
		this.bodyFetchHook, fetch, this.syncer.sch.CancelBodies, capacity, this.syncer.peers.BodyIdlePeers, setIdle, "bodies")

	log.Debug("Block body fast sync terminated", "err", err)
	return err
}

The internal flow of fetchParts is the same as for the header call (only the parameters differ), so the code is not repeated here. The bodyWakeCh branch triggers an update event, which starts body synchronization; ReserveBodies is then used to reserve body download tasks, and its core logic is still reserveHeaders.
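Since the fetchParts loop is not reproduced here, below is a minimal sketch of its bodyWakeCh branch, assuming the same select structure as the go-ethereum downloader that HPB's fastSync follows (wakeCh is the bodyWakeCh passed in from fetchBodies):

case cont := <-wakeCh:
	// The header stage signalled bodyWakeCh: true means new tasks may have been
	// scheduled, false means header downloading has finished entirely.
	if !cont {
		finished = true
	}
	// Kick an update round so idle peers get assigned the new tasks.
	select {
	case update <- struct{}{}:
	default:
	}

During that update round, the reserve callback is called, which for bodies is ReserveBodies: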

func (this *scheduler) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
	isNoop := func(header *types.Header) bool {
		return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
	}
	this.lock.Lock()
	defer this.lock.Unlock()

	return this.reserveHeaders(p, count, this.blockTaskPool, this.blockTaskQueue, this.blockPendPool, this.blockDonePool, isNoop)
}

Note the parameters passed to reserveHeaders: blockTaskPool and blockTaskQueue are both populated by the Schedule function, after processHeaders has verified a batch of headers and before bodyWakeCh is notified.

func (this *scheduler) Schedule(headers []*types.Header, from uint64) []*types.Header {
	this.lock.Lock()
	defer this.lock.Unlock()

	// Insert all the headers prioritised by the contained block number
	inserts := make([]*types.Header, 0, len(headers))
	for _, header := range headers {
		// Make sure chain order is honoured and preserved throughout
		hash := header.Hash()
		if header.Number == nil || header.Number.Uint64() != from {
			log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
			break
		}
		if this.headerHead != (common.Hash{}) && this.headerHead != header.ParentHash {
			log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
			break
		}
		// Make sure no duplicate requests are executed
		if _, ok := this.blockTaskPool[hash]; ok {
			log.Warn("Header  already scheduled for block fetch", "number", header.Number, "hash", hash)
			continue
		}
		if _, ok := this.receiptTaskPool[hash]; ok {
			log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
			continue
		}
		// Queue the header for content retrieval
		this.blockTaskPool[hash] = header
		this.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))

		if this.mode == config.FastSync && header.Number.Uint64() <= this.fastSyncPivot {
			// Fast phase of the fast sync, retrieve receipts too
			this.receiptTaskPool[hash] = header
			this.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
		}
		inserts = append(inserts, header)
		this.headerHead = hash
		from++
	}
	return inserts
}
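For context, the hand-off described above happens inside processHeaders roughly as below. This is a simplified sketch based on the go-ethereum downloader that HPB's fastSync follows; chunk and origin stand for the current batch of verified headers and its starting block number, and receiptWakeCh/errBadPeer keep the go-ethereum names.

	// Schedule the verified headers for body (and possibly receipt) retrieval ...
	inserts := this.syncer.sch.Schedule(chunk, origin)
	if len(inserts) != len(chunk) {
		log.Debug("Stale headers")
		return errBadPeer
	}

	// ... then signal the body and receipt fetchers that new tasks are available.
	// This is exactly the bodyWakeCh event that fetchParts reacts to.
	for _, ch := range []chan bool{this.bodyWakeCh, this.receiptWakeCh} {
		select {
		case ch <- true:
		default:
		}
	}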

At this point, once the body download tasks have been reserved via reserveHeaders, the download can be started with FetchBodies.

func (p *peerConnection) FetchBodies(request *fetchRequest) error {
	// Sanity check the protocol version
	if p.version < config.ProtocolV111 {
		panic(fmt.Sprintf("body fetch [protocol version/%d+] requested on [protocol version/%d]", config.ProtocolV111, p.version))
	}
	// Short circuit if the peer is already fetching
	if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
		return errAlreadyFetching
	}
	p.blockStarted = time.Now()

	// Convert the header set to a retrievable slice
	hashes := make([]common.Hash, 0, len(request.Headers))
	for _, header := range request.Headers {
		hashes = append(hashes, header.Hash())
	}
	go p.peer.RequestBodies(hashes)

	return nil
}

As the code above shows, bodies are actually downloaded by block hash, not by block number.

go p.peer.RequestBodies(hashes) actually sends a GetBlockBodiesMsg request to the connected peer. When the peer receives the request, it looks the bodies up by hash and sends them back.
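RequestBodies itself is just a thin wrapper around a p2p message send, and on the serving side the remote peer looks each hash up in its local chain and answers with a BlockBodiesMsg. The HPB code is not quoted here; the sketch below follows the eth protocol that HPB is derived from, so helpers such as p2p.Send, GetBodyRLP and SendBlockBodiesRLP are the go-ethereum names and may differ slightly in HPB.

// Requesting side: send a GetBlockBodiesMsg carrying the block hashes.
func (p *peer) RequestBodies(hashes []common.Hash) error {
	return p2p.Send(p.rw, GetBlockBodiesMsg, hashes)
}

// Serving side (simplified): decode the hash list, look each body up in the
// local chain and reply with a BlockBodiesMsg.
func handleGetBlockBodiesMsg(p *peer, msg p2p.Msg, chain *core.BlockChain) error {
	var hashes []common.Hash
	if err := msg.Decode(&hashes); err != nil {
		return p2p.ErrResp(p2p.ErrDecode, "msg %v: %v", msg, err)
	}
	bodies := make([]rlp.RawValue, 0, len(hashes))
	for _, hash := range hashes {
		if body := chain.GetBodyRLP(hash); len(body) != 0 {
			bodies = append(bodies, body)
		}
	}
	return p.SendBlockBodiesRLP(bodies) // wraps p2p.Send(p.rw, BlockBodiesMsg, bodies)
}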

Next, let's see how the received bodies are handled.

func HandleBlockBodiesMsg(p *p2p.Peer, msg p2p.Msg) error {
	// A batch of block bodies arrived to one of our previous requests
	var request blockBodiesData
	if err := msg.Decode(&request); err != nil {
		return p2p.ErrResp(p2p.ErrDecode, "msg %v: %v", msg, err)
	}
	// Deliver them all to the downloader for queuing
	trasactions := make([][]*types.Transaction, len(request))
	uncles := make([][]*types.Header, len(request))

	for i, body := range request {
		trasactions[i] = body.Transactions
		uncles[i] = body.Uncles
	}
	// Filter out any explicitly requested bodies, deliver the rest to the downloader
	filter := len(trasactions) > 0 || len(uncles) > 0
	if filter {
		trasactions, uncles = InstanceSynCtrl().puller.FilterBodies(p.GetID(), trasactions, uncles, time.Now())
	}
	if len(trasactions) > 0 || len(uncles) > 0 || !filter {
		err := InstanceSynCtrl().syner.DeliverBodies(p.GetID(), trasactions, uncles)
		if err != nil {
			log.Debug("Failed to deliver bodies", "err", err)
		}
	}
	return nil
}
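For reference, the request variable above decodes into a list of block bodies. The type is not quoted in this article; in the eth wire protocol that HPB derives from it is defined as follows, and the HPB definition should be equivalent:

// blockBody is the data content of a single block body.
type blockBody struct {
	Transactions []*types.Transaction // Transactions contained within a block
	Uncles       []*types.Header      // Uncles contained within a block
}

// blockBodiesData is the network packet for the block body distribution message.
type blockBodiesData []*blockBody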

After the remote peer returns the bodies, they are split into transactions and uncles, since a body is composed of exactly those two parts. Then DeliverBodies is called, and its main purpose is simply to trigger the bodyCh event. The code is not pasted here, but a rough sketch follows.
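The delivery path is short: the syner wraps the data in a bodyPack and pushes it onto bodyCh, which fetchParts is waiting on as deliveryCh. Below is a minimal sketch modeled on the go-ethereum downloader; the receiver, field and error names are assumptions and HPB's exact layout may differ. It should not be confused with the scheduler's DeliverBodies shown further below.

// DeliverBodies injects a batch of bodies received from a remote peer into the
// body delivery channel.
func (this *syncer) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
	return this.deliver(this.bodyCh, &bodyPack{id, transactions, uncles})
}

// deliver pushes the packet onto the destination channel unless the sync has
// been cancelled in the meantime.
func (this *syncer) deliver(destCh chan dataPack, packet dataPack) error {
	this.cancelLock.RLock()
	cancel := this.cancelCh
	this.cancelLock.RUnlock()
	if cancel == nil {
		return errNoSyncActive
	}
	select {
	case destCh <- packet:
		return nil
	case <-cancel:
		return errNoSyncActive
	}
}

Now let's look at how the data arriving on bodyCh is handled.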

This brings us back to the fetchParts handling inside fetchBodies.

case packet := <-deliveryCh:
	if peer := this.syncer.peers.Peer(packet.PeerId()); peer != nil {
		// Deliver the received chunk of data and check chain validity
		accepted, err := deliver(packet)
		if err == errInvalidChain {
			return err
		}
		if err != errStaleDelivery {
			setIdle(peer, accepted)
		}
	}
	// Blocks assembled, try to update the progress
	select {
	case update <- struct{}{}:
	default:
	}

It first checks that the peer that delivered the data is still registered (i.e. a legitimate source), then calls deliver. That deliver callback was defined back in fetchBodies and actually invokes the scheduler's DeliverBodies.

func (this *scheduler) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
	this.lock.Lock()
	defer this.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
			return errInvalidBody
		}
		result.Transactions = txLists[index]
		result.Uncles = uncleLists[index]
		return nil
	}
	return this.deliver(id, this.blockTaskPool, this.blockTaskQueue, this.blockPendPool, this.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
}

The reconstruct function here verifies the delivered txLists and uncleLists against the header's TxHash and UncleHash, and stores them in the corresponding result. Now look at the concrete implementation of deliver.

func (this *scheduler) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
	results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
	request := pendPool[id]

	var (
		accepted int   // number of results accepted from this delivery
		failure  error // first error encountered while reconstructing
		useful   bool  // whether any of the delivered data was useful
	)
	for i, header := range request.Headers {
		// Short circuit assembly if no more fetch results are found
		if i >= results {
			break
		}
		// Reconstruct the next result if contents match up
		index := int(header.Number.Int64() - int64(this.resultOffset))
		if index >= len(this.resultCache) || index < 0 || this.resultCache[index] == nil {
			failure = errInvalidChain
			log.Error("invalid hash chain(deliver)", "index", index, "len(resultCache)", len(this.resultCache))
			break
		}
		if err := reconstruct(header, i, this.resultCache[index]); err != nil {
			failure = err
			break
		}
		donePool[header.Hash()] = struct{}{}
		this.resultCache[index].Pending--
		useful = true
		accepted++

		// Clean up a successful fetch
		request.Headers[i] = nil
		delete(taskPool, header.Hash())
	}
	// Return all failed or missing fetches to the queue
	for _, header := range request.Headers {
		if header != nil {
			taskQueue.Push(header, -float32(header.Number.Uint64()))
		}
	}
	// Wake up WaitResults
	if accepted > 0 {
		this.active.Signal()
	}
	// If none of the data was good, it's a stale delivery
	switch {
	case failure == nil || failure == errInvalidChain:
		return accepted, failure
	case useful:
		return accepted, fmt.Errorf("partial failure: %v", failure)
	default:
		return accepted, errStaleDelivery
	}
}

First the request is taken out of pendPool; this request was created when the bodies were requested. Its headers are then iterated, and reconstruct fills each body into the matching slot of resultCache, so that resultCache now holds both the header and the body, enough to assemble a complete block.
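To make the "complete block" statement concrete: every slot of resultCache is a fetchResult that starts out with only the header and a Pending counter, and gets its missing pieces filled in as deliveries arrive. Its shape, as in the go-ethereum queue that HPB's scheduler is based on (the HPB struct should be essentially the same):

// fetchResult is one slot of resultCache, accumulating a block's parts.
type fetchResult struct {
	Pending int         // Number of data fetches still pending (body, receipts)
	Hash    common.Hash // Hash of the header to prevent recalculating

	Header       *types.Header
	Uncles       []*types.Header
	Transactions types.Transactions
	Receipts     types.Receipts
}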

At this point both headers and bodies are available, stored together in resultCache. What remains is how resultCache is processed, which will be covered in the next article.

That concludes this article.