Skip to content

Commit

Permalink
feat: move Table Id/Name mapping into DB Schema (#25436)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgattozzi authored Oct 8, 2024
1 parent bd20f80 commit c4534b0
Show file tree
Hide file tree
Showing 15 changed files with 116 additions and 212 deletions.
148 changes: 54 additions & 94 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,6 @@ impl Catalog {
Ok(db)
}

pub fn add_table_to_lookup(&self, db_id: DbId, table_id: TableId, name: Arc<str>) {
self.inner
.write()
.table_map
.entry(db_id)
.or_default()
.insert(table_id, name);
}

pub fn db_name_to_id(&self, db_name: Arc<str>) -> Option<DbId> {
self.inner.read().db_map.get_by_right(&db_name).copied()
}
Expand All @@ -184,23 +175,6 @@ impl Catalog {
self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone)
}

pub fn table_name_to_id(&self, db_id: DbId, table_name: Arc<str>) -> Option<TableId> {
self.inner
.read()
.table_map
.get(&db_id)
.and_then(|map| map.get_by_right(&table_name).copied())
}

pub fn table_id_to_name(&self, db_id: DbId, table_id: TableId) -> Option<Arc<str>> {
self.inner
.read()
.table_map
.get(&db_id)
.and_then(|map| map.get_by_left(&table_id))
.map(Arc::clone)
}

