Skip to content

Commit

Permalink
fix: properly handle last entry id
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 26, 2023
1 parent 6dd3602 commit 224c6cc
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 34 deletions.
7 changes: 1 addition & 6 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use common_procedure::error::{
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, info};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
Expand Down Expand Up @@ -475,11 +475,6 @@ impl CreateRequestBuilder {
);
}

debug!(
"Set region options {:?} for region {}",
request.options, region_id
);

Ok(request)
}
}
6 changes: 0 additions & 6 deletions src/common/meta/src/key/datanode_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::collections::HashMap;
use std::sync::Arc;

use common_telemetry::debug;
use futures::stream::BoxStream;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -186,11 +185,6 @@ impl DatanodeTableManager {
})
.collect();

debug!(
"Persist region wal options {:?} for table {}",
filtered_region_wal_options, table_id
);

let key = DatanodeTableKey::new(datanode_id, table_id);
let val = DatanodeTableValue::new(
table_id,
Expand Down
16 changes: 7 additions & 9 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,13 @@ impl TopicManager {
}

fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
if let &RsKafkaError::ServerError {
protocol_error: TopicAlreadyExists,
..
} = e
{
true
} else {
false
}
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: TopicAlreadyExists,
..
}
)
}
}

Expand Down
6 changes: 1 addition & 5 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue}
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{error, info, warn};
use file_engine::engine::FileRegionEngine;
use futures::future;
use futures_util::future::try_join_all;
Expand Down Expand Up @@ -545,10 +545,6 @@ async fn open_all_regions(
.and_then(|wal_options| {
region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});
debug!(
"Read region options {:?} for region {} from kv backend",
region_options, region_number
);

regions.push((
RegionId::new(table_value.table_id, region_number),
Expand Down
2 changes: 0 additions & 2 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,6 @@ pub(crate) async fn replay_memtable<S: LogStore>(
}

// set next_entry_id and write to memtable.
// FIXME(niebayes): figure out how to properly set last entry id when the replay is done.
last_entry_id = flushed_entry_id;
region_write_ctx.set_next_entry_id(last_entry_id + 1);
region_write_ctx.write_memtable();

Expand Down
6 changes: 0 additions & 6 deletions src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use api::v1::WalEntry;
use async_stream::try_stream;
use common_config::wal::WalOptions;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use futures::stream::BoxStream;
use futures::StreamExt;
use prost::Message;
Expand Down Expand Up @@ -74,11 +73,6 @@ impl<S: LogStore> Wal<S> {
start_id: EntryId,
wal_options: &'a WalOptions,
) -> Result<WalEntryStream> {
debug!(
"Scanning log entries for region {} starting from {} with wal_options {:?}",
region_id, start_id, wal_options
);

let stream = try_stream!({
let namespace = self.store.namespace(region_id.into(), wal_options);
let mut stream = self
Expand Down

0 comments on commit 224c6cc

Please sign in to comment.