Skip to content

Commit

Permalink
feat: update flushed manifest version when it is larger
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Aug 22, 2023
1 parent cb3561f commit 3280d50
Show file tree
Hide file tree
Showing 10 changed files with 14 additions and 37 deletions.
2 changes: 0 additions & 2 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ max_purge_tasks = 32
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '10m'
# Whether to try creating a manifest checkpoint on region opening
checkpoint_on_startup = false

# Storage flush options
[storage.flush]
Expand Down
2 changes: 0 additions & 2 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ max_purge_tasks = 32
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '10m'
# Whether to try creating a manifest checkpoint on region opening
checkpoint_on_startup = false

# Storage flush options
[storage.flush]
Expand Down
5 changes: 0 additions & 5 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ mod tests {
[storage.manifest]
checkpoint_margin = 9
gc_duration = '7s'
checkpoint_on_startup = true
compress = true
[logging]
Expand Down Expand Up @@ -289,7 +288,6 @@ mod tests {
RegionManifestConfig {
checkpoint_margin: Some(9),
gc_duration: Some(Duration::from_secs(7)),
checkpoint_on_startup: true,
compress: true
},
options.storage.manifest,
Expand Down Expand Up @@ -383,9 +381,6 @@ mod tests {
max_files_in_level0 = 7
max_purge_tasks = 32
[storage.manifest]
checkpoint_on_startup = true
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
Expand Down
12 changes: 0 additions & 12 deletions src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,6 @@ mod tests {
.join(ENV_VAR_SEP),
Some("42s"),
),
(
// storage.manifest.checkpoint_on_startup = true
[
env_prefix.to_string(),
"storage".to_uppercase(),
"manifest".to_uppercase(),
"checkpoint_on_startup".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("true"),
),
(
// wal.dir = /other/wal/dir
[
Expand Down Expand Up @@ -253,7 +242,6 @@ mod tests {
opts.storage.manifest.gc_duration,
Some(Duration::from_secs(42))
);
assert!(opts.storage.manifest.checkpoint_on_startup);
assert_eq!(
opts.meta_client_options.unwrap().metasrv_addrs,
vec![
Expand Down
1 change: 0 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ mod tests {
[storage.manifest]
checkpoint_margin = 9
gc_duration = '7s'
checkpoint_on_startup = true
[http_options]
addr = "127.0.0.1:4000"
Expand Down
4 changes: 0 additions & 4 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,6 @@ pub struct RegionManifestConfig {
/// Region manifest logs and checkpoints gc task execution duration.
#[serde(with = "humantime_serde")]
pub gc_duration: Option<Duration>,
/// Whether to try creating a manifest checkpoint on region opening
pub checkpoint_on_startup: bool,
/// Whether to compress manifest and checkpoint file by gzip
pub compress: bool,
}
Expand All @@ -267,7 +265,6 @@ impl Default for RegionManifestConfig {
Self {
checkpoint_margin: Some(10u16),
gc_duration: Some(Duration::from_secs(600)),
checkpoint_on_startup: false,
compress: false,
}
}
Expand Down Expand Up @@ -341,7 +338,6 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
compress_manifest: value.storage.manifest.compress,
manifest_checkpoint_on_startup: value.storage.manifest.checkpoint_on_startup,
manifest_checkpoint_margin: value.storage.manifest.checkpoint_margin,
manifest_gc_duration: value.storage.manifest.gc_duration,
max_files_in_l0: value.storage.compaction.max_files_in_level0,
Expand Down
2 changes: 0 additions & 2 deletions src/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub const DEFAULT_PICKER_SCHEDULE_INTERVAL: u32 = 5 * 60 * 1000;

#[derive(Debug, Clone)]
pub struct EngineConfig {
pub manifest_checkpoint_on_startup: bool,
pub compress_manifest: bool,
pub manifest_checkpoint_margin: Option<u16>,
pub manifest_gc_duration: Option<Duration>,
Expand All @@ -55,7 +54,6 @@ pub struct EngineConfig {
impl Default for EngineConfig {
fn default() -> Self {
Self {
manifest_checkpoint_on_startup: false,
compress_manifest: false,
manifest_checkpoint_margin: Some(10),
manifest_gc_duration: Some(Duration::from_secs(30)),
Expand Down
16 changes: 14 additions & 2 deletions src/storage/src/manifest/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ pub struct RegionManifestCheckpointer {

impl RegionManifestCheckpointer {
pub(crate) fn set_flushed_manifest_version(&self, manifest_version: ManifestVersion) {
let current = self.flushed_manifest_version.load(Ordering::Relaxed);

self.flushed_manifest_version
.store(manifest_version, Ordering::Relaxed);
.store(current.max(manifest_version), Ordering::Relaxed);
}
}

Expand Down Expand Up @@ -82,6 +84,12 @@ impl Checkpointer for RegionManifestCheckpointer {
return Ok(None);
}

info!("Begin to do region manifest checkpoint, path: {}, start_version: {}, end_version: {}, flushed_manifest_version: {}",
manifest.manifest_store().path(),
start_version,
end_version,
self.flushed_manifest_version.load(Ordering::Relaxed));

let mut iter = manifest.scan(start_version, end_version).await?;

let mut last_version = start_version;
Expand Down Expand Up @@ -135,7 +143,11 @@ impl Checkpointer for RegionManifestCheckpointer {
}
}

info!("Region manifest checkpoint, start_version: {}, last_version: {}, compacted actions: {}", start_version, last_version, compacted_actions);
info!("Region manifest checkpoint, path: {}, start_version: {}, last_version: {}, compacted actions: {}",
manifest.manifest_store().path(),
start_version,
last_version,
compacted_actions);

Ok(Some(checkpoint))
}
Expand Down
6 changes: 0 additions & 6 deletions src/storage/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,6 @@ impl<S: LogStore> RegionImpl<S> {
.replay(recovered_metadata_after_flushed, writer_ctx)
.await?;

// Try to do a manifest checkpoint on opening
if store_config.engine_config.manifest_checkpoint_on_startup {
let manifest = &store_config.manifest;
manifest.may_do_checkpoint(manifest.last_version()).await?;
}

let inner = Arc::new(RegionInner {
shared,
writer,
Expand Down
1 change: 0 additions & 1 deletion tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,6 @@ pub async fn test_config_api(store_type: StorageType) {
[storage.manifest]
checkpoint_margin = 10
gc_duration = "10m"
checkpoint_on_startup = false
compress = false
[storage.flush]
Expand Down

0 comments on commit 3280d50

Please sign in to comment.