【原创】HPB快速同步之块数据处理源码解析(四)


#1

前文介绍了快速同步中header与body的同步流程,此时resultCache中已经有了完整的block数据。接下来看看如何处理这些block,即如何将block插入到链中,同时也可以了解快速同步与全量同步的区别。

func (this *fastSync) processFastSyncContent(latest *types.Header) error {

​    stateSync := this.syncState(latest.Root)

​    defer stateSync.Cancel()

​    go func() {

​        if err := stateSync.Wait(); err != nil {

​            this.syncer.sch.Close() // wake up WaitResults

​        }

​    }()

​    pivot := this.syncer.sch.FastSyncPivot()

​    for {

​        results := this.syncer.sch.WaitResults()

​        if len(results) == 0 {

​            return stateSync.Cancel()

​        }

​        if this.chainInsertHook != nil {

​            this.chainInsertHook(results)

​        }

​        P, beforeP, afterP := splitAroundPivot(pivot, results)

​        if err := this.commitFastSyncData(beforeP, stateSync); err != nil {

​            return err

​        }

​        if P != nil {

​            stateSync.Cancel()

​            if err := this.commitPivotBlock(P); err != nil {

​                return err

​            }

​        }

​        if err := this.importBlockResults(afterP); err != nil {

​            return err

​        }

​    }

}

节点处理同步数据时,先启动了对最新区块stateroot(latest.Root)的状态同步,为什么要同步state后文再说。然后获取快速同步的pivot,并通过WaitResults()获取results,其中包含了组装block所需的header与body。接下来利用splitAroundPivot函数根据pivot将results划分为3段:pivot块之前的blocks、pivot块本身、pivot块之后的blocks。

// splitAroundPivot partitions results by block number relative to pivot.
// p is the result whose number equals pivot (nil if none is present), before
// collects the results numbered below pivot and after those numbered above
// it. The relative order of results is preserved in both slices.
func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
	for idx := range results {
		res := results[idx]
		height := res.Header.Number.Uint64()
		if height == pivot {
			p = res
			continue
		}
		if height < pivot {
			before = append(before, res)
		} else {
			after = append(after, res)
		}
	}
	return p, before, after
}

为什么要划分成3段,这个其实就是快速同步与全量同步的区别,全量同步的处理相当于所有的块都是进行与after块相同的处理。那before与after有什么不同呢,继续看代码,commitFastSyncData

// commitFastSyncData writes the given fetch results to the chain through
// InsertReceiptChain, in batches of at most maxResultsProcess items. Each
// result is turned into a block (header plus transactions and uncles) paired
// with its receipts.
//
// Before every batch it checks for termination: it returns
// errCancelContentProcessing when the syncer's quit channel fires, and the
// state sync's error when stateSync has finished with a failure. If the
// insertion itself fails, the cause is logged and errInvalidChain is
// returned.
func (this *fastSync) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
	for len(results) != 0 {
		// Check for any termination requests.
		select {
		case <-this.syncer.quitCh:
			return errCancelContentProcessing
		case <-stateSync.done:
			// The state sync has finished; abort if it finished with an error.
			if err := stateSync.Wait(); err != nil {
				return err
			}
		default:
		}
		// Retrieve a batch of results to import.
		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
		first, last := results[0].Header, results[items-1].Header
		log.Debug("Inserting fast-sync blocks", "items", len(results),
			"firstnum", first.Number, "firsthash", first.Hash(),
			"lastnum", last.Number, "lasthash", last.Hash(),
		)
		blocks := make([]*types.Block, items)
		receipts := make([]types.Receipts, items)
		for i, result := range results[:items] {
			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
			receipts[i] = result.Receipts
		}
		if index, err := bc.InstanceBlockChain().InsertReceiptChain(blocks, receipts); err != nil {
			// The underlying error is only logged; callers receive errInvalidChain.
			log.Debug("fast synced item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
			return errInvalidChain
		}
		// Shift the results to the next batch.
		results = results[items:]
	}
	return nil
}

从代码逻辑上看,主要是进行InsertReceiptChain处理。具体代码在blockchain.go中,这里就不贴出来了,其主要作用是writebody与writereceipt;header在之前执行insertheaderchain时已经写入(writeheader)。所以,至此pivot之前的块数据都已写入数据库。既然pivot块是单独处理的,那它的处理必然有不同于before块与after块的地方,下面看看区别在哪。

// commitPivotBlock imports the pivot block. It builds the block from the
// fetch result, waits for a state sync of the block's state root to finish,
// stores the block together with its receipts via InsertReceiptChain, and
// finally asks the chain to commit the block as the fast-sync head.
// The first error from any of these steps is returned unchanged.
func (this *fastSync) commitPivotBlock(result *fetchResult) error {
	pivotBlock := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)

	// Sync the pivot block's state and block until that sync has finished.
	pivotSync := this.syncState(pivotBlock.Root())
	if err := pivotSync.Wait(); err != nil {
		return err
	}

	log.Debug("Committing fast sync pivot as new head", "number", pivotBlock.Number(), "hash", pivotBlock.Hash())

	blocks := []*types.Block{pivotBlock}
	receipts := []types.Receipts{result.Receipts}
	_, err := bc.InstanceBlockChain().InsertReceiptChain(blocks, receipts)
	if err != nil {
		return err
	}
	return bc.InstanceBlockChain().FastSyncCommitHead(pivotBlock.Hash())
}

