Skip to content

Commit

Permalink
fix: resolve conflicts and review conversations
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 25, 2023
2 parents 62a8991 + d4ac873 commit af419f0
Show file tree
Hide file tree
Showing 23 changed files with 169 additions and 107 deletions.
3 changes: 2 additions & 1 deletion docker/dev-builder/centos/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ ARG RUST_TOOLCHAIN
RUN rustup toolchain install ${RUST_TOOLCHAIN}

# Install nextest.
RUN cargo install cargo-nextest --locked
RUN cargo install cargo-binstall --locked
RUN cargo binstall cargo-nextest --no-confirm
3 changes: 2 additions & 1 deletion docker/dev-builder/ubuntu/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ ARG RUST_TOOLCHAIN
RUN rustup toolchain install ${RUST_TOOLCHAIN}

# Install nextest.
RUN cargo install cargo-nextest --locked
RUN cargo install cargo-binstall --locked
RUN cargo binstall cargo-nextest --no-confirm
3 changes: 2 additions & 1 deletion docker/dev-builder/ubuntu/Dockerfile-18.10
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ ARG RUST_TOOLCHAIN
RUN rustup toolchain install ${RUST_TOOLCHAIN}

# Install nextest.
RUN cargo install cargo-nextest --locked
RUN cargo install cargo-binstall --locked
RUN cargo binstall cargo-nextest --no-confirm
8 changes: 3 additions & 5 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,11 @@ impl CatalogManager for KvBackendCatalogManager {
.try_collect::<BTreeSet<_>>()
.await
.map_err(BoxedError::new)
.context(ListSchemasSnafu { catalog })?
.into_iter()
.collect::<Vec<_>>();
.context(ListSchemasSnafu { catalog })?;

keys.extend_from_slice(&self.system_catalog.schema_names());
keys.extend(self.system_catalog.schema_names());

Ok(keys)
Ok(keys.into_iter().collect())
}

async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult<Vec<String>> {
Expand Down
7 changes: 7 additions & 0 deletions src/catalog/src/memory/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ impl MemoryCatalogManager {
schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
})
.unwrap();
manager
.register_schema_sync(RegisterSchemaRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: INFORMATION_SCHEMA_NAME.to_string(),
})
.unwrap();

manager
}
Expand Down Expand Up @@ -256,6 +262,7 @@ impl MemoryCatalogManager {
Arc::downgrade(self) as Weak<dyn CatalogManager>,
);
let information_schema = information_schema_provider.tables().clone();

let mut catalog = HashMap::new();
catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema);
catalog
Expand Down
8 changes: 7 additions & 1 deletion src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ pub struct OpenRegion {
pub region_options: HashMap<String, String>,
#[serde(default)]
pub region_wal_options: HashMap<String, String>,
#[serde(default)]
pub skip_wal_replay: bool,
}

impl OpenRegion {
Expand All @@ -106,12 +108,14 @@ impl OpenRegion {
path: &str,
region_options: HashMap<String, String>,
region_wal_options: HashMap<String, String>,
skip_wal_replay: bool,
) -> Self {
Self {
region_ident,
region_storage_path: path.to_string(),
region_options,
region_wal_options,
skip_wal_replay,
}
}
}
Expand Down Expand Up @@ -227,12 +231,13 @@ mod tests {
"test/foo",
HashMap::new(),
HashMap::new(),
false,
));

let serialized = serde_json::to_string(&open_region).unwrap();

assert_eq!(
r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{}}}"#,
r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
serialized
);

Expand Down Expand Up @@ -289,6 +294,7 @@ mod tests {
region_storage_path,
region_options,
region_wal_options: HashMap::new(),
skip_wal_replay: false,
};
assert_eq!(expected, deserialized);
}
Expand Down
23 changes: 13 additions & 10 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use std::sync::Arc;

