A Guided Tour of the Badger Source Code
A Baseline Example as the Entry Point
Let's start with Badger's basic usage:
package main

import (
	"fmt"
	"log"

	badger "github.com/dgraph-io/badger/v3"
)

func main() {
	// Open the DB.
	db, err := badger.Open(badger.DefaultOptions("tmp/badger"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Read-write transaction.
	err = db.Update(func(txn *badger.Txn) error {
		if err := txn.Set([]byte("answer"), []byte("42")); err != nil {
			return err
		}
		_, err := txn.Get([]byte("answer"))
		return err
	})
	// Read-only transaction.
	err = db.View(func(txn *badger.Txn) error {
		_, err := txn.Get([]byte("answer_v1"))
		return err
	})
	// Iterate over keys.
	err = db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchSize = 10
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			k := item.Key()
			err := item.Value(func(val []byte) error {
				fmt.Printf("key=%s, value=%s\n", k, val)
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
	// Trigger one pass of value-log garbage collection.
	err = db.RunValueLogGC(0.7)
	_ = err
}
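A note on the last call: RunValueLogGC performs at most one GC pass and returns badger.ErrNoRewrite once there is nothing worth rewriting, so real applications usually drive it from a periodic loop. The badger README suggests this shape (the ticker interval is an arbitrary choice; assumes the time import):

// Run value-log GC in the background for the lifetime of the process.
func runValueLogGC(db *badger.DB) {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()
	for range ticker.C {
	again:
		err := db.RunValueLogGC(0.7)
		if err == nil {
			goto again // a file was rewritten; immediately try for another
		}
	}
}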
The DB Initialization Process
Initialization parameters
badger.Open()
Open takes an Options value as its argument, so let's first look at the fields of the Options struct.
type Options struct {
	// Required options.

	// Dir: Badger is a key-value-separated storage engine; Dir holds the keys
	// together with logical pointers into the value log.
	// ValueDir: holds the value log, i.e. the values themselves. By default
	// Dir and ValueDir point at the same path.
	Dir      string
	ValueDir string

	// Usually modified options.

	// SyncWrites: sync every write to disk explicitly (mmap does not flush immediately).
	SyncWrites        bool
	NumVersionsToKeep int
	// ReadOnly: as the name says, open the DB read-only.
	ReadOnly bool
	// Logger: the logger instance.
	Logger Logger
	// Compression: the compression algorithm applied to SSTable blocks.
	Compression options.CompressionType
	// InMemory: run purely in memory, creating no files on disk.
	InMemory       bool
	MetricsEnabled bool
	// Sets the Stream.numGo field
	NumGoroutines int

	// Fine tuning options.

	// MemTableSize: size limit of a single memtable.
	MemTableSize        int64
	BaseTableSize       int64
	BaseLevelSize       int64
	LevelSizeMultiplier int
	TableSizeMultiplier int
	// MaxLevels: maximum number of levels in the LSM tree, L0 through L(max-1).
	MaxLevels int

	VLogPercentile float64
	// ValueThreshold: size threshold for values. If a value does not exceed it,
	// the key and value are NOT separated.
	// This is an engineering trade-off: key-value separation causes unavoidable
	// read amplification (two random reads: first the pointer from the LSM tree,
	// then the value from the value log via that pointer).
	ValueThreshold int64
	// NumMemtables: maximum number of memtables kept in memory.
	NumMemtables int
	// Changing BlockSize across DB runs will not break badger. The block size is
	// read from the block index stored at the end of the table.
	// BlockSize: size of each block (an SSTable consists of blocks, an index, etc.).
	BlockSize int
	// BloomFalsePositive: target false-positive rate of the Bloom filters.
	BloomFalsePositive float64
	// BlockCacheSize: size of the block cache.
	BlockCacheSize int64
	// IndexCacheSize: size of the index cache.
	IndexCacheSize int64

	NumLevelZeroTables      int
	NumLevelZeroTablesStall int

	// ValueLogFileSize: maximum size of a single value log file.
	ValueLogFileSize int64
	// ValueLogMaxEntries: maximum number of entries in a single value log file.
	ValueLogMaxEntries uint32

	// NumCompactors: maximum number of compaction goroutines running concurrently.
	NumCompactors        int
	CompactL0OnClose     bool
	LmaxCompaction       bool
	ZSTDCompressionLevel int

	// When set, checksum will be validated for each entry read from the value log file.
	VerifyValueChecksum bool

	// Encryption related options.

	// EncryptionKey: the key used to encrypt data at rest.
	EncryptionKey []byte
	// EncryptionKeyRotationDuration: how long a data key stays valid before rotation.
	EncryptionKeyRotationDuration time.Duration

	// BypassLockGuard will bypass the lock guard on badger. Bypassing lock
	// guard can cause data corruption if multiple badger instances are using
	// the same directory. Use this options with caution.
	BypassLockGuard bool

	// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
	ChecksumVerificationMode options.ChecksumVerificationMode

	// DetectConflicts determines whether the transactions would be checked for
	// conflicts. The transactions can be processed at a higher rate when
	// conflict detection is disabled.
	DetectConflicts bool

	// NamespaceOffset specifies the offset from where the next 8 bytes contains the namespace.
	NamespaceOffset int

	// Transaction start and commit timestamps are managed by end-user.
	// This is only useful for databases built on top of Badger (like Dgraph).
	// Not recommended for most users.
	managedTxns bool

	// Flags for testing purposes; batch-related limits.
	maxBatchCount     int64 // max entries in batch
	maxBatchSize      int64 // max batch size in bytes
	maxValueThreshold float64
}
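To make the ValueThreshold trade-off concrete: for a separated value the LSM tree stores only a small fixed-size pointer into the value log, and reading the value means following that pointer with a second disk access. Simplified from value.go, the pointer is essentially:

// What the LSM tree stores in place of a large value: just enough
// to locate the entry inside one .vlog file.
type valuePointer struct {
	Fid    uint32 // which value log file
	Len    uint32 // length of the encoded entry
	Offset uint32 // byte offset within that file
}

A Get of a large value therefore costs an LSM lookup plus one value-log read, while values at or below ValueThreshold come straight out of the LSM tree.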
DefaultOptions takes the target path and fills in a recommended default configuration; any setting you need to change can be overridden through the WithX() methods (a builder-style API; see the sketch after the listing below).
badger.DefaultOptions("tmp/badger")
// DefaultOptions sets a list of recommended options for good performance.
// Feel free to modify these to suit your needs with the WithX methods.
func DefaultOptions(path string) Options {
	return Options{
		Dir:                 path,
		ValueDir:            path,
		MemTableSize:        64 << 20,
		BaseTableSize:       2 << 20,
		BaseLevelSize:       10 << 20,
		TableSizeMultiplier: 2,
		LevelSizeMultiplier: 10,
		MaxLevels:           7,
		NumGoroutines:       8,
		MetricsEnabled:      true,

		NumCompactors:           4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
		NumLevelZeroTables:      5,
		NumLevelZeroTablesStall: 15,
		NumMemtables:            5,
		BloomFalsePositive:      0.01,
		BlockSize:               4 * 1024,
		SyncWrites:              false,
		NumVersionsToKeep:       1,
		CompactL0OnClose:        false,
		VerifyValueChecksum:     false,
		Compression:             options.Snappy,
		BlockCacheSize:          256 << 20,
		IndexCacheSize:          0,

		// The following benchmarks were done on a 4 KB block size (default block size). The
		// compression ratio is supposed to increase with increasing compression level, but since
		// the input to the compression algorithm is small (4 KB), we don't get a significant
		// benefit at level 3.
		// NOTE: The benchmarks are with DataDog ZSTD that requires CGO. Hence, no longer valid.
		// no_compression-16              10   502848865 ns/op   165.46 MB/s    -
		// zstd_compression/level_1-16     7   739037966 ns/op   112.58 MB/s   2.93
		// zstd_compression/level_3-16     7   756950250 ns/op   109.91 MB/s   2.72
		// zstd_compression/level_15-16    1 11135686219 ns/op     7.47 MB/s   4.38
		// Benchmark code can be found in table/builder_test.go file
		ZSTDCompressionLevel: 1,

		// Nothing to read/write value log using standard File I/O
		// MemoryMap to mmap() the value log files
		// (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32.
		// -1 so 2*ValueLogFileSize won't overflow on 32-bit systems.
		ValueLogFileSize: 1<<30 - 1,

		ValueLogMaxEntries: 1000000,
		VLogPercentile:     0.0,
		ValueThreshold:     maxValueThreshold,

		Logger:                        defaultLogger(INFO),
		EncryptionKey:                 []byte{},
		EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
		DetectConflicts:               true,
		NamespaceOffset:               -1,
	}
}
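Each WithX method returns the modified Options value, so overrides chain fluently off DefaultOptions. A small example (the particular values are arbitrary illustrations):

opts := badger.DefaultOptions("tmp/badger").
	WithSyncWrites(true).          // favor durability over write throughput
	WithCompression(options.ZSTD). // switch table compression from Snappy to ZSTD
	WithIndexCacheSize(100 << 20)  // 100 MB index cache
db, err := badger.Open(opts)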
The Open Function (the Core)
The badger.Open(opt) function
This function is long, so only the core code is kept here; some logic is reduced to pseudocode or comments, and part of the error handling is omitted.
func Open(opt Options) (*DB, error) {
	// Validate and normalize the options.
	checkAndSetOptions(&opt)
	var err error
	// Two directory lock guards, preventing another process from registering
	// itself on the same directories and causing conflicts.
	var dirLockGuard, valueDirLockGuard *directoryLockGuard

	// Create directories and acquire lock on it only if badger is not running in InMemory mode.
	// We don't have any directories/files in InMemory mode so we don't need to acquire
	// any locks on them.
	if !opt.InMemory {
		// Create the directories.
		createDirs(opt)
		if !opt.BypassLockGuard {
			// Lock the Dir directory.
			dirLockGuard, _ = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
			// Release the lock when Open returns.
			defer func() {
				if dirLockGuard != nil {
					_ = dirLockGuard.release()
				}
			}()
			// Absolute path of the key & value-pointer directory.
			absDir, _ := filepath.Abs(opt.Dir)
			// Absolute path of the value-log directory.
			absValueDir, _ := filepath.Abs(opt.ValueDir)
			// If ValueDir differs from Dir, each needs its own lock.
			if absValueDir != absDir {
				// Lock the ValueDir directory.
				valueDirLockGuard, _ = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
				// Release the lock when Open returns.
				defer func() {
					if valueDirLockGuard != nil {
						_ = valueDirLockGuard.release()
					}
				}()
			}
		}
	}

	// Open or create the MANIFEST file (opened via mmap; expanded on below).
	manifestFile, manifest, _ := openOrCreateManifestFile(opt)
	// Close the MANIFEST file on exit.
	defer func() {
		if manifestFile != nil {
			_ = manifestFile.close()
		}
	}()

	// Build the in-memory DB structure.
	db := &DB{
		// Immutable memtables; there can be several, hence a slice.
		imm: make([]*memTable, 0, opt.NumMemtables),
		// Channel for flush requests.
		flushChan: make(chan flushTask, opt.NumMemtables),
		// Channel for write requests.
		writeCh: make(chan *request, kvWriteChCapacity),
		// The configuration.
		opt: opt,
		// The freshly opened manifest file.
		manifest: manifestFile,
		// Lock guard of the key & value-pointer directory.
		dirLockGuard: dirLockGuard,
		// Lock guard of the value directory.
		valueDirGuard: valueDirLockGuard,
		// The oracle: the engine's concurrent-transaction manager. It hands out
		// transaction version numbers (timestamps) to implement MVCC; expanded
		// on when we cover read-write transactions.
		orc:              newOracle(opt),
		pub:              newPublisher(),
		allocPool:        z.NewAllocatorPool(8),
		bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
		threshold:        initVlogThreshold(&opt),
	}

	// Cleanup all the goroutines started by badger in case of an error.
	defer func() {
		if err != nil {
			opt.Errorf("Received err: %v. Cleaning up...", err)
			db.cleanup()
			db = nil
		}
	}()

	// Block-cache configuration.
	// Inside an SSTable, data is split into blocks; with the block cache enabled,
	// recently accessed (hot) blocks are cached in memory to speed up reads.
	if opt.BlockCacheSize > 0 {
		// Caching is out of scope for this read-through and doesn't affect the
		// core logic, so we skip the details. Worth noting: badger uses
		// ristretto, a high-performance concurrent local cache built by the
		// badger community, for anyone interested in digging further.
		numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
		if numInCache == 0 {
			// Make the value of this variable at least one since the cache requires
			// the number of counters to be greater than zero.
			numInCache = 1
		}
		config := ristretto.Config{
			NumCounters: numInCache * 8,
			MaxCost:     opt.BlockCacheSize,
			BufferItems: 64,
			Metrics:     true,
			OnExit:      table.BlockEvictHandler,
		}
		db.blockCache, err = ristretto.NewCache(&config)
		if err != nil {
			return nil, y.Wrap(err, "failed to create data cache")
		}
	}

	// Index-cache configuration.
	// Every SSTable carries a metadata (index) block recording each key's offset,
	// enabling binary search over keys: which SSTable holds the key, and at what
	// offset within the file.
	if opt.IndexCacheSize > 0 {
		// Index size is around 5% of the table size.
		indexSz := int64(float64(opt.MemTableSize) * 0.05)
		numInCache := opt.IndexCacheSize / indexSz
		if numInCache == 0 {
			// Make the value of this variable at least one since the cache requires
			// the number of counters to be greater than zero.
			numInCache = 1
		}
		config := ristretto.Config{
			NumCounters: numInCache * 8,
			MaxCost:     opt.IndexCacheSize,
			BufferItems: 64,
			Metrics:     true,
		}
		db.indexCache, err = ristretto.NewCache(&config)
		if err != nil {
			return nil, y.Wrap(err, "failed to create bf cache")
		}
	}

	// Health monitoring for the cache module.
	db.closers.cacheHealth = z.NewCloser(1)
	go db.monitorCache(db.closers.cacheHealth)

	// In memory-only mode:
	if db.opt.InMemory {
		// sync writes are meaningless, so disable them,
		db.opt.SyncWrites = false
		// If badger is running in memory mode, push everything into the LSM Tree.
		db.opt.ValueThreshold = math.MaxInt32
	}

	// The key registry, which manages encryption data keys; expanded on later.
	krOpt := KeyRegistryOptions{
		ReadOnly:                      opt.ReadOnly,
		Dir:                           opt.Dir,
		EncryptionKey:                 opt.EncryptionKey,
		EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
		InMemory:                      opt.InMemory,
	}
	db.registry, _ = OpenKeyRegistry(krOpt)

	// Compute size statistics (disk usage etc.).
	db.calculateSize()
	db.closers.updateSize = z.NewCloser(1)
	go db.updateSize(db.closers.updateSize)

	// Open the existing memtables.
	// A memtable is a composite in-memory structure (skiplist + WAL).
	if err := db.openMemTables(db.opt); err != nil {
		return nil, y.Wrapf(err, "while opening memtables")
	}

	// In read-write mode, create a fresh .mem file for the mutable memtable.
	// The .mem file is the LSM tree's write-ahead log (WAL).
	if !db.opt.ReadOnly {
		if db.mt, err = db.newMemTable(); err != nil {
			return nil, y.Wrapf(err, "cannot create memtable")
		}
	}

	// newLevelsController potentially loads files in directory.
	// Create the in-memory level manager. The LSM tree is a leveled structure,
	// and the levelsController maintains the whole hierarchy: it drives merging
	// and compaction, and is initialized from the manifest. Put differently, the
	// manifest file is the persisted, on-disk form of the levelsController,
	// which speeds up badger's restart and recovery.
	// It opens the SSTables and loads their index and metadata blocks into memory.
	if db.lc, err = newLevelsController(db, &manifest); err != nil {
		return db, err
	}

	// Initialize vlog struct.
	db.vlog.init(db)

	if !opt.ReadOnly {
		// Start the compaction worker goroutines; expanded on later.
		db.closers.compactors = z.NewCloser(1)
		db.lc.startCompact(db.closers.compactors)

		db.closers.memtable = z.NewCloser(1)
		go func() {
			_ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
		}()
		// Flush them to disk asap.
		for _, mt := range db.imm {
			db.flushChan <- flushTask{mt: mt}
		}
	}

	// We do increment nextTxnTs below. So, no need to do it here.
	// Fetch the largest transaction version (timestamp) present at startup.
	db.orc.nextTxnTs = db.MaxVersion()
	db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)

	// Actually open the vlog files.
	if err = db.vlog.open(db); err != nil {
		return db, y.Wrapf(err, "During db.vlog.open")
	}

	// Let's advance nextTxnTs to one more than whatever we observed via
	// replaying the logs.
	// Transaction-related: mark earlier transactions as recovered.
	db.orc.txnMark.Done(db.orc.nextTxnTs)
	// In normal mode, we must update readMark so older versions of keys can be removed during
	// compaction when run in offline mode via the flatten tool.
	db.orc.readMark.Done(db.orc.nextTxnTs)
	// Advance the transaction timestamp.
	db.orc.incrementNextTs()

	// Listen for changes to the value-threshold configuration.
	go db.threshold.listenForValueThresholdUpdate()

	// Retrieve banned namespaces from the DB and update the in-memory structure (not central here).
	if err := db.initBannedNamespaces(); err != nil {
		return db, errors.Wrapf(err, "While setting banned keys")
	}

	// Start the goroutine that services disk-write requests.
	// Badger batches and writes concurrently, making full use of SSD performance.
	db.closers.writes = z.NewCloser(1)
	go db.doWrites(db.closers.writes)

	if !db.opt.InMemory {
		// Start value-log GC for real; explained in detail later.
		db.closers.valueGC = z.NewCloser(1)
		go db.vlog.waitOnGC(db.closers.valueGC)
	}

	// Subscription/publisher goroutine (not central here).
	db.closers.pub = z.NewCloser(1)
	go db.pub.listenForUpdates(db.closers.pub)

	// Ownership of the guards and manifest file has passed to db; nil them so
	// the deferred releases above don't run.
	valueDirLockGuard = nil
	dirLockGuard = nil
	manifestFile = nil
	// Return the db handle.
	return db, nil
}
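As a quick sanity check of the block-cache sizing above with the default options (256 MB cache, 4 KB blocks): ristretto estimates access frequency with counters and needs several counters per expected cache entry, hence the ×8:

numInCache := int64(256<<20) / int64(4*1024) // 65536 4 KB blocks fit in 256 MB
numCounters := numInCache * 8                // 524288 ristretto counters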
Creating the Manifest File
The openOrCreateManifestFile(opt) function
func openOrCreateManifestFile(opt Options) (ret *manifestFile, result Manifest, err error) {
	// In InMemory mode, return an empty Manifest.
	if opt.InMemory {
		return &manifestFile{inMemory: true}, Manifest{}, nil
	}
	return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, manifestDeletionsRewriteThreshold)
}
func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold int) (*manifestFile, Manifest, error) {
	// Build the path.
	path := filepath.Join(dir, ManifestFilename)
	var flags y.Flags
	if readOnly {
		flags |= y.ReadOnly
	}
	// Try to open an existing file.
	fp, err := y.OpenExistingFile(path, flags) // We explicitly sync in addChanges, outside the lock.
	if err != nil {
		// Check whether the file simply doesn't exist.
		if !os.IsNotExist(err) {
			return nil, Manifest{}, err
		}
		// In read-only mode we cannot create one, so bail out.
		if readOnly {
			return nil, Manifest{}, fmt.Errorf("no manifest found, required for read-only db")
		}
		// Actually create the Manifest instance.
		m := createManifest()
		// Rewrite (overwrite) the file. Once this statement runs, the MANIFEST
		// file is visible in the directory (containing only the magic header at
		// this point).
		fp, netCreations, _ := helpRewrite(dir, &m)
		// Assert: a brand-new manifest records zero net table creations.
		y.AssertTrue(netCreations == 0)
		// Keep the manifest state in memory in a manifestFile instance.
		mf := &manifestFile{
			fp:                        fp,
			directory:                 dir,
			manifest:                  m.clone(),
			deletionsRewriteThreshold: deletionsThreshold,
		}
		return mf, m, nil
	}
	// The recovery path when the manifest file already exists is not expanded here.
	......
}
func createManifest() Manifest {
	levels := make([]levelManifest, 0)
	return Manifest{
		Levels: levels,
		Tables: make(map[uint64]TableManifest),
	}
	// Tables: map[uint64]TableManifest
	// The uint64 key is the table's file ID (fid), not a level number;
	// each TableManifest records which level that table belongs to.
}
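Since Tables is keyed by file ID and each TableManifest records its level, rebuilding the level structure from a manifest is a single grouping pass — exactly what newLevelsController does further below. A minimal sketch (the helper name tablesByLevel is hypothetical):

// tablesByLevel groups the table file IDs in a manifest by LSM level.
// (TableManifest.Level is a small integer in the real struct.)
func tablesByLevel(m Manifest, maxLevels int) [][]uint64 {
	byLevel := make([][]uint64, maxLevels)
	for fileID, tf := range m.Tables {
		byLevel[int(tf.Level)] = append(byLevel[int(tf.Level)], fileID)
	}
	return byLevel
}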
Opening the Memtables
The memTable struct
// memTable structure stores a skiplist and a corresponding WAL. Writes to memTable are written
// both to the WAL and the skiplist. On a crash, the WAL is replayed to bring the skiplist back to
// its pre-crash form.
type memTable struct {
	sl         *skl.Skiplist // the in-memory skiplist holding the entries
	wal        *logFile      // the write-ahead log backing this memtable (a .mem file)
	maxVersion uint64        // highest version (commit timestamp) seen in this memtable
	opt        Options
	buf        *bytes.Buffer // scratch buffer for encoding WAL entries
}
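The doc comment above states the central invariant: an entry is appended to the WAL before it is inserted into the skiplist, so a crash loses at most unacknowledged writes. Trimmed to its core, the write path (memTable.Put in memtable.go) looks roughly like this — a simplified sketch, not the verbatim source:

// Sketch of memTable.Put: persist to the WAL before touching the skiplist.
func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
	entry := &Entry{Key: key, Value: value.Value, UserMeta: value.UserMeta, ExpiresAt: value.ExpiresAt}
	// wal is nil only in InMemory mode, where there is nothing to replay.
	if mt.wal != nil {
		if err := mt.wal.writeEntry(mt.buf, entry, mt.opt); err != nil {
			return y.Wrapf(err, "cannot write entry to WAL file")
		}
	}
	// Only after the entry is durable in the WAL does it become visible in memory.
	mt.sl.Put(key, value)
	return nil
}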
The openMemTables(opt) method
func (db *DB) openMemTables(opt Options) error {
	// We don't need to open any tables in in-memory mode.
	if db.opt.InMemory {
		return nil
	}
	// List every file in the directory.
	files, _ := ioutil.ReadDir(db.opt.Dir)
	var fids []int
	// Walk the files in the directory.
	for _, file := range files {
		// Check whether the file name carries the .mem suffix. (On a fresh
		// initialization none exist; the directory holds only LOCK, MANIFEST
		// and KEYREGISTRY at this point.)
		if !strings.HasSuffix(file.Name(), memFileExt) {
			continue
		}
		// For each .mem file, parse the numeric file name into an int fid,
		// e.g. 000001.mem, 000002.mem.
		fsz := len(file.Name())
		fid, _ := strconv.ParseInt(file.Name()[:fsz-len(memFileExt)], 10, 64)
		fids = append(fids, int(fid))
	}
	// Sort in ascending order.
	sort.Slice(fids, func(i, j int) bool {
		return fids[i] < fids[j]
	})
	// Open the memtables in fid order.
	for _, fid := range fids {
		flags := os.O_RDWR
		if db.opt.ReadOnly {
			flags = os.O_RDONLY
		}
		// Actually open the .mem file; its contents are loaded via mmap.
		mt, err := db.openMemTable(fid, flags)
		if err != nil {
			return y.Wrapf(err, "while opening fid: %d", fid)
		}
		// If this memtable is empty we don't need to add it. This is a
		// memtable that was completely truncated.
		if mt.sl.Empty() {
			mt.DecrRef()
			continue
		}
		// These should no longer be written to. So, make them part of the imm.
		db.imm = append(db.imm, mt)
	}
	// Remember the latest fid, so the next memtable gets a fresh one.
	if len(fids) != 0 {
		db.nextMemFid = fids[len(fids)-1]
	}
	db.nextMemFid++
	return nil
}
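For example, a leftover WAL named 000002.mem parses to fid 2; after the loop nextMemFid becomes 3, so the next mutable memtable will be backed by 000003.mem:

name := "000002.mem"
fid, _ := strconv.ParseInt(name[:len(name)-len(".mem")], 10, 64)
fmt.Println(fid) // 2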
Creating a Memtable
The newMemTable() method
func (db *DB) newMemTable() (*memTable, error) {
	// Actually create the .mem file.
	mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
	// z.NewFile signals that a brand-new file was created: the expected case.
	if err == z.NewFile {
		db.nextMemFid++
		return mt, nil
	}
	if err != nil {
		db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
		return nil, y.Wrapf(err, "newMemTable")
	}
	// err == nil means the file already existed, which must not happen here.
	return nil, errors.Errorf("File %s already exists", mt.wal.Fd.Name())
}
The openMemTable(fid, flags) method
func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
	// Build the path.
	filepath := db.mtFilePath(fid)
	// Create the memtable's skiplist.
	s := skl.NewSkiplist(arenaSize(db.opt))
	// Create the memtable instance.
	mt := &memTable{
		sl:  s,
		opt: db.opt,
		buf: &bytes.Buffer{},
	}
	// We don't need to create the wal for the skiplist in in-memory mode so return the mt.
	if db.opt.InMemory {
		return mt, z.NewFile
	}
	// Create the WAL file instance.
	mt.wal = &logFile{
		fid:      uint32(fid),
		path:     filepath,
		registry: db.registry,
		writeAt:  vlogHeaderSize,
		opt:      db.opt,
	}
	// Open (or create) the WAL file via a system call, mmapped at twice the
	// memtable size.
	lerr := mt.wal.open(filepath, flags, 2*db.opt.MemTableSize)
	// Any error other than "created a new file" is fatal here.
	if lerr != z.NewFile && lerr != nil {
		return nil, y.Wrapf(lerr, "While opening memtable: %s", filepath)
	}
	// Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
	// when it gets flushed to L0.
	s.OnClose = func() {
		if err := mt.wal.Delete(); err != nil {
			db.opt.Errorf("while deleting file: %s, err: %v", filepath, err)
		}
	}
	// A freshly created file has nothing to replay; report z.NewFile to the caller.
	if lerr == z.NewFile {
		return mt, lerr
	}
	// Reaching here means the file already existed (lerr == nil): replay the WAL
	// into the skiplist. UpdateSkipList also re-truncates the file to the last
	// valid entry and errors out if the WAL cannot be read.
	err := mt.UpdateSkipList()
	return mt, y.Wrapf(err, "while updating skiplist")
}
Creating the levelsController
The newLevelsController(db, mf) function
func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
	// Assert some required invariants.
	y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
	// Attach the db instance and create one slot per level (levels[0] => L0, ...).
	// A levelHandler is the manager that actually operates on one level's SSTs.
	s := &levelsController{
		kv:     db,
		levels: make([]*levelHandler, db.opt.MaxLevels),
	}
	// Compaction status bookkeeping: a set keyed by fid, used to check whether a
	// given table is currently involved in a compaction.
	s.cstatus.tables = make(map[uint64]struct{})
	// Per-level compaction status.
	s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
	// Walk the levels and create a levelHandler instance for each one.
	for i := 0; i < db.opt.MaxLevels; i++ {
		s.levels[i] = newLevelHandler(db, i)
		s.cstatus.levels[i] = new(levelCompactStatus)
	}
	// Memory-only mode: no on-disk tables to load, so we're done here.
	if db.opt.InMemory {
		return s, nil
	}
	// Compare manifest against directory, check for existent/non-existent files, and remove.
	if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil {
		return nil, err
	}

	var mu sync.Mutex
	tables := make([][]*table.Table, db.opt.MaxLevels)
	var maxFileID uint64

	// We found that using 3 goroutines allows disk throughput to be utilized to its max.
	// Disk utilization is the main thing we should focus on, while trying to read the data. That's
	// the one factor that remains constant between HDD and SSD.
	throttle := y.NewThrottle(3)

	start := time.Now()
	var numOpened int32
	// A ticker used for periodic progress reporting.
	tick := time.NewTicker(3 * time.Second)
	// Stop the ticker on exit.
	defer tick.Stop()

	// Walk the manifest's Tables to get each table's fid.
	// On a fresh initialization Tables is empty, so this loop is skipped.
	for fileID, tf := range mf.Tables {
		fname := table.NewFilename(fileID, db.opt.Dir)
		select {
		case <-tick.C:
			db.opt.Infof("%d tables out of %d opened in %s\n", atomic.LoadInt32(&numOpened),
				len(mf.Tables), time.Since(start).Round(time.Millisecond))
		default:
		}
		if err := throttle.Do(); err != nil {
			closeAllTables(tables)
			return nil, err
		}
		if fileID > maxFileID {
			maxFileID = fileID
		}
		go func(fname string, tf TableManifest) {
			var rerr error
			defer func() {
				throttle.Done(rerr)
				atomic.AddInt32(&numOpened, 1)
			}()
			dk, err := db.registry.DataKey(tf.KeyID)
			if err != nil {
				rerr = y.Wrapf(err, "Error while reading datakey")
				return
			}
			topt := buildTableOptions(db)
			// Explicitly set Compression and DataKey based on how the table was generated.
			topt.Compression = tf.Compression
			topt.DataKey = dk
			mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
			if err != nil {
				rerr = y.Wrapf(err, "Opening file: %q", fname)
				return
			}
			t, err := table.OpenTable(mf, topt)
			if err != nil {
				if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
					db.opt.Errorf(err.Error())
					db.opt.Errorf("Ignoring table %s", mf.Fd.Name())
					// Do not set rerr. We will continue without this table.
				} else {
					rerr = y.Wrapf(err, "Opening table: %q", fname)
				}
				return
			}
			mu.Lock()
			tables[tf.Level] = append(tables[tf.Level], t)
			mu.Unlock()
		}(fname, tf)
	}
	// Wait for the table-opening goroutines to finish.
	if err := throttle.Finish(); err != nil {
		closeAllTables(tables)
		return nil, err
	}
	db.opt.Infof("All %d tables opened in %s\n", atomic.LoadInt32(&numOpened),
		time.Since(start).Round(time.Millisecond))
	// Record the current maximum fid.
	s.nextFileID = maxFileID + 1
	// Initialize each level's tables.
	for i, tbls := range tables {
		s.levels[i].initTables(tbls)
	}
	// Make sure key ranges do not overlap etc.
	if err := s.validate(); err != nil {
		_ = s.cleanupLevels()
		return nil, y.Wrap(err, "Level validation")
	}
	// Sync directory (because we have at least removed some files, or previously created the
	// manifest file).
	if err := syncDir(db.opt.Dir); err != nil {
		_ = s.close()
		return nil, err
	}
	return s, nil
}
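y.NewThrottle(3) used above is a small concurrency limiter: Do() blocks until one of the three slots is free (and can report an error), Done(err) releases the slot and records the first failure, and Finish() waits for all outstanding work. A minimal equivalent in plain Go — a sketch of the pattern, not the y package's actual implementation:

import "sync"

// throttle limits concurrent work to `limit` goroutines and keeps the first error.
type throttle struct {
	slots chan struct{}
	wg    sync.WaitGroup
	once  sync.Once
	err   error
}

func newThrottle(limit int) *throttle {
	return &throttle{slots: make(chan struct{}, limit)}
}

func (t *throttle) Do() error {
	t.slots <- struct{}{} // blocks while `limit` workers are in flight
	t.wg.Add(1)
	return nil // the real y.Throttle can fail here; this sketch never does
}

func (t *throttle) Done(err error) {
	if err != nil {
		t.once.Do(func() { t.err = err })
	}
	<-t.slots
	t.wg.Done()
}

func (t *throttle) Finish() error {
	t.wg.Wait()
	return t.err
}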
Creating a levelHandler
The newLevelHandler(db, level) function
func newLevelHandler(db *DB, level int) *levelHandler {
	return &levelHandler{
		level:    level,
		strLevel: fmt.Sprintf("l%d", level),
		db:       db,
	}
}
Initializing the tables
The initTables(tables) method
// initTables replaces s.tables with given tables. This is done during loading.
func (s *levelHandler) initTables(tables []*table.Table) {
	// Lock the handler.
	s.Lock()
	defer s.Unlock()
	// Assign the tables and reset the size counters.
	s.tables = tables
	s.totalSize = 0
	s.totalStaleSize = 0
	for _, t := range tables {
		s.addSize(t)
	}
	if s.level == 0 {
		// L0: key ranges will overlap. Just sort by fileID in ascending order
		// because newer tables are at the end of level 0.
		sort.Slice(s.tables, func(i, j int) bool {
			return s.tables[i].ID() < s.tables[j].ID()
		})
	} else {
		// Above L0: sort tables by their smallest key.
		sort.Slice(s.tables, func(i, j int) bool {
			return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
		})
	}
}
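Sorting the levels above L0 by smallest key is what makes point lookups cheap there: key ranges within such a level do not overlap, so the single candidate table for a key falls out of one binary search, roughly what levelHandler does when serving a read (a sketch, not the verbatim source):

// Find the only table on this (non-zero) level whose range may contain key:
// the first table whose biggest key is >= key.
idx := sort.Search(len(tables), func(i int) bool {
	return y.CompareKeys(tables[i].Biggest(), key) >= 0
})
if idx >= len(tables) {
	// key is greater than every table's range: not on this level.
}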
Initializing the vlog
The init(db) method
// init initializes the value log struct. This initialization needs to happen
// before compactions start.
func (vlog *valueLog) init(db *DB) {
	// Load the configuration.
	vlog.opt = db.opt
	vlog.db = db
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if vlog.opt.InMemory {
		return
	}
	// The configured vlog directory.
	vlog.dirPath = vlog.opt.ValueDir
	// Channel used by the GC module.
	vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
	// Create the file backing the GC discard statistics.
	lf, err := InitDiscardStats(vlog.opt)
	y.Check(err)
	vlog.discardStats = lf
}
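discardStats persists, for each vlog fid, how many bytes compactions have invalidated in that file. The GC path later consults it to pick a victim: take the file with the most discardable bytes and rewrite it only if the discard ratio clears the threshold passed to RunValueLogGC. Roughly, as a simplified sketch of that selection (not the verbatim source):

// Pick the vlog file with the largest discarded-byte count; skip GC if even
// that file isn't worth rewriting at the requested discardRatio.
fid, discarded := vlog.discardStats.MaxDiscard()
lf := vlog.filesMap[fid]
if float64(discarded) < discardRatio*float64(lf.size) {
	return ErrNoRewrite
}
// Otherwise, re-append the still-live entries of lf and delete the file.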
Opening the vlog
The open(db) method
func (vlog *valueLog) open(db *DB) error {
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if db.opt.InMemory {
		return nil
	}
	// Populate filesMap with the fid of every .vlog file in the directory.
	if err := vlog.populateFilesMap(); err != nil {
		return err
	}
	// If no files are found, then create a new file.
	if len(vlog.filesMap) == 0 {
		if vlog.opt.ReadOnly {
			return nil
		}
		// Create a fresh .vlog file.
		_, err := vlog.createVlogFile()
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	fids := vlog.sortedFids()
	for _, fid := range fids {
		lf, ok := vlog.filesMap[fid]
		y.AssertTrue(ok)
		// Just open in RDWR mode. This should not create a new log file.
		lf.opt = vlog.opt
		if err := lf.open(vlog.fpath(fid), os.O_RDWR,
			2*vlog.opt.ValueLogFileSize); err != nil {
			return y.Wrapf(err, "Open existing file: %q", lf.path)
		}
		// We shouldn't delete the maxFid file.
		if lf.size == vlogHeaderSize && fid != vlog.maxFid {
			vlog.opt.Infof("Deleting empty file: %s", lf.path)
			if err := lf.Delete(); err != nil {
				return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
			}
			delete(vlog.filesMap, fid)
		}
	}
	if vlog.opt.ReadOnly {
		return nil
	}
	// Now we can read the latest value log file, and see if it needs truncation. We could
	// technically do this over all the value log files, but that would mean slowing down the value
	// log open.
	last, ok := vlog.filesMap[vlog.maxFid]
	y.AssertTrue(ok)
	lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
		func(_ Entry, vp valuePointer) error {
			return nil
		})
	if err != nil {
		return y.Wrapf(err, "while iterating over: %s", last.path)
	}
	// Truncate the last file to the end of its last valid entry, dropping any
	// partially written tail left behind by a crash.
	if err := last.Truncate(int64(lastOff)); err != nil {
		return y.Wrapf(err, "while truncating last value log file: %s", last.path)
	}
	// Don't write to the old log file. Always create a new one.
	if _, err := vlog.createVlogFile(); err != nil {
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	return nil
}
The populateFilesMap() method
func (vlog *valueLog) populateFilesMap() error {
	vlog.filesMap = make(map[uint32]*logFile)
	// List every file in the directory.
	files, _ := ioutil.ReadDir(vlog.dirPath)
	found := make(map[uint64]struct{})
	for _, file := range files {
		// Only consider files with the .vlog suffix.
		if !strings.HasSuffix(file.Name(), ".vlog") {
			continue
		}
		// Validate the .vlog file name: parse out the fid and check for duplicates.
		fsz := len(file.Name())
		fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
		if err != nil {
			return errFile(err, file.Name(), "Unable to parse log id.")
		}
		if _, ok := found[fid]; ok {
			return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
		}
		found[fid] = struct{}{}
		lf := &logFile{
			fid:      uint32(fid),
			path:     vlog.fpath(uint32(fid)),
			registry: vlog.db.registry,
		}
		// Finally record it in the vlog's filesMap.
		vlog.filesMap[uint32(fid)] = lf
		if vlog.maxFid < uint32(fid) {
			vlog.maxFid = uint32(fid)
		}
	}
	// At this point every .vlog file's fid is in the map. On a fresh
	// initialization there are no .vlog files yet, so the loop does nothing.
	return nil
}
Creating a vlog File
The createVlogFile() method
func (vlog *valueLog) createVlogFile() (*logFile, error) {
	// Next fid: one past the current maximum.
	fid := vlog.maxFid + 1
	// The file is named after its fid.
	path := vlog.fpath(fid)
	// Build the logFile handle.
	lf := &logFile{
		fid:      fid,
		path:     path,
		registry: vlog.db.registry,
		writeAt:  vlogHeaderSize,
		opt:      vlog.opt,
	}
	// Open the file via an mmap system call. Note the mapping size: twice
	// ValueLogFileSize, i.e. close to 2 GB with the default settings.
	err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
	if err != z.NewFile && err != nil {
		return nil, err
	}
	// Update the vlog bookkeeping under the files lock.
	vlog.filesLock.Lock()
	vlog.filesMap[fid] = lf
	y.AssertTrue(vlog.maxFid < fid)
	vlog.maxFid = fid
	// writableLogOffset is only written by write func, but read by Read func.
	// To avoid a race condition, all reads and updates to this variable must be
	// done via atomics.
	atomic.StoreUint32(&vlog.writableLogOffset, vlogHeaderSize)
	vlog.numEntriesWritten = 0
	vlog.filesLock.Unlock()
	return lf, nil
}