与before相比,主要区别在于syncState,对于pivot块增加了一个state的同步。为什么要同步state呢?看下after的处理就知道了

// importBlockResults imports the given fetch results through InsertChain, in
// batches of at most maxResultsProcess blocks. Unlike commitFastSyncData, no
// receipts are passed along: only the blocks themselves are handed to the
// chain.
//
// It returns errCancelContentProcessing when the syncer's quit channel fires
// between batches. An insertion failure is logged and reported as
// errInvalidChain, except for consensus.ErrInvalidblockbutnodrop, which is
// passed through unchanged.
func (this *fastSync) importBlockResults(results []*fetchResult) error {
	for len(results) > 0 {
		// Bail out early on shutdown so that termination is quick.
		select {
		case <-this.syncer.quitCh:
			return errCancelContentProcessing
		default:
		}

		// Carve the next batch off the front of the pending results.
		batchSize := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
		head := results[0].Header
		tail := results[batchSize-1].Header
		log.Debug("Inserting fast synced chain", "items", len(results),
			"firstnum", head.Number, "firsthash", head.Hash(),
			"lastnum", tail.Number, "lasthash", tail.Hash(),
		)

		blocks := make([]*types.Block, 0, batchSize)
		for _, res := range results[:batchSize] {
			blocks = append(blocks, types.NewBlockWithHeader(res.Header).WithBody(res.Transactions, res.Uncles))
		}

		index, err := bc.InstanceBlockChain().InsertChain(blocks)
		if err != nil {
			failed := results[index].Header
			log.Debug("fast synced item processing failed", "number", failed.Number, "hash", failed.Hash(), "err", err)
			if err == consensus.ErrInvalidblockbutnodrop {
				return err
			}
			return errInvalidChain
		}

		// Advance past the batch that was just imported.
		results = results[batchSize:]
	}
	return nil
}

针对after块的处理走的是InsertChain(blocks),而insertchain的主要代码如下:

// insertChain imports the given blocks into the chain one by one. It first
// checks that the blocks are consecutively numbered and linked by parent
// hash, then starts header verification through the consensus engine. For
// each block it builds a state from the parent's root, executes the block
// with the processor, validates the resulting state, and writes block and
// state to the chain.
//
// On failure it returns the index of the block being processed together with
// the events and logs collected so far and the error. On success (or when
// processing is interrupted via procInterrupt) it returns index 0, the
// collected events and logs, and a nil error.
func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
	// Do a sanity check that the provided chain is actually ordered and linked
	for i := 1; i < len(chain); i++ {
		if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
			// Chain broke ancestry, log a message (programming error) and skip insertion
			log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
				"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())

			return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
				chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
		}
	}
	// Pre-checks passed, start the full block imports
	bc.wg.Add(1)
	defer bc.wg.Done()

	// Hold the chain mutex for the whole import.
	bc.chainmu.Lock()
	defer bc.chainmu.Unlock()

	// A queued approach to delivering events. This is generally
	// faster than direct delivery and requires much less mutex
	// acquiring.
	var (
		stats         = insertStats{startTime: mclock.Now()}
		events        = make([]interface{}, 0, len(chain))
		lastCanon     *types.Block
		coalescedLogs []*types.Log
	)
	// Start the parallel header verifier
	headers := make([]*types.Header, len(chain))
	seals := make([]bool, len(chain))

	// Every header is submitted with its seal flag set to true.
	for i, block := range chain {
		headers[i] = block.Header()
		seals[i] = true
	}

	// A single-block insert is verified in FullSync mode, anything longer in
	// FastSync mode.
	var mode config.SyncMode
	if len(headers) == 1 {
		mode = config.FullSync
	} else {
		mode = config.FastSync
	}
	abort, results := bc.engine.VerifyHeaders(bc, headers, seals, mode)

	// Closing abort on return signals the verifier to stop.
	defer close(abort)

	// Iterate over the blocks and insert when the verifier permits
	for i, block := range chain {
		// If the chain is terminating, stop processing blocks
		if atomic.LoadInt32(&bc.procInterrupt) == 1 {
			log.Debug("Premature abort during blocks processing")
			break
		}

		// Wait for the block's verification to complete
		bstart := time.Now()

		err := <-results
		if err == nil {
			err = bc.Validator().ValidateBody(block)
		}
		// Create a new statedb using the parent block and report an
		// error if it fails.
		var parent *types.Block
		if i == 0 {
			// The first block's parent must come from the existing chain.
			// NOTE(review): the result of GetBlock is dereferenced below
			// without a nil check; presumably an unknown parent is rejected
			// earlier — confirm.
			parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
		} else {
			parent = chain[i-1]
		}
		// NOTE(review): this := re-assigns the err declared above, so the
		// result of header verification and ValidateBody is overwritten here
		// without ever being inspected in this function — confirm whether
		// that error handling was dropped.
		state, err := state.New(parent.Root(), bc.stateCache)
		if err != nil {
			return i, events, coalescedLogs, err
		}
		// Process block using the parent state as reference point.
		receipts, logs, usedGas, err := bc.processor.Process(block, state)
		if err != nil {
			bc.reportBlock(block, receipts, err)
			return i, events, coalescedLogs, err
		}
		// Validate the state using the default validator
		err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
		if err != nil {
			//for i, tx := range block.Transactions() {
			// Add cache-clearing code; the cache is per contract, so accounts are keyed mainly by contract address.
			//state.ClearCachebyaddress(tx.data.Recipient)
			//}
			bc.reportBlock(block, receipts, err)
			return i, events, coalescedLogs, err
		}
		// Write the block to the chain and get the status.
		log.Info("----> Write Block and State From Outside", "number", block.Number(), "hash", block.Hash(), "difficulty", block.Difficulty())
		status, err := bc.WriteBlockAndState(block, receipts, state)
		if err != nil {
			return i, events, coalescedLogs, err
		}
		switch status {
		case CanonStatTy:
			// The block was written with canonical status: record its logs
			// and remember it as the latest canonical block of this insert.
			log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
				"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))

			log.Info("Inserted new block", "number", block.Number(), "hash", block.Hash())

			coalescedLogs = append(coalescedLogs, logs...)
			blockInsertTimer.UpdateSince(bstart)
			events = append(events, ChainEvent{block, block.Hash(), logs})
			lastCanon = block

		case SideStatTy:
			// The block was written with side status: only a side event is queued.
			log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
				common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))

			blockInsertTimer.UpdateSince(bstart)
			events = append(events, ChainSideEvent{block})

		}
		stats.processed++
		stats.usedGas += usedGas.Uint64()
		stats.report(chain, i)
	}

	// Append a single chain head event if we've progressed the chain
	if lastCanon != nil && bc.LastBlockHash() == lastCanon.Hash() {
		events = append(events, ChainHeadEvent{lastCanon})
	}
	return 0, events, coalescedLogs, nil
}

