【原创】HPB之go leveldb数据库的memcompaction操作源码


#1

之前介绍了leveldb数据库的put和get操作。put操作将数据写入内存并记录log就算完事,但内存并不是无限的,还是需要将内存中的数据写入到磁盘中的,完成这一步的操作叫做memcompaction。而get操作的过程需要根据key的范围进行搜索定位到某一个文件中,如果仅是将内存数据写入到磁盘文件中,虽然key在单个文件内是有序,随着数据量的增加,多个文件的key的范围重叠的几率就会越来越大,为了提高查询的效率,需要将有重叠key的多个文件进行合并,这个过程是通过tablecompaction完成的。下面就分别看下两个compaction是怎么做的。

先看看memcompation (leveldb/db_compaction.go),以下代码只关注主体部分。

func (db *DB) memCompaction() {

​    mdb := db.getFrozenMem()

​    if mdb.Len() == 0 {

​        db.logf("memdb@flush skipping")

​        db.dropFrozenMem()

​        return

​    }

​    // Pause table compaction.

​    resumeC := make(chan struct{})

​    select {

​    case db.tcompPauseC <- (chan<- struct{})(resumeC):

​    case <-db.compPerErrC:

​        close(resumeC)

​        resumeC = nil

​    case <-db.closeC:

​        db.compactionExitTransact()

​    }

​    var (

​        rec        = &sessionRecord{}

​        stats      = &cStatStaging{}

​        flushLevel int

​    )

​    // Generate tables.

​    db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {

​        stats.startTimer()

​        flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)

​        stats.stopTimer()

​        return

​    }, func() error {

​       ........

​    })

​    rec.setJournalNum(db.journalFd.Num)

​    rec.setSeqNum(db.frozenSeq)

​    db.compactionCommit("memdb", rec)

​    db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)

​    for _, r := range rec.addedTables {

​        stats.write += r.size

​    }

​    db.compStats.addStat(flushLevel, stats)

​    db.dropFrozenMem()

​    // Trigger table compaction.

​    db.compTrigger(db.tcompCmdC)

}

从上述函数中可以看出,memcompaction的主要部分分为以下几步:

1、获取到frozenmemdb并判断frozenmemdb是否为空,frozenmemdb是从put写入时的memdb写满后转化来的

2、中断tablecompaction,由此可知tablecompaction与memcompaction不会同时进行。

3、通过flushMemdb将数据刷新到磁盘

4、将fulshmemdb的结果进行提交,并记录log,提交的过程主要是为了将新生成的表信息写入到MANIFEST文件中,同时生成新的version。

5、恢复tablecompaction

这里重点看下flushmemdb具体做了哪些事情。

func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) {

​    iter := mdb.NewIterator(nil)

​    defer iter.Release()

​    t, n, err := s.tops.createFrom(iter)

​    if err != nil {

​        return 0, err

​    }

​    flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)

​    rec.addTableFile(flushLevel, t)

​    s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)

​    return flushLevel, nil

}

flushmemdb通过createfrom函数将数据写入磁盘,记录日志并返回当前文件所在的level。在memcompaction操作中,level为0。而createfrom函数的主要功能是创建新的文件,将frozenmemdb中的数据取出,然后刷新到磁盘。

func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {

​    w, err := t.create()

​    ........

​    for src.Next() {

​        err = w.append(src.Key(), src.Value())

​        if err != nil {

​            return

​        }

​    }
​    ........
​    n = w.tw.EntriesLen()

​    f, err = w.finish()

​    return

}

memcompaction的主要目的就是将内存数据写入到磁盘。磁盘上的文件合并即tablecompaction下回分解。