Skip to content

Commit

Permalink
Lightweight validation of MANIFEST file after close on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
anand1976 committed Dec 21, 2023
1 parent 7b24dec commit ca30fe0
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 30 deletions.
4 changes: 2 additions & 2 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class CompactionJobTestBase : public testing::Test {
/*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"",
/*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr)),
/*error_handler=*/nullptr, /*read_only=*/false)),
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_),
Expand Down Expand Up @@ -547,7 +547,7 @@ class CompactionJobTestBase : public testing::Test {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
compaction_job_stats_.Reset();
ASSERT_OK(SetIdentityFile(env_, dbname_));

Expand Down
14 changes: 13 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
dbname_, &immutable_db_options_, file_options_, table_cache_.get(),
write_buffer_manager_, &write_controller_, &block_cache_tracer_,
io_tracer_, db_id_, db_session_id_, options.daily_offpeak_time_utc,
&error_handler_));
&error_handler_, read_only));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

Expand Down Expand Up @@ -657,6 +657,18 @@ Status DBImpl::CloseHelper() {

// versions need to be destroyed before table_cache since it can hold
// references to table_cache.
{
Status s = versions_->Close(directories_.GetDbDir(), &mutex_);
if (!s.ok()) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Unable to close MANIFEST with error -- %s",
s.ToString().c_str());
if (ret.ok()) {
ret = s;
}
}
}

versions_.reset();
mutex_.Unlock();
if (db_lock_ != nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1543,7 +1543,7 @@ class RecoveryTestHelper {
/*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"",
options.daily_offpeak_time_utc,
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));

wal_manager.reset(
new WalManager(db_options, file_options, /*io_tracer=*/nullptr));
Expand Down
2 changes: 1 addition & 1 deletion db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class FlushJobTestBase : public testing::Test {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
EXPECT_OK(versions_->Recover(column_families, false));
}

