【原创】HPB快速同步之header数据处理源码解析(二)


#1

前文介绍了header是如何同步下来的,本文继续介绍同步下来的header是如何处理的。

前文介绍header同步时提到了fetchParts函数:当header数据同步下来后,会触发deliveryCh事件,并执行deliver(packet)操作。deliver的定义如下:

deliver = func(packet dataPack) (int, error) {
	// The delivered packet is asserted to be a *headerPack; any other
	// dynamic type makes the unchecked assertion panic.
	hp := packet.(*headerPack)
	sched := this.syncer.sch

	// Pass the peer's headers to the scheduler together with the channel
	// on which processed batches are forwarded.
	return sched.DeliverHeaders(hp.peerId, hp.headers, this.headerProcCh)
}

接着看DeliverHeaders,代码如下:

// DeliverHeaders checks a batch of headers delivered by peer id against the
// request pending for that peer. On success it stores the batch in the result
// buffer and tries to forward completed batches on headerProcCh.
//
// A batch is accepted only if all of the following hold:
//   - it contains exactly MaxHeaderFetch headers;
//   - the first header's number equals request.From;
//   - the last header's hash equals the hash of the entry stored in
//     headerTaskPool for request.From;
//   - every following header has the next consecutive number and a ParentHash
//     equal to the hash of the header before it.
//
// Returns:
//   - (0, errNoFetchesPending) if no request is pending for the peer;
//   - (0, "delivery not accepted" error) if validation fails; the miss is
//     recorded in headerPeerMiss and the task is pushed back onto
//     headerTaskQueue;
//   - (len(headers), nil) on success.
//
// The scheduler lock is held for the whole call.
func (this *scheduler) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
	this.lock.Lock()
	defer this.lock.Unlock()

	// Short circuit if the data was never requested
	request := this.headerPendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	headerReqTimer.UpdateSince(request.Time)
	delete(this.headerPendPool, id)

	// Ensure headers can be mapped onto the skeleton chain
	target := this.headerTaskPool[request.From].Hash()
	log.Error("DeliverHeaders", "length", len(headers))
	// Only full batches are considered; this also guarantees that the
	// headers[0] and headers[len(headers)-1] accesses below are in range
	// (presumably MaxHeaderFetch > 0 - confirm).
	accepted := len(headers) == MaxHeaderFetch
	if accepted {
		if headers[0].Number.Uint64() != request.From {
			// The requested start number is logged under the "expected" key so
			// that the key/value context stays well formed, matching the other
			// rejection logs in this function.
			log.Error("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash(), "expected", request.From)
			accepted = false
		} else if headers[len(headers)-1].Hash() != target {
			log.Error("Last header broke skeleton structure ", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
			accepted = false
		}
	}
	if accepted {
		// Ranging over headers[1:] makes i the index of the previous header in
		// headers, so headers[i] is the parent candidate of header.
		for i, header := range headers[1:] {
			hash := header.Hash()
			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
				log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash, "expected", want)
				accepted = false
				break
			}
			if headers[i].Hash() != header.ParentHash {
				log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash)
				accepted = false
				break
			}
		}
	}
	// If the batch of headers wasn't accepted, mark as unavailable
	if !accepted {
		log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From)

		// Record that this peer failed to deliver the batch starting at request.From.
		miss := this.headerPeerMiss[id]
		if miss == nil {
			this.headerPeerMiss[id] = make(map[uint64]struct{})
			miss = this.headerPeerMiss[id]
		}
		miss[request.From] = struct{}{}

		// Put the task back into the queue so that it can be fetched again.
		this.headerTaskQueue.Push(request.From, -float32(request.From))
		return 0, errors.New("delivery not accepted")
	}
	// Clean up a successful fetch and try to deliver any sub-results
	copy(this.headerResults[request.From-this.headerOffset:], headers)
	delete(this.headerTaskPool, request.From)

	// Count how many headers, in steps of MaxHeaderFetch, are available
	// starting at the first not yet forwarded position. Only the first slot of
	// each step is probed for nil.
	ready := 0
	for this.headerProced+ready < len(this.headerResults) && this.headerResults[this.headerProced+ready] != nil {
		ready += MaxHeaderFetch
	}
	if ready > 0 {
		// Headers are ready for delivery, gather them and push forward (non blocking)
		process := make([]*types.Header, ready)
		copy(process, this.headerResults[this.headerProced:this.headerProced+ready])

		select {
		case headerProcCh <- process:
			log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number)
			this.headerProced += len(process)
		default:
			// The receiver is busy; headerProced is left untouched, so the same
			// headers are gathered again on a later delivery.
		}
	}
	// Check for termination and return
	if len(this.headerTaskPool) == 0 {
		// NOTE(review): this send happens while this.lock is held; presumably
		// headerContCh is buffered or always drained - confirm.
		this.headerContCh <- false
	}
	return len(headers), nil
}

DeliverHeaders函数主要目的是对headers进行序号和hash校验,校验通过后触发headerProcCh事件。headerProcCh的处理是在processHeaders函数中,看看都发生了什么,代码如下:

