【原创】HPB之go leveldb数据库的缓存介绍


#1

前文在介绍get操作的时候说过,get操作查询数据库,是需要先根据key定位到是哪个文件,然后去该文件中查询。那问题来了,如果每次都打开文件,查询磁盘,必然会导致有大量的磁盘IO,那查询的效率会大大降低。为了提升查询的效率,leveldb增加了缓存,用于缓存最近打开的文件数据。当定位到具体的文件时,先去缓存查看有没有该文件的缓存,有的话,直接在缓存中查找,没有的话才查询磁盘。下面看看代码实现。

func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {

​    ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {

​        var r storage.Reader

​        //log.Error("cacheGet", "fromfile", f.fd)

​        r, err = t.s.stor.Open(f.fd)

​        if err != nil {

​            return 0, nil

​        }

​        var bcache *cache.NamespaceGetter

​        if t.bcache != nil {

​            bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}

​        }

​        var tr *table.Reader

​        tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)

​        if err != nil {

​            r.Close()

​            return 0, nil

​        }

​        return 1, tr

​    })

​    return

}

cache.Get传入了3个参数,第一个是0,第二个是文件名,第三个是一个打开并读取文件的函数。看下Get内部的实现。代码如下:

func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {

​    r.mu.RLock()

​    defer r.mu.RUnlock()

​    hash := murmur32(ns, key, 0xf00)

​    for {

​        h, b := r.getBucket(hash)

​        done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)

​        if done {

​            if n != nil {

​                n.mu.Lock()

​                if n.value == nil {

​                    if setFunc == nil {

​                        n.mu.Unlock()

​                        n.unref()

​                        return nil

​                    }

​                    n.size, n.value = setFunc()

​                    if n.value == nil {

​                        n.size = 0

​                        n.mu.Unlock()

​                        n.unref()

​                        return nil

​                    }

​                    atomic.AddInt32(&r.size, int32(n.size))

​                } 
​                n.mu.Unlock()

​                if r.cacher != nil {

​                    r.cacher.Promote(n)

​                }

​                return &Handle{unsafe.Pointer(n)}

​            }

​            break

​        }

​    }

​    return nil

}

1、先根据key生成hash,然后根据hash在buckets中找到对应的bucket,

2、从bucket中找到对应的node,如果没有现成的node,说明不存在文件对应的缓存,即n.value==nil

3、调用setfun打开文件,读出数据

4、通过cacher.Promote(n)将缓存放入buckets.

5、返回handle指针。

看看promote(n)中都做了什么,就可以理解getbucket了。代码如下

func (r *lru) Promote(n *Node) {

​    var evicted []*lruNode

​    r.mu.Lock()

​    if n.CacheData == nil {

​        if n.Size() <= r.capacity {

​            rn := &lruNode{n: n, h: n.GetHandle()}

​            rn.insert(&r.recent)

​            n.CacheData = unsafe.Pointer(rn)

​            r.used += n.Size()

​            for r.used > r.capacity {

​                rn := r.recent.prev

​                if rn == nil {

​                    panic("BUG: invalid LRU used or capacity counter")

​                }

​                rn.remove()

​                rn.n.CacheData = nil

​                r.used -= rn.n.Size()

​                evicted = append(evicted, rn)

​            }

​        }

​    } else {

​        rn := (*lruNode)(n.CacheData)

​        if !rn.ban {

​            rn.remove()

​            rn.insert(&r.recent)

​        }

​    }

​    r.mu.Unlock()

​    for _, rn := range evicted {

​        rn.h.Release()

​    }

}

1、如果是从磁盘中读出来的数据,n.cachedata在之前并未赋值,因此是nil。n.Size()通过前文的setFun函数可以知道是1,而r.capacity的定义是在cmd/utils/flags.go中,默认1024.因此新数据直接插入到lru的队尾,而队头的数据也是最老的缓存则删掉。

2、如果是从缓存读出来的数据,则通过rn.insert将数据从队中提出来放到队尾,保证队尾放的数据都是最新读取的缓存。

以上可知:

​ 1、缓存是有限的,默认只能存储1024个文件的缓存,可以通过增加该值来增加缓存容量,提高效率。但相应的内存占用也会随之增加。

​ 2、如果把缓存当做队列,队尾的数据一直是最新被读到的数据,而老数据缓存如果一直不用的话,会随着新缓存的加入而被删除。

结束。