【原创】HPB之go leveldb数据库的tablecompaction操作源码


#1

前文介绍了memcompaction,主要是将内存数据写入磁盘。本文介绍tablecompaction,了解tablecompaction的过程,哪些表需要进行compaction,同时看看tablecompactiton有什么好处。直接上代码tableCompaction(leveldb/db_compaction.go),参数c,包含了要合并的表信息

func (db *DB) tableCompaction(c *compaction, noTrivial bool) {

​    defer c.release()

​    rec := &sessionRecord{}

​    rec.addCompPtr(c.sourceLevel, c.imax)

​    var stats [2]cStatStaging

​    for i, tables := range c.levels {

​        for _, t := range tables {

​            stats[i].read += t.size

​            // Insert deleted tables into record

​            rec.delTable(c.sourceLevel+i, t.fd.Num)

​        }

​    }

​    sourceSize := int(stats[0].read + stats[1].read)

​    minSeq := db.minSeq()

​    db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)

​    b := &tableCompactionBuilder{

​        db:        db,

​        s:         db.s,

​        c:         c,

​        rec:       rec,

​        stat1:     &stats[1],

​        minSeq:    minSeq,

​        strict:    db.s.o.GetStrict(opt.StrictCompaction),

​        tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),

​    }

​    db.compactionTransact("table@build", b)

​    // Commit.

​    stats[1].startTimer()

​    db.compactionCommit("table", rec)

​    stats[1].stopTimer()

​    resultSize := int(stats[1].write)

​    db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)

​    // Save compaction stats

​    for i := range stats {

​        db.compStats.addStat(c.sourceLevel+1, &stats[i])

​    }

}

简单点讲,tablecompaction的核心只有2步,build && commit。 其中build的过程db.compactionTransact(“table@build”, b)是将需要合并的表读出来,排序,写到新表,即read,sort,write 3个步骤。compactionTransact的核心在于run(),其他的都是变量定义和异常处理。这里重点关注下run的处理

func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {

​    snapResumed := b.snapIter > 0

​    hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary.

​    lastUkey := append([]byte{}, b.snapLastUkey...)

​    lastSeq := b.snapLastSeq

​    b.kerrCnt = b.snapKerrCnt

​    b.dropCnt = b.snapDropCnt

​    // Restore compaction state.

​    b.c.restore()

​    defer b.cleanup()

​    b.stat1.startTimer()

​    defer b.stat1.stopTimer()

​    iter := b.c.newIterator()

​    defer iter.Release()

​    for i := 0; iter.Next(); i++ {

​        // Incr transact counter.

​        cnt.incr()

​        // Skip until last state.

​        if i < b.snapIter {

​            continue

​        }

​        resumed := false

​        if snapResumed {

​            resumed = true

​            snapResumed = false

​        }

​        ikey := iter.Key()

​        ukey, seq, kt, kerr := parseInternalKey(ikey)

​        if kerr == nil {

​            shouldStop := !resumed && b.c.shouldStopBefore(ikey)

​            if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {

​                // First occurrence of this user key.

​                // Only rotate tables if ukey doesn't hop across.

​                if b.tw != nil && (shouldStop || b.needFlush()) {

​                    if err := b.flush(); err != nil {

​                        return err

​                    }

​                    // Creates snapshot of the state.

​                    b.c.save()

​                    b.snapHasLastUkey = hasLastUkey

​                    b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)

​                    b.snapLastSeq = lastSeq

​                    b.snapIter = i

​                    b.snapKerrCnt = b.kerrCnt

​                    b.snapDropCnt = b.dropCnt

​                }

​                hasLastUkey = true

​                lastUkey = append(lastUkey[:0], ukey...)

​                lastSeq = keyMaxSeq

​            }

​            switch {

​            case lastSeq <= b.minSeq:

​                // Dropped because newer entry for same user key exist

​                fallthrough // (A)

​            case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):

​                lastSeq = seq

​                b.dropCnt++

​                continue

​            default:

​                lastSeq = seq

​            }

​        } else {

​            if b.strict {

​                return kerr

​            }

​            // Don't drop corrupted keys.

​            hasLastUkey = false

​            lastUkey = lastUkey[:0]

​            lastSeq = keyMaxSeq

​            b.kerrCnt++

​        }