// processHeaders receives header batches from this.headerProcCh, inserts them
// into the light chain in chunks of at most maxHeadersProcess headers, and
// passes each chunk to the scheduler for content retrieval. After a batch has
// been handled it signals bodyWakeCh and receiptWakeCh.
//
// origin is the number passed to Schedule for the first header and is advanced
// by the size of every processed chunk.
//
// Errors returned by the visible code:
//   - errCancelHeaderProcessing when this.cancelCh fires;
//   - consensus.ErrInvalidblockbutnodrop or errInvalidChain when
//     InsertHeaderChain fails;
//   - errInvalidChain when the header at the pivot height does not match
//     this.fsPivotLock;
//   - errBadPeer when Schedule returns fewer entries than the chunk holds.
//
// NOTE(review): in the code shown here td is never referenced, gotHeaders is
// written but never read, rollback is accumulated but never consumed, and the
// loop has no successful return path. This looks like an excerpt with parts
// left out - confirm against the full source.
func (this *fastSync) processHeaders(origin uint64, td *big.Int) error {
	// Calculate the pivoting point for switching from fast to slow sync
	pivot := this.syncer.sch.FastSyncPivot()
	rollback := []*types.Header{}

	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		case <-this.cancelCh:
			return errCancelHeaderProcessing

		case headers := <-this.headerProcCh:
			
			// Consume the received batch chunk by chunk until nothing is left.
			for len(headers) > 0 {
				gotHeaders = true
				// Terminate if something failed in between processing chunks
				select {
				case <-this.cancelCh:
					return errCancelHeaderProcessing
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]
				// Collect the headers of this chunk that the light chain does
				// not report as present before the insert.
				unknown := make([]*types.Header, 0, len(headers))
				for _, header := range chunk {
					if !this.syncer.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
						unknown = append(unknown, header)
					}
				}
				// If we're importing pure headers, verify based on their recentness.
				// The value 1 is used once the last header of the chunk is within
				// fsHeaderForceVerify of the pivot (presumably frequency is a
				// check-every-Nth interval - confirm in InsertHeaderChain).
				frequency := fsHeaderCheckFrequency
				if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
					frequency = 1
				}
				if n, err := this.syncer.lightchain.InsertHeaderChain(chunk, frequency, config.FastSync); err != nil {
					// If some headers were inserted, add them too to the rollback list
					if n > 0 {
						rollback = append(rollback, chunk[:n]...)
					}
					// n is used as the index of the offending header in the chunk.
					log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
					// This specific consensus error is passed through unchanged;
					// every other failure is reported as errInvalidChain.
					if err == consensus.ErrInvalidblockbutnodrop {
						return consensus.ErrInvalidblockbutnodrop
					}
					return errInvalidChain
				}
				// All verifications passed, store newly found uncertain headers
				rollback = append(rollback, unknown...)
				// Keep only the most recent fsHeaderSafetyNet entries, reusing the
				// existing backing array.
				if len(rollback) > fsHeaderSafetyNet {
					rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
				}
				// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
				if this.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
					// The inner pivot shadows the outer pivot number: it is the header
					// located at the pivot height inside this chunk.
					if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != this.fsPivotLock.Hash() {
						log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash(), "localNumber", this.fsPivotLock.Number, "localHash", this.fsPivotLock.Hash())
						return errInvalidChain
					}
				}
				
				// Wait, re-checking once per second, while the scheduler reports at
				// least maxQueuedHeaders pending blocks or receipts; cancelCh aborts
				// the wait.
				for this.syncer.sch.PendingBlocks() >= maxQueuedHeaders || this.syncer.sch.PendingReceipts() >= maxQueuedHeaders {
					select {
					case <-this.cancelCh:
						return errCancelHeaderProcessing
					case <-time.After(time.Second):
					}
				}
				// Otherwise insert the headers for content retrieval
				inserts := this.syncer.sch.Schedule(chunk, origin)
				// Anything short of the full chunk being returned is treated as a
				// bad peer.
				if len(inserts) != len(chunk) {
					log.Debug("Stale headers")
					return errBadPeer
				}
				// Move on to the remainder of the batch.
				headers = headers[limit:]
				origin += uint64(limit)
			}
			// Signal the content fast syncers of the availability of new tasks.
			// The sends are non-blocking: if a channel cannot accept the value
			// right away, the signal is dropped.
			for _, ch := range []chan bool{this.bodyWakeCh, this.receiptWakeCh} {
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}

如果headers不为空的话,在unknown中存储未放到链中的header,然后就调用InsertHeaderChain将headers插入到headerchain中,插入的过程也是先对header进行校验,然后将header写入数据库,代码在headerchain.go中。校验和写入的过程这里不细说了。
然后this.syncer.sch.Schedule(chunk, origin),通过Schedule添加body同步的任务。
现在header插入成功了,可以根据header下载body了,因此会触发bodyWakeCh事件(代码中同时也会触发receiptWakeCh事件)。另外需要注意的是,区块在插入到链中的时候,会涉及到交易收据(receipt)的生成,如果要查询交易收据则需要同步收据,因此快速同步中是需要同步收据的。

本文介绍到这里,已触发了body的同步,下文介绍body的同步与处理过程。