use bytes::Bytes;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
};
use common_telemetry::warn;
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
Expand Down Expand Up @@ -297,17 +297,20 @@ impl TableMetadataManager {

pub async fn init(&self) -> Result<()> {
let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
let public_schema_name = SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
let private_schema_name =
SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME);

self.catalog_manager().create(catalog_name, true).await?;
self.schema_manager()
.create(public_schema_name, None, true)
.await?;
self.schema_manager()
.create(private_schema_name, None, true)
.await?;

let internal_schemas = [
DEFAULT_SCHEMA_NAME,
INFORMATION_SCHEMA_NAME,
DEFAULT_PRIVATE_SCHEMA_NAME,
];

for schema_name in internal_schemas {
let schema_key = SchemaNameKey::new(DEFAULT_CATALOG_NAME, schema_name);

self.schema_manager().create(schema_key, None, true).await?;
}

Ok(())
}
Expand Down
6 changes: 4 additions & 2 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ impl RegionHeartbeatResponseHandler {
region_storage_path,
region_options,
region_wal_options,
}) => Ok(Box::new(|region_server| {
skip_wal_replay,
}) => Ok(Box::new(move |region_server| {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
// TODO(niebayes): extends region options with region_wal_options.
Expand All @@ -64,7 +65,7 @@ impl RegionHeartbeatResponseHandler {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
options: region_options,
skip_wal_replay: false,
skip_wal_replay,
});
let result = region_server.handle_request(region_id, request).await;

Expand Down Expand Up @@ -244,6 +245,7 @@ mod tests {
path,
HashMap::new(),
HashMap::new(),
false,
))
}

Expand Down
26 changes: 15 additions & 11 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl LogStore for KafkaLogStore {
type Entry = EntryImpl;
type Namespace = NamespaceImpl;

/// Creates an entry.
/// Creates an entry of the associated Entry type.
fn entry<D: AsRef<[u8]>>(
&self,
data: D,
Expand All @@ -74,19 +74,20 @@ impl LogStore for KafkaLogStore {
.produce(&self.client_manager)
.await
.map(TryInto::try_into)??;
Ok(AppendResponse { entry_id })
Ok(AppendResponse {
last_entry_id: entry_id,
})
}

/// Appends a batch of entries to the log store. The response contains a map where the key
/// is a region id while the value is the id of the entry, the first entry of the entries belong to the region,
/// written into the log store.
/// Appends a batch of entries and returns a response containing a map where the key is a region id
/// while the value is the id of the last successfully written entry of the region.
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
if entries.is_empty() {
return Ok(AppendBatchResponse::default());
}

// Groups entries by region id and pushes them to an associated record producer.
let mut producers: HashMap<_, RecordProducer> = HashMap::with_capacity(entries.len());
let mut producers = HashMap::with_capacity(entries.len());
for entry in entries {
producers
.entry(entry.ns.region_id)
Expand All @@ -108,7 +109,7 @@ impl LogStore for KafkaLogStore {
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?;
Ok(AppendBatchResponse {
entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
})
}

Expand Down Expand Up @@ -144,7 +145,7 @@ impl LogStore for KafkaLogStore {
Ok(Box::pin(stream))
}

/// Creates a namespace.
/// Creates a namespace of the associated Namespace type.
fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace {
// Safety: upon start, the datanode checks the consistency of the wal providers in the wal config of the
// datanode and that of the metasrv. Therefore, the wal options passed into the kafka log store
Expand All @@ -158,21 +159,24 @@ impl LogStore for KafkaLogStore {
}
}

/// Creates a new `Namespace` from the given ref.
async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
Ok(())
}

/// Deletes an existing `Namespace` specified by the given ref.
async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
Ok(())
}

/// Lists all existing namespaces.
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
Ok(vec![])
}

/// Marks all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete
/// the log files if all entries inside are obsolete. This method may not delete log
/// files immediately.
/// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
/// so that the log store can safely delete those entries. This method does not guarantee
/// that the obsolete entries are deleted immediately.
async fn obsolete(&self, _ns: Self::Namespace, _entry_id: EntryId) -> Result<()> {
Ok(())
}
Expand Down
3 changes: 3 additions & 0 deletions src/log-store/src/kafka/record_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client
/// Record metadata which will be serialized/deserialized to/from the `key` of a Record.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct RecordMeta {
/// Meta version. Used for backward compatibility.
version: u32,
/// The namespace of the entries wrapped in the record.
ns: NamespaceImpl,
/// Ids of the entries built into the record.
Expand All @@ -41,6 +43,7 @@ struct RecordMeta {
impl RecordMeta {
fn new(ns: NamespaceImpl, entries: &[EntryImpl]) -> Self {
Self {
version: 0,
ns,
entry_ids: entries.iter().map(|entry| entry.id).collect(),
entry_offsets: entries
Expand Down
2 changes: 1 addition & 1 deletion src/log-store/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl LogStore for NoopLogStore {
}

async fn append(&self, mut _e: Self::Entry) -> Result<AppendResponse> {
Ok(AppendResponse { entry_id: 0 })
Ok(AppendResponse::default())
}

async fn append_batch(&self, _e: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
Expand Down
Loading

0 comments on commit af419f0

Please sign in to comment.