​        if err := b.appendKV(ikey, iter.Value()); err != nil {

​            return err

​        }

​    }

​    if err := iter.Error(); err != nil {

​        return err

​    }

​    // Finish last table.

​    if b.tw != nil && !b.tw.empty() {

​        return b.flush()

​    }

​    return nil

}

对run的过程进行拆分,之前说到build的过程是read,sort,write的过程。那从这3步说明下整个过程。

read:读取需要合并的文件,iter := b.c.newIterator(),newiterator()的就是将文件一一打开并缓存同时返回对应的iter。具体代码如下,打开文件的操作封装在newIterator,newIndexIterator之中,这里不再追溯细节了。

func (c *compaction) newIterator() iterator.Iterator {

​	 ..........

​    for i, tables := range c.levels {

​        if len(tables) == 0 {

​            continue

​        }

​        // Level-0 is not sorted and may overlaps each other.

​        if c.sourceLevel+i == 0 {

​            for _, t := range tables {

​                its = append(its, c.s.tops.newIterator(t, nil, ro))

​            }

​        } else {

​            it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)

​            its = append(its, it)

​        }

​    }

​    return iterator.NewMergedIterator(its, c.s.icmp, strict)

}

sort:将打开的文件按照key的大小读出来,for i := 0; iter.Next(); i++ 。其中iter.Next()每次取的就是当前所有打开文件的最小的key。这里看看是如何取出最小的key的,代码如下:

func (i *mergedIterator) next() bool {

​    var key []byte

​    if i.dir == dirForward {

​        key = i.keys[i.index]

​    }

​    for x, tkey := range i.keys {

​        if tkey != nil && (key == nil || i.cmp.Compare(tkey, key) < 0) {

​            key = tkey

​            i.index = x

​        }

​    }

​    if key == nil {

​        i.dir = dirEOI

​        return false

​    }

​    i.dir = dirForward

​    return true

}

重点在for循环里,每次都获取每个文件的最小key,与当前的index位置的key进行比较,如果比当前key小,就更新index,将index指向最小key。这个处理类似于归并排序的处理。通过index的移动,保证每次ikey := iter.Key()的时候取到的都是几个文件中当前没有取过的最小key.

write:当通过b.appendKV(ikey, iter.Value())将key,value写入新的文件对象时,如果文件写满了needflush(), 就需要将文件b.flush()到磁盘了,过程类似于memcompaction的flush,同时要记录log用于异常时恢复。

以上就是build的过程,下面看下commit都做了啥

func (s *session) commit(r *sessionRecord) (err error) {

​    v := s.version()

​    defer v.release()

​    // spawn new version based on current version

​    nv := v.spawn(r)

​    if s.manifest == nil {

​        // manifest journal writer not yet created, create one

​        err = s.newManifest(r, nv)

​    } else {

​        err = s.flushManifest(r)

​    }

​    // finally, apply new version if no error rise

​    if err == nil {

​        s.setVersion(nv)

​    }

​    return

}

看以上代码可知主要是通过spawn()生成新的版本信息,同时flushmanifest将新版本信息写入MANIFEST文件,最后有个setversion设置当前版本为最新生成的版本。

生成新版本的过程就不细讲了,主要是获取各个level层的tables组织在一起。

flushmanifest是将版本信息写入到文件中,也不细讲了。

通过以上分析可以知道,tablecompaction的过程,是将多个文件合并到一起的过程。那什么情况下应该执行tablecompaction,这么做有什么好处呢?

既然tablecompaction是将多个文件合并到一起,而每个文件本身存储的key都是有序的,因此如果只是将两个无关的文件合并也没什么意义。因此compaction的目的主要是将有key范围重叠的文件进行合并。另外由于leveldb是分层的,那每层的大小自然是有限,因此需要在某个level的文件写满后需要合并到下一层去,所以合并的过程是将level N层的文件和levelN+1层的文件合并为levelN+1的文件。

通过合并,减少了key范围重叠的概率,那么在进行查找的时候,就可以减少查询文件的次数,提高查询效率。同时,tablecompaction在合并的过程中,还起到了压缩的作用,减少了磁盘的占用。