From 1e738b0f39b389db4d0d16543ca3d6987d98ba34 Mon Sep 17 00:00:00 2001 From: windqiu Date: Thu, 30 Mar 2023 10:45:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E5=A4=9A=E4=BD=99=E7=9A=84?= =?UTF-8?q?=E8=A1=8C=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...\347\240\201\345\210\206\346\236\22020.md" | 306 +++++++++--------- 1 file changed, 153 insertions(+), 153 deletions(-) diff --git "a/articles/leveldb\346\272\220\347\240\201\345\210\206\346\236\220/leveldb\346\272\220\347\240\201\345\210\206\346\236\22020.md" "b/articles/leveldb\346\272\220\347\240\201\345\210\206\346\236\220/leveldb\346\272\220\347\240\201\345\210\206\346\236\22020.md" index 2ac9912..e7e11d7 100644 --- "a/articles/leveldb\346\272\220\347\240\201\345\210\206\346\236\220/leveldb\346\272\220\347\240\201\345\210\206\346\236\22020.md" +++ "b/articles/leveldb\346\272\220\347\240\201\345\210\206\346\236\220/leveldb\346\272\220\347\240\201\345\210\206\346\236\22020.md" @@ -27,10 +27,10 @@ S5 最后删除一些过期文件,并检查是否需要执行compaction,如 S1 首先创建DBImpl对象,锁定并试图做Recover操作。Recover操作用来处理创建flag,比如存在就返回失败等等,尝试从已存在的sstable文件恢复db。并返回db元信息的变动信息,一个VersionEdit对象。 ``` -1DBImpl* impl = newDBImpl(options, dbname); -2impl->mutex_.Lock(); // 锁db -3VersionEdit edit; -4Status s =impl->Recover(&edit); // 处理flag&恢复:create_if_missing,error_if_exists +DBImpl* impl = newDBImpl(options, dbname); +impl->mutex_.Lock(); // 锁db +VersionEdit edit; +Status s =impl->Recover(&edit); // 处理flag&恢复:create_if_missing,error_if_exists ``` S2 如果Recover返回成功,则调用VersionSet取得新的log文件编号——实际上是在当前基础上+1,准备新的log文件。如果log文件创建成功,则根据log文件创建log::Writer。然后执行VersionSet::LogAndApply,根据edit记录的增量变动生成新的current version,并写入MANIFEST文件。 @@ -38,28 +38,28 @@ S2 如果Recover返回成功,则调用VersionSet取得新的log文件编号— 函数**NewFileNumber(){returnnext_file_number_++;}**,直接返回**next_file_number_**。 ``` - 1uint64_t new_log_number = impl->versions_->NewFileNumber(); - 2WritableFile* lfile; - 3s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), &lfile); - 4if (s.ok()) { - 5 edit.SetLogNumber(new_log_number); - 6 impl->logfile_ = lfile; - 7 impl->logfile_number_ = new_log_number; - 8 impl->log_ = newlog::Writer(lfile); - 9 s = impl->versions_->LogAndApply(&edit, &impl->mutex_); -10} + uint64_t new_log_number = impl->versions_->NewFileNumber(); + WritableFile* lfile; + s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), &lfile); + if (s.ok()) { + edit.SetLogNumber(new_log_number); + impl->logfile_ = lfile; + impl->logfile_number_ = new_log_number; + impl->log_ = newlog::Writer(lfile); + s = impl->versions_->LogAndApply(&edit, &impl->mutex_); +} ``` S3 如果VersionSet::LogAndApply返回成功,则删除过期文件,检查是否需要执行compaction,最终返回创建的DBImpl对象。 ``` -1if (s.ok()) { -2 impl->DeleteObsoleteFiles(); -3 impl->MaybeScheduleCompaction(); -4} -5impl->mutex_.Unlock(); -6if (s.ok()) *dbptr = impl; -7return s; +if (s.ok()) { + impl->DeleteObsoleteFiles(); + impl->MaybeScheduleCompaction(); +} +impl->mutex_.Unlock(); +if (s.ok()) *dbptr = impl; +return s; ``` 以上就是DB::Open的主题逻辑。 @@ -76,14 +76,14 @@ DBImpl::DBImpl(const Options& options, const std::string&dbname) 创建MemTable,并增加引用计数,创建WriteBatch。 ``` -1mem_(newMemTable(internal_comparator_)), -2tmp_batch_(new WriteBatch), -3mem_->Ref(); -4// 然后在函数体中,创建TableCache和VersionSet。 -5// 为其他预留10个文件,其余的都给TableCache. -6const int table_cache_size = options.max_open_files - 10; -7table_cache_ = newTableCache(dbname_, &options_, table_cache_size); -8versions_ = newVersionSet(dbname_, &options_, table_cache_, &internal_comparator_); +mem_(newMemTable(internal_comparator_)), +tmp_batch_(new WriteBatch), +mem_->Ref(); +// 然后在函数体中,创建TableCache和VersionSet。 +// 为其他预留10个文件,其余的都给TableCache. +const int table_cache_size = options.max_open_files - 10; +table_cache_ = newTableCache(dbname_, &options_, table_cache_size); +versions_ = newVersionSet(dbname_, &options_, table_cache_, &internal_comparator_); ``` ### @@ -93,29 +93,29 @@ DBImpl::DBImpl(const Options& options, const std::string&dbname) 当外部在调用DB::Open()时设置了option指定如果db不存在就创建,如果db不存在leveldb就会调用函数创建新的db。判断db是否存在的依据是**/CURRENT**文件是否存在。其逻辑很简单。 ``` - 1// S1首先生产DB元信息,设置comparator名,以及log文件编号、文件编号,以及seq no。 - 2VersionEdit new_db; - 3new_db.SetComparatorName(user_comparator()->Name()); - 4new_db.SetLogNumber(0); - 5new_db.SetNextFile(2); - 6new_db.SetLastSequence(0); - 7// S2 生产MANIFEST文件,将db元信息写入MANIFEST文件。 - 8const std::string manifest = DescriptorFileName(dbname_, 1); - 9WritableFile* file; -10Status s = env_->NewWritableFile(manifest, &file); -11if (!s.ok()) return s; -12{ -13 log::Writer log(file); -14 std::string record; -15 new_db.EncodeTo(&record); -16 s = log.AddRecord(record); -17 if (s.ok()) s = file->Close(); -18} -19delete file; -20// S3 如果成功,就把MANIFEST文件名写入到CURRENT文件中 -21if (s.ok()) s = SetCurrentFile(env_, dbname_, 1); -22elseenv_->DeleteFile(manifest); -23return s; +// S1首先生产DB元信息,设置comparator名,以及log文件编号、文件编号,以及seq no。 +VersionEdit new_db; +new_db.SetComparatorName(user_comparator()->Name()); +new_db.SetLogNumber(0); +new_db.SetNextFile(2); +new_db.SetLastSequence(0); +// S2 生产MANIFEST文件,将db元信息写入MANIFEST文件。 +const std::string manifest = DescriptorFileName(dbname_, 1); +WritableFile* file; +Status s = env_->NewWritableFile(manifest, &file); +if (!s.ok()) return s; +{ + log::Writer log(file); + std::string record; + new_db.EncodeTo(&record); + s = log.AddRecord(record); + if (s.ok()) s = file->Close(); +} +delete file; +// S3 如果成功,就把MANIFEST文件名写入到CURRENT文件中 +if (s.ok()) s = SetCurrentFile(env_, dbname_, 1); +else env_->DeleteFile(manifest); +return s; ``` 这就是创建新DB的逻辑,很简单。 @@ -133,9 +133,9 @@ StatusDBImpl::Recover(VersionEdit* edit) S1 创建目录,目录以db name命名,忽略任何创建错误,然后尝试获取d**b name/LOCK**文件锁,失败则返回。 ``` -1env_->CreateDir(dbname_); -2Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); -3if (!s.ok()) return s; +env_->CreateDir(dbname_); +Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); +if (!s.ok()) return s; ``` S2 根据CURRENT文件是否存在,以及option参数执行检查。 @@ -150,34 +150,34 @@ s = versions_->Recover(); S4尝试从所有比manifest文件中记录的log要新的log文件中恢复(前一个版本可能会添加新的log文件,却没有记录在manifest中)。另外,函数PrevLogNumber()已经不再用了,仅为了兼容老版本。 ``` - 1// S4.1 这里先找出所有满足条件的log文件:比manifest文件记录的log编号更新。 - 2SequenceNumber max_sequence(0); - 3const uint64_t min_log = versions_->LogNumber(); - 4const uint64_t prev_log = versions_->PrevLogNumber(); - 5std::vectorfilenames; - 6s = env_->GetChildren(dbname_, &filenames); // 列出目录内的所有文件 - 7uint64_t number; - 8FileType type; - 9std::vectorlogs; -10for (size_t i = 0; i < filenames.size(); i++) { // 检查log文件是否比min log更新 -11 if (ParseFileName(filenames[i], &number, &type) && type == kLogFile -12 && ((number >= min_log) || (number == prev_log))) { -13 logs.push_back(number); -14 } -15} -16// S4.2 找到log文件后,首先排序,保证按照生成顺序,依次回放log。并把DB元信息的变动(sstable文件的变动)追加到edit中返回。 -17std::sort(logs.begin(), logs.end()); -18for (size_t i = 0; i < logs.size(); i++) { -19 s = RecoverLogFile(logs[i], edit, &max_sequence); -20 // 前一版可能在生成该log编号后没有记录在MANIFEST中, -21 //所以这里我们手动更新VersionSet中的文件编号计数器 -22 versions_->MarkFileNumberUsed(logs[i]); -23} -24// S4.3 更新VersionSet的sequence -25if (s.ok()) { -26 if (versions_->LastSequence() < max_sequence) -27 versions_->SetLastSequence(max_sequence); -28} +// S4.1 这里先找出所有满足条件的log文件:比manifest文件记录的log编号更新。 +SequenceNumber max_sequence(0); +const uint64_t min_log = versions_->LogNumber(); +const uint64_t prev_log = versions_->PrevLogNumber(); +std::vectorfilenames; +s = env_->GetChildren(dbname_, &filenames); // 列出目录内的所有文件 +uint64_t number; +FileType type; +std::vectorlogs; +for (size_t i = 0; i < filenames.size(); i++) { // 检查log文件是否比min log更新 + if (ParseFileName(filenames[i], &number, &type) && type == kLogFile + && ((number >= min_log) || (number == prev_log))) { + logs.push_back(number); + } +} +// S4.2 找到log文件后,首先排序,保证按照生成顺序,依次回放log。并把DB元信息的变动(sstable文件的变动)追加到edit中返回。 +std::sort(logs.begin(), logs.end()); +for (size_t i = 0; i < logs.size(); i++) { + s = RecoverLogFile(logs[i], edit, &max_sequence); + // 前一版可能在生成该log编号后没有记录在MANIFEST中, + //所以这里我们手动更新VersionSet中的文件编号计数器 + versions_->MarkFileNumberUsed(logs[i]); +} +// S4.3 更新VersionSet的sequence +if (s.ok()) { + if (versions_->LastSequence() < max_sequence) + versions_->SetLastSequence(max_sequence); +} ``` 上面就是Recover的执行流程。 @@ -190,36 +190,36 @@ S4尝试从所有比manifest文件中记录的log要新的log文件中恢复( 该函数没有参数,其代码逻辑也很直观,就是列出db的所有文件,对不同类型的文件分别判断,如果是过期文件,就删除之,如下: ``` - 1// S1 首先,确保不会删除pending文件,将versionset正在使用的所有文件加入到live中。 - 2std::set live = pending_outputs_; - 3versions_->AddLiveFiles(&live); //该函数其后分析 - 4 // S2 列举db的所有文件 - 5std::vectorfilenames; - 6env_->GetChildren(dbname_, &filenames); - 7// S3 遍历所有列举的文件,根据文件类型,分别处理; - 8uint64_t number; - 9FileType type; -10for (size_t i = 0; i < filenames.size(); i++) { -11 if (ParseFileName(filenames[i], &number, &type)) { -12 bool keep = true; //false表明是过期文件 -13 // S3.1 kLogFile,log文件,根据log编号判断是否过期 -14 keep = ((number >= versions_->LogNumber()) || -15 (number == versions_->PrevLogNumber())); -16 // S3.2 kDescriptorFile,MANIFEST文件,根据versionset记录的编号判断 -17 keep = (number >= versions_->ManifestFileNumber()); -18 // S3.3 kTableFile,sstable文件,只要在live中就不能删除 -19 // S3.4 kTempFile,如果是正在写的文件,只要在live中就不能删除 -20 keep = (live.find(number) != live.end()); -21 // S3.5 kCurrentFile,kDBLockFile, kInfoLogFile,不能删除 -22 keep = true; -23 // S3.6 如果keep为false,表明需要删除文件,如果是table还要从cache中删除 -24 if (!keep) { -25 if (type == kTableFile) table_cache_->Evict(number); -26 Log(options_.info_log, "Delete type=%d #%lld\n", type, number); -27 env_->DeleteFile(dbname_ + "/" + filenames[i]); -28 } -29 } -30} +// S1 首先,确保不会删除pending文件,将versionset正在使用的所有文件加入到live中。 +std::set live = pending_outputs_; +versions_->AddLiveFiles(&live); //该函数其后分析 + // S2 列举db的所有文件 +std::vectorfilenames; +env_->GetChildren(dbname_, &filenames); +// S3 遍历所有列举的文件,根据文件类型,分别处理; +uint64_t number; +FileType type; +for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type)) { + bool keep = true; //false表明是过期文件 + // S3.1 kLogFile,log文件,根据log编号判断是否过期 + keep = ((number >= versions_->LogNumber()) || + (number == versions_->PrevLogNumber())); + // S3.2 kDescriptorFile,MANIFEST文件,根据versionset记录的编号判断 + keep = (number >= versions_->ManifestFileNumber()); + // S3.3 kTableFile,sstable文件,只要在live中就不能删除 + // S3.4 kTempFile,如果是正在写的文件,只要在live中就不能删除 + keep = (live.find(number) != live.end()); + // S3.5 kCurrentFile,kDBLockFile, kInfoLogFile,不能删除 + keep = true; + // S3.6 如果keep为false,表明需要删除文件,如果是table还要从cache中删除 + if (!keep) { + if (type == kTableFile) table_cache_->Evict(number); + Log(options_.info_log, "Delete type=%d #%lld\n", type, number); + env_->DeleteFile(dbname_ + "/" + filenames[i]); + } + } +} ``` 这就是删除过期文件的逻辑,其中调用到了**VersionSet::AddLiveFiles**函数,保证不会删除active的文件。 @@ -242,46 +242,46 @@ StatusRecoverLogFile(uint64_t log_number, VersionEdit* edit,SequenceNumber* max_ 它声明了一个局部类LogReporter以打印错误日志,没什么好说的,下面来看代码逻辑。 ``` - 1// S1 打开log文件返回SequentialFile*file,出错就返回,否则向下执行S2。 - 2// S2 根据log文件句柄file创建log::Reader,准备读取log。 - 3log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); - 4// S3 依次读取所有的log记录,并插入到新生成的memtable中。这里使用到了批量更新接口WriteBatch,具体后面再分析。 - 5std::string scratch; - 6Slice record; - 7WriteBatch batch; - 8MemTable* mem = NULL; - 9while (reader.ReadRecord(&record, &scratch) && status.ok()) { // 读取全部log -10 if (record.size() < 12) { // log数据错误,不满足最小长度12 -11 reporter.Corruption(record.size(), Status::Corruption("log recordtoo small")); -12 continue; -13 } -14 WriteBatchInternal::SetContents(&batch, record); // log内容设置到WriteBatch中 -15 if (mem == NULL) { // 创建memtable -16 mem = new MemTable(internal_comparator_); -17 mem->Ref(); -18 } -19 status = WriteBatchInternal::InsertInto(&batch, mem); // 插入到memtable中 -20 MaybeIgnoreError(&status); -21 if (!status.ok()) break; -22 const SequenceNumber last_seq = -23 WriteBatchInternal::Sequence(&batch) + WriteBatchInternal::Count(&batch) - 1; -24 if (last_seq > *max_sequence) *max_sequence = last_seq; // 更新max sequence -25 // 如果mem的内存超过设置值,则执行compaction,如果compaction出错, -26 // 立刻返回错误,DB::Open失败 -27 if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { -28 status = WriteLevel0Table(mem, edit, NULL); -29 if (!status.ok()) break; -30 mem->Unref(); // 释放当前memtable -31 mem = NULL; -32 } -33} -34// S4 扫尾工作,如果mem != NULL,说明还需要dump到新的sstable文件中。 -35if (status.ok() && mem != NULL) {// 如果compaction出错,立刻返回错误 -36 status = WriteLevel0Table(mem, edit, NULL); -37} -38if (mem != NULL)mem->Unref(); -39delete file; -40return status; +// S1 打开log文件返回SequentialFile*file,出错就返回,否则向下执行S2。 +// S2 根据log文件句柄file创建log::Reader,准备读取log。 +log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); +// S3 依次读取所有的log记录,并插入到新生成的memtable中。这里使用到了批量更新接口WriteBatch,具体后面再分析。 +std::string scratch; +Slice record; +WriteBatch batch; +MemTable* mem = NULL; +while (reader.ReadRecord(&record, &scratch) && status.ok()) { // 读取全部log + if (record.size() < 12) { // log数据错误,不满足最小长度12 + reporter.Corruption(record.size(), Status::Corruption("log recordtoo small")); + continue; + } + WriteBatchInternal::SetContents(&batch, record); // log内容设置到WriteBatch中 + if (mem == NULL) { // 创建memtable + mem = new MemTable(internal_comparator_); + mem->Ref(); + } + status = WriteBatchInternal::InsertInto(&batch, mem); // 插入到memtable中 + MaybeIgnoreError(&status); + if (!status.ok()) break; + const SequenceNumber last_seq = + WriteBatchInternal::Sequence(&batch) + WriteBatchInternal::Count(&batch) - 1; + if (last_seq > *max_sequence) *max_sequence = last_seq; // 更新max sequence + // 如果mem的内存超过设置值,则执行compaction,如果compaction出错, + // 立刻返回错误,DB::Open失败 + if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { + status = WriteLevel0Table(mem, edit, NULL); + if (!status.ok()) break; + mem->Unref(); // 释放当前memtable + mem = NULL; + } +} +// S4 扫尾工作,如果mem != NULL,说明还需要dump到新的sstable文件中。 +if (status.ok() && mem != NULL) {// 如果compaction出错,立刻返回错误 + status = WriteLevel0Table(mem, edit, NULL); +} +if (mem != NULL)mem->Unref(); +delete file; +return status; ``` 把MemTabledump到sstable是函数WriteLevel0Table的工作,其实这是compaction的一部分,准备放在compaction一节来分析。