Expand Down
4 changes: 2 additions & 2 deletions db/memtable_list_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class MemTableListTest : public testing::Test {
&write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id=*/"",
/*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr);
/*error_handler=*/nullptr, /*read_only=*/false);
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());
Expand Down Expand Up @@ -160,7 +160,7 @@ class MemTableListTest : public testing::Test {
&write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id=*/"",
/*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr);
/*error_handler=*/nullptr, /*read_only=*/false);
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());
Expand Down
2 changes: 1 addition & 1 deletion db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class Repairer {
raw_table_cache_.get(), &wb_, &wc_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", db_session_id_, db_options.daily_offpeak_time_utc,
/*error_handler=*/nullptr),
/*error_handler=*/nullptr, /*read_only=*/false),
next_file_number_(1),
db_lock_(nullptr),
closed_(false) {
Expand Down
60 changes: 56 additions & 4 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5067,7 +5067,7 @@ VersionSet::VersionSet(
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id,
const std::string& db_session_id, const std::string& daily_offpeak_time_utc,
ErrorHandler* const error_handler)
ErrorHandler* const error_handler, const bool read_only)
: column_family_set_(new ColumnFamilySet(
dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller, block_cache_tracer, io_tracer,
Expand All @@ -5094,7 +5094,59 @@ VersionSet::VersionSet(
io_tracer_(io_tracer),
db_session_id_(db_session_id),
offpeak_time_option_(OffpeakTimeOption(daily_offpeak_time_utc)),
error_handler_(error_handler) {}
error_handler_(error_handler),
read_only_(read_only),
closed_(false) {}

Status VersionSet::Close(FSDirectory* db_dir, InstrumentedMutex* mu) {
Status s;
if (closed_ || read_only_ || !manifest_file_number_ ||
!descriptor_log_) {
return s;
}

std::string manifest_file_name =
DescriptorFileName(dbname_, manifest_file_number_);
uint64_t size = 0;
IOStatus io_s = descriptor_log_->Close();
descriptor_log_.reset();
TEST_SYNC_POINT("VersionSet::Close:AfterClose");
if (io_s.ok()) {
io_s = fs_->GetFileSize(manifest_file_name, IOOptions(), &size, nullptr);
}
if (!io_s.ok() || size != manifest_file_size_) {
if (io_s.ok()) {
// This means the size is not as expected. So we treat it as a
// corruption and set io_s appropriately
io_s = IOStatus::Corruption();
}
ColumnFamilyData* cfd = GetColumnFamilySet()->GetDefault();
const ImmutableOptions* ioptions = cfd->ioptions();
IOErrorInfo io_error_info(io_s, FileOperationType::kVerify,
manifest_file_name, /*length=*/size,
/*offset=*/0);

for (auto& listener : ioptions->listeners) {
listener->OnIOError(io_error_info);
}
io_s.PermitUncheckedError();
io_error_info.io_status.PermitUncheckedError();
ROCKS_LOG_ERROR(db_options_->info_log,
"MANIFEST verification on Close, "
"filename %s, expected size %" PRIu64
" failed with status %s and "
"actual size %" PRIu64 "\n",
manifest_file_name.c_str(), manifest_file_size_,
io_s.ToString().c_str(), size);
VersionEdit edit;
assert(cfd);
const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions();
s = LogAndApply(cfd, cf_opts, ReadOptions(), &edit, mu, db_dir);
}

closed_ = true;
return s;
}

VersionSet::~VersionSet() {
// we need to delete column_family_set_ because its destructor depends on
Expand Down Expand Up @@ -6238,7 +6290,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/,
/*db_id*/ "",
/*db_session_id*/ "", options->daily_offpeak_time_utc,
/*error_handler_*/ nullptr);
/*error_handler_*/ nullptr, /*read_only=*/false);
Status status;

std::vector<ColumnFamilyDescriptor> dummy;
Expand Down Expand Up @@ -7280,7 +7332,7 @@ ReactiveVersionSet::ReactiveVersionSet(
write_buffer_manager, write_controller,
/*block_cache_tracer=*/nullptr, io_tracer, /*db_id*/ "",
/*db_session_id*/ "", /*daily_offpeak_time_utc*/ "",
/*error_handler=*/nullptr) {}
/*error_handler=*/nullptr, /*read_only=*/true) {}

ReactiveVersionSet::~ReactiveVersionSet() = default;

Expand Down
11 changes: 10 additions & 1 deletion db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1160,13 +1160,15 @@ class VersionSet {
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id,
const std::string& daily_offpeak_time_utc,
ErrorHandler* const error_handler);
ErrorHandler* const error_handler, const bool read_only);
// No copying allowed
VersionSet(const VersionSet&) = delete;
void operator=(const VersionSet&) = delete;

virtual ~VersionSet();

virtual Status Close(FSDirectory* db_dir, InstrumentedMutex* mu);

Status LogAndApplyToDefaultColumnFamily(
const ReadOptions& read_options, VersionEdit* edit, InstrumentedMutex* mu,
FSDirectory* dir_contains_current_file, bool new_descriptor_log = false,
Expand Down Expand Up @@ -1693,6 +1695,9 @@ class VersionSet {
Status LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b,
VersionEdit* edit, SequenceNumber* max_last_sequence,
InstrumentedMutex* mu);

const bool read_only_;
bool closed_;
};