pub fn db_schema(&self, id: &DbId) -> Option<Arc<DatabaseSchema>> {
self.inner.read().databases.get(id).cloned()
}
Expand Down Expand Up @@ -267,19 +241,6 @@ impl Catalog {

pub fn insert_database(&mut self, db: DatabaseSchema) {
let mut inner = self.inner.write();
for (table_id, table_def) in db.tables.iter() {
inner
.table_map
.entry(db.id)
.and_modify(|map: &mut BiHashMap<TableId, Arc<str>>| {
map.insert(*table_id, Arc::clone(&table_def.table_name));
})
.or_insert_with(|| {
let mut map = BiHashMap::new();
map.insert(*table_id, Arc::clone(&table_def.table_name));
map
});
}
inner.db_map.insert(db.id, Arc::clone(&db.name));
inner.databases.insert(db.id, Arc::new(db));
inner.sequence = inner.sequence.next();
Expand Down Expand Up @@ -321,8 +282,6 @@ pub struct InnerCatalog {
updated: bool,
#[serde_as(as = "DbMapAsArray")]
db_map: BiHashMap<DbId, Arc<str>>,
#[serde_as(as = "TableMapAsArray")]
pub table_map: HashMap<DbId, BiHashMap<TableId, Arc<str>>>,
}

serde_with::serde_conv!(
Expand Down Expand Up @@ -351,45 +310,33 @@ struct DbMap {
name: Arc<str>,
}

#[derive(Debug, Serialize, Deserialize)]
struct TableMap {
table_id: TableId,
name: Arc<str>,
}

serde_with::serde_conv!(
TableMapAsArray,
HashMap<DbId, BiHashMap<TableId, Arc<str>>>,
|map: &HashMap<DbId, BiHashMap<TableId, Arc<str>>>| {
map.iter().fold(Vec::new(), |mut acc, (db_id, table_map)| {
for (table_id, name) in table_map.iter() {
acc.push(TableMap {
db_id: *db_id,
table_id: *table_id,
name: Arc::clone(&name)
});
}
BiHashMap<TableId, Arc<str>>,
|map: &BiHashMap<TableId, Arc<str>>| {
map.iter().fold(Vec::new(), |mut acc, (table_id, name)| {
acc.push(TableMap {
table_id: *table_id,
name: Arc::clone(&name)
});
acc
})
},
|vec: Vec<TableMap>| -> Result<_, std::convert::Infallible> {
let mut map = HashMap::new();
let mut map = BiHashMap::new();
for item in vec {
map.entry(item.db_id)
.and_modify(|entry: &mut BiHashMap<TableId, Arc<str>>| {
entry.insert(item.table_id, Arc::clone(&item.name));
})
.or_insert_with(||{
let mut inner_map = BiHashMap::new();
inner_map.insert(item.table_id, Arc::clone(&item.name));
inner_map
});
map.insert(item.table_id, item.name);
}
Ok(map)
}
);

#[derive(Debug, Serialize, Deserialize)]
struct TableMap {
db_id: DbId,
table_id: TableId,
name: Arc<str>,
}

serde_with::serde_conv!(
DatabasesAsArray,
HashMap<DbId, Arc<DatabaseSchema>>,
Expand All @@ -406,17 +353,20 @@ serde_with::serde_conv!(
|vec: Vec<DatabasesSerialized>| -> Result<_, String> {
vec.into_iter().fold(Ok(HashMap::new()), |acc, db| {
let mut acc = acc?;
let mut table_map = BiHashMap::new();
if let Some(_) = acc.insert(db.id, Arc::new(DatabaseSchema {
id: db.id,
name: Arc::clone(&db.name),
tables: db.tables.into_iter().fold(Ok(BTreeMap::new()), |acc, table| {
let mut acc = acc?;
let table_name = Arc::clone(&table.table_name);
table_map.insert(table.table_id, Arc::clone(&table_name));
if let Some(_) = acc.insert(table.table_id, table) {
return Err(format!("found duplicate table: {}", table_name));
}
Ok(acc)
})?
})?,
table_map
})) {
return Err(format!("found duplicate db: {}", db.name));
}
Expand All @@ -441,7 +391,6 @@ impl InnerCatalog {
instance_id,
updated: false,
db_map: BiHashMap::new(),
table_map: HashMap::new(),
}
}

Expand Down Expand Up @@ -471,18 +420,6 @@ impl InnerCatalog {
self.sequence = self.sequence.next();
self.updated = true;
self.db_map.insert(new_db.id, Arc::clone(&new_db.name));
for (table_id, table_def) in new_db.tables.iter() {
self.table_map
.entry(new_db.id)
.and_modify(|map| {
map.insert(*table_id, Arc::clone(&table_def.table_name));
})
.or_insert_with(|| {
let mut map = BiHashMap::new();
map.insert(*table_id, Arc::clone(&table_def.table_name));
map
});
}
}
} else {
if self.databases.len() >= Catalog::NUM_DBS_LIMIT {
Expand All @@ -499,18 +436,6 @@ impl InnerCatalog {
self.sequence = self.sequence.next();
self.updated = true;
self.db_map.insert(new_db.id, Arc::clone(&new_db.name));
for (table_id, table_def) in new_db.tables.iter() {
self.table_map
.entry(new_db.id)
.and_modify(|map| {
map.insert(*table_id, Arc::clone(&table_def.table_name));
})
.or_insert_with(|| {
let mut map = BiHashMap::new();
map.insert(*table_id, Arc::clone(&table_def.table_name));
map
});
}
}

Ok(())
Expand All @@ -532,6 +457,8 @@ pub struct DatabaseSchema {
pub name: Arc<str>,
/// The database is a map of tables
pub tables: BTreeMap<TableId, TableDefinition>,
#[serde_as(as = "TableMapAsArray")]
pub table_map: BiHashMap<TableId, Arc<str>>,
}

impl DatabaseSchema {
Expand All @@ -540,6 +467,7 @@ impl DatabaseSchema {
id,
name,
tables: BTreeMap::new(),
table_map: BiHashMap::new(),
}
}

Expand Down Expand Up @@ -636,10 +564,17 @@ impl DatabaseSchema {
}
}

// With the final list of updated/new tables update the current mapping
let new_table_maps = updated_or_new_tables
.iter()
.map(|(table_id, table_def)| (*table_id, Arc::clone(&table_def.table_name)))
.collect();

Ok(Some(Self {
id: self.id,
name: Arc::clone(&self.name),
tables: updated_or_new_tables,
table_map: new_table_maps,
}))
}
}
Expand Down Expand Up @@ -681,6 +616,14 @@ impl DatabaseSchema {
pub fn tables(&self) -> impl Iterator<Item = &TableDefinition> {
self.tables.values()
}

pub fn table_name_to_id(&self, table_name: Arc<str>) -> Option<TableId> {
self.table_map.get_by_right(&table_name).copied()
}

pub fn table_id_to_name(&self, table_id: TableId) -> Option<Arc<str>> {
self.table_map.get_by_left(&table_id).map(Arc::clone)
}
}

#[derive(Debug, Eq, PartialEq, Clone)]
Expand Down Expand Up @@ -960,6 +903,12 @@ mod tests {
id: DbId::from(0),
name: "test_db".into(),
tables: BTreeMap::new(),
table_map: {
let mut map = BiHashMap::new();
map.insert(TableId::from(1), "test_table_1".into());
map.insert(TableId::from(2), "test_table_2".into());
map
},
};
use InfluxColumnType::*;
use InfluxFieldType::*;
Expand Down Expand Up @@ -1106,6 +1055,7 @@ mod tests {
id: DbId::from(0),
name: "test".into(),
tables: BTreeMap::new(),
table_map: BiHashMap::new(),
};
database.tables.insert(
TableId::from(0),
Expand Down Expand Up @@ -1142,6 +1092,11 @@ mod tests {
id: DbId::from(0),
name: "test_db".into(),
tables: BTreeMap::new(),
table_map: {
let mut map = BiHashMap::new();
map.insert(TableId::from(1), "test_table_1".into());
map
},
};
use InfluxColumnType::*;
use InfluxFieldType::*;
Expand Down Expand Up @@ -1188,6 +1143,11 @@ mod tests {
id: DbId::from(0),
name: "test_db".into(),
tables: BTreeMap::new(),
table_map: {
let mut map = BiHashMap::new();
map.insert(TableId::from(0), "test".into());
map
},
};
use InfluxColumnType::*;
use InfluxFieldType::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,5 @@ expression: catalog
"sequence": 0,
"host_id": "dummy-host-id",
"instance_id": "instance-id",
"db_map": [],
"table_map": []
"db_map": []
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,5 @@ expression: catalog
"sequence": 0,
"host_id": "dummy-host-id",
"instance_id": "instance-id",
"db_map": [],
"table_map": []
"db_map": []
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,5 @@ expression: catalog
"sequence": 0,
"host_id": "dummy-host-id",
"instance_id": "instance-id",
"db_map": [],
"table_map": []
"db_map": []
}
8 changes: 6 additions & 2 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,9 @@ where
let table_id = self
.write_buffer
.catalog()
.table_name_to_id(db_id, table.as_str().into())
.db_schema(&db_id)
.expect("db should exist")
.table_name_to_id(table.as_str().into())
.ok_or_else(|| WriteBufferError::TableDoesNotExist)?;
match self
.write_buffer
Expand Down Expand Up @@ -748,7 +750,9 @@ where
let table_id = self
.write_buffer
.catalog()
.table_name_to_id(db_id, table.into())
.db_schema(&db_id)
.expect("db should exist")
.table_name_to_id(table.into())
.ok_or_else(|| WriteBufferError::TableDoesNotExist)?;
self.write_buffer
.delete_last_cache(db_id, table_id, &name)
Expand Down
10 changes: 2 additions & 8 deletions influxdb3_server/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,7 @@ impl Database {

async fn query_table(&self, table_name: &str) -> Option<Arc<QueryTable>> {
let table_name = table_name.into();
let table_id = self
.write_buffer
.catalog()
.table_name_to_id(self.db_schema.id, Arc::clone(&table_name))?;
let table_id = self.db_schema.table_name_to_id(Arc::clone(&table_name))?;
self.db_schema.get_table_schema(table_id).map(|schema| {
Arc::new(QueryTable {
db_schema: Arc::clone(&self.db_schema),
Expand Down Expand Up @@ -515,10 +512,7 @@ impl SchemaProvider for Database {
}

fn table_exist(&self, name: &str) -> bool {
self.write_buffer
.catalog()
.table_name_to_id(self.db_schema.id, name.into())
.is_some()
self.db_schema.table_name_to_id(name.into()).is_some()
}
}

Expand Down
4 changes: 3 additions & 1 deletion influxdb3_server/src/system_tables/parquet_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ impl IoxSystemTable for ParquetFilesTable {
self.db_id,
self.buffer
.catalog()
.table_name_to_id(self.db_id, table_name.as_str().into())
.db_schema(&self.db_id)
.expect("db exists")
.table_name_to_id(table_name.as_str().into())
.expect("table exists"),
);

Expand Down
Loading

0 comments on commit c4534b0

Please sign in to comment.