看以上代码发现,先进行了header的校验,然后进行了body的校验,再然后就state.New(parent.Root()),针对after处理的第一个块的parent,那就是pivot块了。这里用到了pivot的state,所以在处理pivot的时候需要同步下state。(state中存储的是链上所有账号相关信息)。

那为什么before块的处理不需要state呢?原因在于after块的处理还有一个process(block,state)。process的处理如下:

// Process executes every transaction of block against statedb and then lets
// the consensus engine finalize the block. It returns the receipts, the logs
// of all receipts, and the accumulated gas counter.
//
// Transactions carrying data are applied with ApplyTransaction, those without
// data with ApplyTransactionNonContract. If converting a transaction to a
// message, applying it, or finalizing the block fails, all results are nil
// and the error is returned. When applying a transaction fails, its entry is
// also removed from synsigner before returning.
func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, *big.Int, error) {
	var (
		receipts types.Receipts
		allLogs  []*types.Log
	)
	totalUsedGas := big.NewInt(0)
	header := block.Header()
	gp := new(GasPool).AddGas(block.GasLimit())

	// Apply the transactions one after another, in block order.
	for idx, tx := range block.Transactions() {
		statedb.Prepare(tx.Hash(), block.Hash(), idx)

		msg, err := tx.AsMessage(types.MakeSigner(p.config))
		if err != nil {
			return nil, nil, nil, err
		}

		var (
			receipt  *types.Receipt
			applyErr error
		)
		if len(msg.Data()) == 0 {
			// No payload: use the non-contract path.
			receipt, _, applyErr = ApplyTransactionNonContract(p.config, p.bc, nil, gp, statedb, header, tx, totalUsedGas)
		} else {
			receipt, _, applyErr = ApplyTransaction(p.config, p.bc, nil, gp, statedb, header, tx, totalUsedGas)
		}
		if applyErr != nil {
			types.Deletesynsinger(synsigner, tx)
			return nil, nil, nil, applyErr
		}

		receipts = append(receipts, receipt)
		allLogs = append(allLogs, receipt.Logs...)
	}

	// Drop the synsigner entries of all transactions in the background.
	go func(pending []*types.Transaction) {
		//types.ASynSender(synsigner, nil)
		for _, tx := range pending {
			types.Deletesynsinger(synsigner, tx)
		}
	}(block.Transactions())

	// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
	_, err := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), receipts)
	if err != nil {
		return nil, nil, nil, err
	}

	return receipts, allLogs, totalUsedGas, nil
}

从上面代码可以看出,process主要是执行交易,更新state。而在处理before块的时候是不处理交易的,只是将块数据写入数据库。而交易的执行一旦出错,则会被认为是坏块。

至此,快速同步完成。

从快速同步的过程分析中,可以看出快速同步与全同步的区别主要有两个:

1 是state。快速同步的before块是没有state的。而全量同步需要每一个块的state

2 是交易处理。快速同步对pivot之前的块不执行(校验)交易,全量同步需要校验每一笔交易,而校验交易的基础就是state。

至此,快速同步的流程解析完毕。