// ReactiveVersionSet represents a collection of versions of the column
Expand All @@ -1710,6 +1715,10 @@ class ReactiveVersionSet : public VersionSet {

~ReactiveVersionSet() override;

Status Close(FSDirectory* /*db_dir*/, InstrumentedMutex* /*mu*/) override {
return Status::OK();
}

Status ReadAndApply(
InstrumentedMutex* mu,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
Expand Down
57 changes: 47 additions & 10 deletions db/version_set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,7 @@ class VersionSetTestBase {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
reactive_versions_ = std::make_shared<ReactiveVersionSet>(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, nullptr);
Expand Down Expand Up @@ -1354,16 +1354,31 @@ class VersionSetTestBase {
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
}

void CloseDB() {
mutex_.Lock();
versions_->Close(nullptr, &mutex_).PermitUncheckedError();
versions_.reset();
mutex_.Unlock();
}

void ReopenDB() {
versions_.reset(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
EXPECT_OK(versions_->Recover(column_families_, false));
}

void GetManifestPath(std::string* manifest_path) const {
assert(manifest_path != nullptr);
uint64_t manifest_file_number = 0;
Status s = versions_->GetCurrentManifestPath(
dbname_, fs_.get(), manifest_path, &manifest_file_number);
ASSERT_OK(s);
}

void VerifyManifest(std::string* manifest_path) const {
assert(manifest_path != nullptr);
uint64_t manifest_file_number = 0;
Expand Down Expand Up @@ -1873,7 +1888,7 @@ TEST_F(VersionSetTest, WalAddition) {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
Expand Down Expand Up @@ -1941,7 +1956,7 @@ TEST_F(VersionSetTest, WalCloseWithoutSync) {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 2);
Expand Down Expand Up @@ -1995,7 +2010,7 @@ TEST_F(VersionSetTest, WalDeletion) {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
Expand Down Expand Up @@ -2034,7 +2049,7 @@ TEST_F(VersionSetTest, WalDeletion) {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
Expand Down Expand Up @@ -2155,7 +2170,7 @@ TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
Expand Down Expand Up @@ -2192,7 +2207,7 @@ TEST_F(VersionSetTest, DeleteAllWals) {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 0);
Expand Down Expand Up @@ -2235,7 +2250,7 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
std::string db_id;
ASSERT_OK(
new_versions->Recover(column_families_, /*read_only=*/false, &db_id));
Expand Down Expand Up @@ -2343,6 +2358,28 @@ TEST_F(VersionSetTest, OffpeakTimeInfoTest) {
versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
}

TEST_F(VersionSetTest, ManifestTruncateAfterClose) {
std::string manifest_path;
VersionEdit edit;

NewDB();
ASSERT_OK(LogAndApplyToDefaultCF(edit));
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::Close:AfterClose", [&](void*) {
GetManifestPath(&manifest_path);
std::unique_ptr<WritableFile> manifest_file;
EXPECT_OK(env_->ReopenWritableFile(manifest_path, &manifest_file,
EnvOptions()));
EXPECT_OK(manifest_file->Truncate(0));
EXPECT_OK(manifest_file->Close());
});
SyncPoint::GetInstance()->EnableProcessing();
CloseDB();
SyncPoint::GetInstance()->DisableProcessing();

ReopenDB();
}

TEST_F(VersionStorageInfoTest, AddRangeDeletionCompensatedFileSize) {
// Tests that compensated range deletion size is added to compensated file
// size.
Expand Down Expand Up @@ -2394,7 +2431,7 @@ class VersionSetWithTimestampTest : public VersionSetTest {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));
ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false,
/*db_id=*/nullptr));
for (auto* cfd : *(vset->GetColumnFamilySet())) {
Expand Down
3 changes: 2 additions & 1 deletion db/version_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class OfflineManifestWriter {
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"",
options.daily_offpeak_time_utc,
/*error_handler=*/nullptr) {}
/*error_handler=*/nullptr,
/*read_only=*/false) {}

Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families) {
return versions_.Recover(column_families, /*read_only*/ false,
Expand Down
2 changes: 1 addition & 1 deletion db/wal_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class WalManagerTest : public testing::Test {
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
/*error_handler=*/nullptr, /*read_only=*/false));

wal_manager_.reset(
new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
Expand Down
3 changes: 2 additions & 1 deletion include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ enum class FileOperationType {
kRangeSync,
kAppend,
kPositionedAppend,
kOpen
kOpen,
kVerify
};

struct FileOperationInfo {
Expand Down
Loading

0 comments on commit ca30fe0

Please sign in to comment.