From c4534b06da868a0cee070718457d2f80d4ca4866 Mon Sep 17 00:00:00 2001 From: Michael Gattozzi Date: Tue, 8 Oct 2024 10:03:55 -0400 Subject: [PATCH] feat: move Table Id/Name mapping into DB Schema (#25436) --- influxdb3_catalog/src/catalog.rs | 148 +++++++----------- ...catalog__tests__catalog_serialization.snap | 3 +- ..._catalog__tests__serialize_last_cache.snap | 3 +- ...catalog__tests__serialize_series_keys.snap | 3 +- influxdb3_server/src/http.rs | 8 +- influxdb3_server/src/query_executor.rs | 10 +- .../src/system_tables/parquet_files.rs | 4 +- influxdb3_write/src/last_cache/mod.rs | 18 ++- .../src/last_cache/table_function.rs | 4 +- influxdb3_write/src/write_buffer/mod.rs | 18 +-- .../src/write_buffer/queryable_buffer.rs | 14 +- ...after-last-cache-create-and-new-field.snap | 9 +- ...g-immediately-after-last-cache-create.snap | 9 +- ...g-immediately-after-last-cache-delete.snap | 9 +- influxdb3_write/src/write_buffer/validator.rs | 68 ++------ 15 files changed, 116 insertions(+), 212 deletions(-) diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 0370f478209..3348e39b99a 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -167,15 +167,6 @@ impl Catalog { Ok(db) } - pub fn add_table_to_lookup(&self, db_id: DbId, table_id: TableId, name: Arc) { - self.inner - .write() - .table_map - .entry(db_id) - .or_default() - .insert(table_id, name); - } - pub fn db_name_to_id(&self, db_name: Arc) -> Option { self.inner.read().db_map.get_by_right(&db_name).copied() } @@ -184,23 +175,6 @@ impl Catalog { self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone) } - pub fn table_name_to_id(&self, db_id: DbId, table_name: Arc) -> Option { - self.inner - .read() - .table_map - .get(&db_id) - .and_then(|map| map.get_by_right(&table_name).copied()) - } - - pub fn table_id_to_name(&self, db_id: DbId, table_id: TableId) -> Option> { - self.inner - .read() - .table_map - .get(&db_id) - .and_then(|map| map.get_by_left(&table_id)) - .map(Arc::clone) - } - pub fn db_schema(&self, id: &DbId) -> Option> { self.inner.read().databases.get(id).cloned() } @@ -267,19 +241,6 @@ impl Catalog { pub fn insert_database(&mut self, db: DatabaseSchema) { let mut inner = self.inner.write(); - for (table_id, table_def) in db.tables.iter() { - inner - .table_map - .entry(db.id) - .and_modify(|map: &mut BiHashMap>| { - map.insert(*table_id, Arc::clone(&table_def.table_name)); - }) - .or_insert_with(|| { - let mut map = BiHashMap::new(); - map.insert(*table_id, Arc::clone(&table_def.table_name)); - map - }); - } inner.db_map.insert(db.id, Arc::clone(&db.name)); inner.databases.insert(db.id, Arc::new(db)); inner.sequence = inner.sequence.next(); @@ -321,8 +282,6 @@ pub struct InnerCatalog { updated: bool, #[serde_as(as = "DbMapAsArray")] db_map: BiHashMap>, - #[serde_as(as = "TableMapAsArray")] - pub table_map: HashMap>>, } serde_with::serde_conv!( @@ -351,45 +310,33 @@ struct DbMap { name: Arc, } +#[derive(Debug, Serialize, Deserialize)] +struct TableMap { + table_id: TableId, + name: Arc, +} + serde_with::serde_conv!( TableMapAsArray, - HashMap>>, - |map: &HashMap>>| { - map.iter().fold(Vec::new(), |mut acc, (db_id, table_map)| { - for (table_id, name) in table_map.iter() { - acc.push(TableMap { - db_id: *db_id, - table_id: *table_id, - name: Arc::clone(&name) - }); - } + BiHashMap>, + |map: &BiHashMap>| { + map.iter().fold(Vec::new(), |mut acc, (table_id, name)| { + acc.push(TableMap { + table_id: *table_id, + name: Arc::clone(&name) + }); acc }) }, |vec: 
Vec| -> Result<_, std::convert::Infallible> { - let mut map = HashMap::new(); + let mut map = BiHashMap::new(); for item in vec { - map.entry(item.db_id) - .and_modify(|entry: &mut BiHashMap>| { - entry.insert(item.table_id, Arc::clone(&item.name)); - }) - .or_insert_with(||{ - let mut inner_map = BiHashMap::new(); - inner_map.insert(item.table_id, Arc::clone(&item.name)); - inner_map - }); + map.insert(item.table_id, item.name); } Ok(map) } ); -#[derive(Debug, Serialize, Deserialize)] -struct TableMap { - db_id: DbId, - table_id: TableId, - name: Arc, -} - serde_with::serde_conv!( DatabasesAsArray, HashMap>, @@ -406,17 +353,20 @@ serde_with::serde_conv!( |vec: Vec| -> Result<_, String> { vec.into_iter().fold(Ok(HashMap::new()), |acc, db| { let mut acc = acc?; + let mut table_map = BiHashMap::new(); if let Some(_) = acc.insert(db.id, Arc::new(DatabaseSchema { id: db.id, name: Arc::clone(&db.name), tables: db.tables.into_iter().fold(Ok(BTreeMap::new()), |acc, table| { let mut acc = acc?; let table_name = Arc::clone(&table.table_name); + table_map.insert(table.table_id, Arc::clone(&table_name)); if let Some(_) = acc.insert(table.table_id, table) { return Err(format!("found duplicate table: {}", table_name)); } Ok(acc) - })? + })?, + table_map })) { return Err(format!("found duplicate db: {}", db.name)); } @@ -441,7 +391,6 @@ impl InnerCatalog { instance_id, updated: false, db_map: BiHashMap::new(), - table_map: HashMap::new(), } } @@ -471,18 +420,6 @@ impl InnerCatalog { self.sequence = self.sequence.next(); self.updated = true; self.db_map.insert(new_db.id, Arc::clone(&new_db.name)); - for (table_id, table_def) in new_db.tables.iter() { - self.table_map - .entry(new_db.id) - .and_modify(|map| { - map.insert(*table_id, Arc::clone(&table_def.table_name)); - }) - .or_insert_with(|| { - let mut map = BiHashMap::new(); - map.insert(*table_id, Arc::clone(&table_def.table_name)); - map - }); - } } } else { if self.databases.len() >= Catalog::NUM_DBS_LIMIT { @@ -499,18 +436,6 @@ impl InnerCatalog { self.sequence = self.sequence.next(); self.updated = true; self.db_map.insert(new_db.id, Arc::clone(&new_db.name)); - for (table_id, table_def) in new_db.tables.iter() { - self.table_map - .entry(new_db.id) - .and_modify(|map| { - map.insert(*table_id, Arc::clone(&table_def.table_name)); - }) - .or_insert_with(|| { - let mut map = BiHashMap::new(); - map.insert(*table_id, Arc::clone(&table_def.table_name)); - map - }); - } } Ok(()) @@ -532,6 +457,8 @@ pub struct DatabaseSchema { pub name: Arc, /// The database is a map of tables pub tables: BTreeMap, + #[serde_as(as = "TableMapAsArray")] + pub table_map: BiHashMap>, } impl DatabaseSchema { @@ -540,6 +467,7 @@ impl DatabaseSchema { id, name, tables: BTreeMap::new(), + table_map: BiHashMap::new(), } } @@ -636,10 +564,17 @@ impl DatabaseSchema { } } + // With the final list of updated/new tables update the current mapping + let new_table_maps = updated_or_new_tables + .iter() + .map(|(table_id, table_def)| (*table_id, Arc::clone(&table_def.table_name))) + .collect(); + Ok(Some(Self { id: self.id, name: Arc::clone(&self.name), tables: updated_or_new_tables, + table_map: new_table_maps, })) } } @@ -681,6 +616,14 @@ impl DatabaseSchema { pub fn tables(&self) -> impl Iterator { self.tables.values() } + + pub fn table_name_to_id(&self, table_name: Arc) -> Option { + self.table_map.get_by_right(&table_name).copied() + } + + pub fn table_id_to_name(&self, table_id: TableId) -> Option> { + self.table_map.get_by_left(&table_id).map(Arc::clone) + } } #[derive(Debug, 
Eq, PartialEq, Clone)] @@ -960,6 +903,12 @@ mod tests { id: DbId::from(0), name: "test_db".into(), tables: BTreeMap::new(), + table_map: { + let mut map = BiHashMap::new(); + map.insert(TableId::from(1), "test_table_1".into()); + map.insert(TableId::from(2), "test_table_2".into()); + map + }, }; use InfluxColumnType::*; use InfluxFieldType::*; @@ -1106,6 +1055,7 @@ mod tests { id: DbId::from(0), name: "test".into(), tables: BTreeMap::new(), + table_map: BiHashMap::new(), }; database.tables.insert( TableId::from(0), @@ -1142,6 +1092,11 @@ mod tests { id: DbId::from(0), name: "test_db".into(), tables: BTreeMap::new(), + table_map: { + let mut map = BiHashMap::new(); + map.insert(TableId::from(1), "test_table_1".into()); + map + }, }; use InfluxColumnType::*; use InfluxFieldType::*; @@ -1188,6 +1143,11 @@ mod tests { id: DbId::from(0), name: "test_db".into(), tables: BTreeMap::new(), + table_map: { + let mut map = BiHashMap::new(); + map.insert(TableId::from(0), "test".into()); + map + }, }; use InfluxColumnType::*; use InfluxFieldType::*; diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap index 558559fcc33..fb7aa2d9d21 100644 --- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap +++ b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap @@ -156,6 +156,5 @@ expression: catalog "sequence": 0, "host_id": "dummy-host-id", "instance_id": "instance-id", - "db_map": [], - "table_map": [] + "db_map": [] } diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap index 36453f45d22..baa2b329bc0 100644 --- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap +++ b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap @@ -81,6 +81,5 @@ expression: catalog "sequence": 0, "host_id": "dummy-host-id", "instance_id": "instance-id", - "db_map": [], - "table_map": [] + "db_map": [] } diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap index 5b66988d257..b708f7f11db 100644 --- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap +++ b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap @@ -70,6 +70,5 @@ expression: catalog "sequence": 0, "host_id": "dummy-host-id", "instance_id": "instance-id", - "db_map": [], - "table_map": [] + "db_map": [] } diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 5e2ecb9066b..0c87718583e 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -700,7 +700,9 @@ where let table_id = self .write_buffer .catalog() - .table_name_to_id(db_id, table.as_str().into()) + .db_schema(&db_id) + .expect("db should exist") + .table_name_to_id(table.as_str().into()) .ok_or_else(|| WriteBufferError::TableDoesNotExist)?; match self .write_buffer @@ -748,7 +750,9 @@ where let table_id = self .write_buffer .catalog() - .table_name_to_id(db_id, table.into()) + .db_schema(&db_id) + .expect("db should exist") + .table_name_to_id(table.into()) .ok_or_else(|| 
WriteBufferError::TableDoesNotExist)?; self.write_buffer .delete_last_cache(db_id, table_id, &name) diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs index 947fe71d6fd..e45b176a9fd 100644 --- a/influxdb3_server/src/query_executor.rs +++ b/influxdb3_server/src/query_executor.rs @@ -377,10 +377,7 @@ impl Database { async fn query_table(&self, table_name: &str) -> Option> { let table_name = table_name.into(); - let table_id = self - .write_buffer - .catalog() - .table_name_to_id(self.db_schema.id, Arc::clone(&table_name))?; + let table_id = self.db_schema.table_name_to_id(Arc::clone(&table_name))?; self.db_schema.get_table_schema(table_id).map(|schema| { Arc::new(QueryTable { db_schema: Arc::clone(&self.db_schema), @@ -515,10 +512,7 @@ impl SchemaProvider for Database { } fn table_exist(&self, name: &str) -> bool { - self.write_buffer - .catalog() - .table_name_to_id(self.db_schema.id, name.into()) - .is_some() + self.db_schema.table_name_to_id(name.into()).is_some() } } diff --git a/influxdb3_server/src/system_tables/parquet_files.rs b/influxdb3_server/src/system_tables/parquet_files.rs index 3283e34d0b3..6652ac3ad9e 100644 --- a/influxdb3_server/src/system_tables/parquet_files.rs +++ b/influxdb3_server/src/system_tables/parquet_files.rs @@ -95,7 +95,9 @@ impl IoxSystemTable for ParquetFilesTable { self.db_id, self.buffer .catalog() - .table_name_to_id(self.db_id, table_name.as_str().into()) + .db_schema(&self.db_id) + .expect("db exists") + .table_name_to_id(table_name.as_str().into()) .expect("table exists"), ); diff --git a/influxdb3_write/src/last_cache/mod.rs b/influxdb3_write/src/last_cache/mod.rs index 3b69c7187e4..728c00fe4d5 100644 --- a/influxdb3_write/src/last_cache/mod.rs +++ b/influxdb3_write/src/last_cache/mod.rs @@ -136,12 +136,7 @@ impl LastCacheProvider { .create_cache(CreateCacheArguments { db_id: db_schema.id, db_name: db_schema.name.to_string(), - table_id: catalog - .table_name_to_id( - db_schema.id, - Arc::clone(&table_def.table_name) - ) - .expect("table exists"), + table_id: table_def.table_id, table_name: table_def.table_name.to_string(), schema: table_def.schema.clone(), cache_name: Some(cache_name.to_owned()), @@ -205,7 +200,9 @@ impl LastCacheProvider { lc.to_definition( *table_id, self.catalog - .table_id_to_name(db, *table_id) + .db_schema(&db) + .expect("db exists") + .table_id_to_name(*table_id) .expect("table exists") .to_string(), lc_name, @@ -1609,6 +1606,7 @@ mod tests { }; use ::object_store::{memory::InMemory, ObjectStore}; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; + use bimap::BiHashMap; use data_types::NamespaceName; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition}; use influxdb3_id::{DbId, TableId}; @@ -3126,6 +3124,12 @@ mod tests { id: DbId::from(0), name: db_name.into(), tables: BTreeMap::new(), + table_map: { + let mut map = BiHashMap::new(); + map.insert(TableId::from(0), "test_table_1".into()); + map.insert(TableId::from(1), "test_table_2".into()); + map + }, }; let table_id = TableId::from(0); use schema::InfluxColumnType::*; diff --git a/influxdb3_write/src/last_cache/table_function.rs b/influxdb3_write/src/last_cache/table_function.rs index 08db8f2a458..68aa92d2115 100644 --- a/influxdb3_write/src/last_cache/table_function.rs +++ b/influxdb3_write/src/last_cache/table_function.rs @@ -100,7 +100,9 @@ impl TableFunctionImpl for LastCacheFunction { let table_id = self .provider .catalog - .table_name_to_id(self.db_id, table_name.as_str().into()) + 
.db_schema(&self.db_id) + .expect("db exists") + .table_name_to_id(table_name.as_str().into()) .expect("table exists"); match self.provider.get_cache_name_and_schema( diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index ea53b055504..fd824ff8d05 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -308,9 +308,8 @@ impl WriteBufferImpl { .db_schema(&db_id) .expect("Already checked db exists"); - let table_id = self - .catalog - .table_name_to_id(db_id, table_name.into()) + let table_id = db_schema + .table_name_to_id(table_name.into()) .ok_or_else(|| { DataFusionError::Execution(format!( "table {} not found in db {}", @@ -484,10 +483,10 @@ impl LastCacheManager for WriteBufferImpl { if let Some(info) = self.last_cache.create_cache(CreateCacheArguments { db_id, - db_name: catalog.db_id_to_name(db_id).expect("db exists").to_string(), + db_name: db_schema.name.to_string(), table_id, - table_name: catalog - .table_id_to_name(db_id, table_id) + table_name: db_schema + .table_id_to_name(table_id) .expect("table exists") .to_string(), schema, @@ -519,6 +518,7 @@ impl LastCacheManager for WriteBufferImpl { cache_name: &str, ) -> crate::Result<(), self::Error> { let catalog = self.catalog(); + let db_schema = catalog.db_schema(&db_id).expect("db should exist"); self.last_cache.delete_cache(db_id, tbl_id, cache_name)?; catalog.delete_last_cache(db_id, tbl_id, cache_name); @@ -528,11 +528,11 @@ impl LastCacheManager for WriteBufferImpl { .write_ops(vec![WalOp::Catalog(CatalogBatch { time_ns: self.time_provider.now().timestamp_nanos(), database_id: db_id, - database_name: catalog.db_id_to_name(db_id).expect("database exists"), + database_name: Arc::clone(&db_schema.name), ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete { table_id: tbl_id, - table_name: catalog - .table_id_to_name(db_id, tbl_id) + table_name: db_schema + .table_id_to_name(tbl_id) .expect("table exists") .to_string(), name: cache_name.into(), diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 82f93bb8844..7643ea67192 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -80,9 +80,8 @@ impl QueryableBuffer { _projection: Option<&Vec>, _ctx: &dyn Session, ) -> Result>, DataFusionError> { - let table_id = self - .catalog - .table_name_to_id(db_schema.id, table_name.into()) + let table_id = db_schema + .table_name_to_id(table_name.into()) .ok_or_else(|| DataFusionError::Execution(format!("table {} not found", table_name)))?; let table = db_schema .tables @@ -154,14 +153,13 @@ impl QueryableBuffer { let mut persisting_chunks = vec![]; let catalog = Arc::clone(&buffer.catalog); for (database_id, table_map) in buffer.db_to_table.iter_mut() { + let db_schema = catalog.db_schema(database_id).expect("db exists"); for (table_id, table_buffer) in table_map.iter_mut() { let snapshot_chunks = table_buffer.snapshot(snapshot_details.end_time_marker); for chunk in snapshot_chunks { - let table_name = catalog - .table_id_to_name(*database_id, *table_id) - .expect("table exists"); - let db_name = catalog.db_id_to_name(*database_id).expect("db_exists"); + let table_name = + db_schema.table_id_to_name(*table_id).expect("table exists"); let persist_job = PersistJob { database_id: *database_id, table_id: *table_id, @@ -169,7 +167,7 @@ impl QueryableBuffer { chunk_time: chunk.chunk_time, path: ParquetFilePath::new( 
self.persister.host_identifier_prefix(), - db_name.as_ref(), + db_schema.name.as_ref(), database_id.as_u32(), table_name.as_ref(), table_id.as_u32(), diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap index 714d7dd0c75..8dab63c07c7 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap @@ -68,12 +68,5 @@ expression: catalog_json ], "host_id": "test_host", "instance_id": "[uuid]", - "sequence": 3, - "table_map": [ - { - "db_id": 0, - "name": "table", - "table_id": 0 - } - ] + "sequence": 3 } diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap index 707318c64e4..37ada61adcb 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap @@ -63,12 +63,5 @@ expression: catalog_json ], "host_id": "test_host", "instance_id": "[uuid]", - "sequence": 2, - "table_map": [ - { - "db_id": 0, - "name": "table", - "table_id": 0 - } - ] + "sequence": 2 } diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap index d7aa539bb62..acd534e8945 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap @@ -55,12 +55,5 @@ expression: catalog_json ], "host_id": "test_host", "instance_id": "[uuid]", - "sequence": 4, - "table_map": [ - { - "db_id": 0, - "name": "table", - "table_id": 0 - } - ] + "sequence": 4 } diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs index 0697d82efa0..2f99832f9a5 100644 --- a/influxdb3_write/src/write_buffer/validator.rs +++ b/influxdb3_write/src/write_buffer/validator.rs @@ -1,7 +1,6 @@ use std::{borrow::Cow, sync::Arc}; use crate::{write_buffer::Result, Precision, WriteLineError}; -use bimap::BiHashMap; use data_types::{NamespaceName, Timestamp}; use hashbrown::HashMap; use influxdb3_catalog::catalog::{ @@ -89,15 +88,8 @@ impl WriteValidator { line_number: line_idx + 1, error_message: e.to_string(), }) - .and_then(|l| { - validate_v3_line( - &self.state.catalog, - &mut schema, - line_idx, - l, - lp_lines.peek().unwrap(), - ) - }) { + .and_then(|l| validate_v3_line(&mut schema, line_idx, l, lp_lines.peek().unwrap())) + { Ok(line) => line, Err(e) => { if !accept_partial { @@ -169,7 +161,7 @@ impl WriteValidator { line_number: line_idx + 1, error_message: e.to_string(), }) - .and_then(|l| 
validate_v1_line(&self.state.catalog, &mut schema, line_idx, l)) + .and_then(|l| validate_v1_line(&mut schema, line_idx, l)) { Ok(line) => line, Err(e) => { @@ -226,7 +218,6 @@ impl WriteValidator { /// This errors if the write is being performed against a v1 table, i.e., one that does not have /// a series key. fn validate_v3_line<'a>( - catalog: &Catalog, db_schema: &mut Cow<'_, DatabaseSchema>, line_number: usize, line: v3::ParsedLine<'a>, @@ -234,8 +225,8 @@ fn validate_v3_line<'a>( ) -> Result<(v3::ParsedLine<'a>, Option), WriteLineError> { let mut catalog_op = None; let table_name = line.series.measurement.as_str(); - if let Some(table_def) = catalog - .table_name_to_id(db_schema.id, table_name.into()) + if let Some(table_def) = db_schema + .table_name_to_id(table_name.into()) .and_then(|table_id| db_schema.get_table(table_id)) { let table_id = table_def.table_id; @@ -393,26 +384,11 @@ fn validate_v3_line<'a>( }); catalog_op = Some(table_definition_op); - // We have to add the mapping here or else each line might create a new - // table and table_id before the CatalogOp is applied - catalog - .inner() - .write() - .table_map - .entry(db_schema.id) - .and_modify(|map| { - map.insert(table_id, Arc::clone(&table_name)); - }) - .or_insert_with(|| { - let mut map = BiHashMap::new(); - map.insert(table_id, Arc::clone(&table_name)); - map - }); - assert!( db_schema.to_mut().tables.insert(table_id, table).is_none(), "attempted to overwrite existing table" - ) + ); + db_schema.to_mut().table_map.insert(table_id, table_name); } Ok((line, catalog_op)) @@ -426,15 +402,14 @@ fn validate_v3_line<'a>( /// An error will also be produced if the write, which is for the v1 data model, is targetting /// a v3 table. fn validate_v1_line<'a>( - catalog: &Catalog, db_schema: &mut Cow<'_, DatabaseSchema>, line_number: usize, line: ParsedLine<'a>, ) -> Result<(ParsedLine<'a>, Option), WriteLineError> { let mut catalog_op = None; let table_name = line.series.measurement.as_str(); - if let Some(table_def) = catalog - .table_name_to_id(db_schema.id, table_name.into()) + if let Some(table_def) = db_schema + .table_name_to_id(table_name.into()) .and_then(|table_id| db_schema.get_table(table_id)) { if table_def.is_v3() { @@ -549,22 +524,6 @@ fn validate_v1_line<'a>( key: None, })); - // We have to add the mapping here or else each line might create a new - // table and table_id before the CatalogOp is applied - catalog - .inner() - .write() - .table_map - .entry(db_schema.id) - .and_modify(|map| { - map.insert(table_id, Arc::clone(&table_name)); - }) - .or_insert_with(|| { - let mut map = BiHashMap::new(); - map.insert(table_id, Arc::clone(&table_name)); - map - }); - let table = TableDefinition::new( table_id, Arc::clone(&table_name), @@ -577,6 +536,7 @@ fn validate_v1_line<'a>( db_schema.to_mut().tables.insert(table_id, table).is_none(), "attempted to overwrite existing table" ); + db_schema.to_mut().table_map.insert(table_id, table_name); } Ok((line, catalog_op)) @@ -700,7 +660,9 @@ fn convert_v3_parsed_line( let chunk_time = gen1_duration.chunk_time_for_timestamp(Timestamp::new(time_value_nanos)); let table_name: Arc = line.series.measurement.to_string().into(); let table_id = catalog - .table_name_to_id(db_id, Arc::clone(&table_name)) + .db_schema(&db_id) + .expect("db should exist by this point") + .table_name_to_id(Arc::clone(&table_name)) .expect("table should exist by this point"); let table_chunks = table_chunk_map.entry(table_id).or_default(); table_chunks.push_row( @@ -817,7 +779,9 @@ fn 
convert_v1_parsed_line( let table_name: Arc = line.series.measurement.to_string().into(); let table_id = catalog - .table_name_to_id(db_id, Arc::clone(&table_name)) + .db_schema(&db_id) + .expect("the database should exist by this point") + .table_name_to_id(Arc::clone(&table_name)) .expect("table should exist by this point"); let table_chunks = table_chunk_map.entry(table_id).or_default(); table_chunks.push_row(
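
Taken together, these hunks move the table id/name mapping out of the catalog-wide `table_map` and into each `DatabaseSchema` as a per-database `BiHashMap<TableId, Arc<str>>`, exposed through the new `table_name_to_id` and `table_id_to_name` methods; callers now fetch the owning schema via `Catalog::db_schema` first. Below is a minimal sketch of that lookup path, assuming the signatures shown in the hunks above (`Catalog::db_schema(&DbId) -> Option<Arc<DatabaseSchema>>`, `DatabaseSchema::table_name_to_id(Arc<str>) -> Option<TableId>`, `DatabaseSchema::table_id_to_name(TableId) -> Option<Arc<str>>`); the helper functions are illustrative only and not part of the patch.

    use std::sync::Arc;

    use influxdb3_catalog::catalog::Catalog;
    use influxdb3_id::{DbId, TableId};

    // Resolve a table name to its id: fetch the owning DatabaseSchema first,
    // then use the per-database map, mirroring the updated call sites in
    // http.rs and parquet_files.rs.
    fn resolve_table_id(catalog: &Catalog, db_id: DbId, table_name: &str) -> Option<TableId> {
        // Catalog::db_schema returns Option<Arc<DatabaseSchema>>; the table
        // lookup is now a method on the schema itself.
        catalog.db_schema(&db_id)?.table_name_to_id(table_name.into())
    }

    // Reverse lookup: table id back to its Arc<str> name via the same map.
    fn resolve_table_name(catalog: &Catalog, db_id: DbId, table_id: TableId) -> Option<Arc<str>> {
        catalog.db_schema(&db_id)?.table_id_to_name(table_id)
    }

Scoping the map to the database also simplifies the serialized catalog: the top-level `table_map` array disappears and only `db_map` remains at the catalog level, with each database entry carrying its own table map, as the updated snapshot files show.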