Skip to content

Commit

Permalink
feat: support vacuum inverted index (#17291)
Browse files Browse the repository at this point in the history
* feat: support vacuum inverted index

* rename function

* rename func

* introducing two type alias
  • Loading branch information
SkyFan2002 authored Jan 20, 2025
1 parent d3ccfb6 commit 4db02ec
Show file tree
Hide file tree
Showing 22 changed files with 904 additions and 22 deletions.
16 changes: 15 additions & 1 deletion src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use databend_common_meta_app::schema::GcDroppedTableReq;
use databend_common_meta_app::schema::GetDatabaseReq;
use databend_common_meta_app::schema::GetIndexReply;
use databend_common_meta_app::schema::GetMarkedDeletedIndexesReply;
use databend_common_meta_app::schema::GetMarkedDeletedTableIndexesReply;
use databend_common_meta_app::schema::GetTableCopiedFileReply;
use databend_common_meta_app::schema::GetTableCopiedFileReq;
use databend_common_meta_app::schema::GetTableReq;
Expand Down Expand Up @@ -165,12 +166,18 @@ pub trait SchemaApi: Send + Sync {
name_ident: &IndexNameIdent,
) -> Result<Option<GetIndexReply>, MetaError>;

async fn get_marked_deleted_indexes(
async fn list_marked_deleted_indexes(
&self,
tenant: &Tenant,
table_id: Option<u64>,
) -> Result<GetMarkedDeletedIndexesReply, MetaError>;

async fn list_marked_deleted_table_indexes(
&self,
tenant: &Tenant,
table_id: Option<u64>,
) -> Result<GetMarkedDeletedTableIndexesReply, MetaError>;

async fn update_index(
&self,
id_ident: IndexIdIdent,
Expand Down Expand Up @@ -402,4 +409,11 @@ pub trait SchemaApi: Send + Sync {
table_id: u64,
index_ids: &[u64],
) -> Result<(), MetaTxnError>;

async fn remove_marked_deleted_table_indexes(
&self,
tenant: &Tenant,
table_id: u64,
indexes: &[(String, String)],
) -> Result<(), MetaTxnError>;
}
126 changes: 123 additions & 3 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ use databend_common_meta_app::schema::index_name_ident::IndexName;
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
use databend_common_meta_app::schema::marked_deleted_index_id::MarkedDeletedIndexId;
use databend_common_meta_app::schema::marked_deleted_index_ident::MarkedDeletedIndexIdIdent;
use databend_common_meta_app::schema::marked_deleted_table_index_id::MarkedDeletedTableIndexId;
use databend_common_meta_app::schema::marked_deleted_table_index_ident::MarkedDeletedTableIndexIdIdent;
use databend_common_meta_app::schema::table_niv::TableNIV;
use databend_common_meta_app::schema::CatalogIdToNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
Expand Down Expand Up @@ -113,6 +115,7 @@ use databend_common_meta_app::schema::GcDroppedTableReq;
use databend_common_meta_app::schema::GetDatabaseReq;
use databend_common_meta_app::schema::GetIndexReply;
use databend_common_meta_app::schema::GetMarkedDeletedIndexesReply;
use databend_common_meta_app::schema::GetMarkedDeletedTableIndexesReply;
use databend_common_meta_app::schema::GetTableCopiedFileReply;
use databend_common_meta_app::schema::GetTableCopiedFileReq;
use databend_common_meta_app::schema::GetTableReq;
Expand Down Expand Up @@ -847,7 +850,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn get_marked_deleted_indexes(
async fn list_marked_deleted_indexes(
&self,
tenant: &Tenant,
table_id: Option<u64>,
Expand Down Expand Up @@ -2538,9 +2541,18 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// If the column ids and options do not change,
// use the old index version, otherwise create a new index version.
let mut old_version = None;
let mut mark_delete_op = None;
if let Some(old_index) = indexes.get(&req.name) {
if old_index.column_ids == req.column_ids && old_index.options == req.options {
old_version = Some(old_index.version.clone());
} else {
let (m_key, m_value) = mark_table_index_as_deleted(
&req.tenant,
req.table_id,
&req.name,
&old_index.version,
)?;
mark_delete_op = Some(TxnOp::put(m_key, m_value));
}
}
let version = old_version.unwrap_or(Uuid::new_v4().simple().to_string());
Expand All @@ -2554,14 +2566,18 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
};
indexes.insert(req.name.clone(), index);

let txn_req = TxnRequest::new(
let mut txn_req = TxnRequest::new(
//
vec![txn_cond_eq_seq(&tbid, tb_meta_seq)],
vec![
txn_op_put_pb(&tbid, &table_meta, None)?, // tb_id -> tb_meta
],
);

if let Some(mark_delete_op) = mark_delete_op {
txn_req.if_then.push(mark_delete_op);
}

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
Expand Down Expand Up @@ -2607,7 +2623,12 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
UnknownError::<IndexName>::new(req.name.clone(), "drop table index"),
)));
}
indexes.remove(&req.name);
let Some(index) = indexes.remove(&req.name) else {
return Ok(());
};

let (m_key, m_value) =
mark_table_index_as_deleted(&req.tenant, req.table_id, &req.name, &index.version)?;

let txn_req = TxnRequest::new(
vec![
Expand All @@ -2616,6 +2637,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
],
vec![
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
TxnOp::put(m_key, m_value),
],
);

Expand All @@ -2628,6 +2650,48 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

#[logcall::logcall]
#[fastrace::trace]
async fn list_marked_deleted_table_indexes(
&self,
tenant: &Tenant,
table_id: Option<u64>,
) -> Result<GetMarkedDeletedTableIndexesReply, MetaError> {
let dir = match table_id {
Some(table_id) => {
let ident = MarkedDeletedTableIndexIdIdent::new_generic(
tenant,
MarkedDeletedTableIndexId::new(
table_id,
"dummy".to_string(),
"dummy".to_string(),
),
);
DirName::new_with_level(ident, 2)
}
None => {
let ident = MarkedDeletedTableIndexIdIdent::new_generic(
tenant,
MarkedDeletedTableIndexId::new(0, "dummy".to_string(), "dummy".to_string()),
);
DirName::new_with_level(ident, 3)
}
};
let list_res = self.list_pb_vec(&dir).await?;
let mut table_indexes = HashMap::new();
for (k, v) in list_res {
let table_id = k.name().table_id;
let index_name = k.name().index_name.clone();
let index_version = k.name().index_version.clone();
let index_meta = v.data;
table_indexes
.entry(table_id)
.or_insert_with(Vec::new)
.push((index_name, index_version, index_meta));
}
Ok(GetMarkedDeletedTableIndexesReply { table_indexes })
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_drop_table_infos(
Expand Down Expand Up @@ -3175,6 +3239,40 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}
}

#[logcall::logcall]
#[fastrace::trace]
async fn remove_marked_deleted_table_indexes(
&self,
tenant: &Tenant,
table_id: u64,
indexes: &[(String, String)],
) -> Result<(), MetaTxnError> {
let mut trials = txn_backoff(None, func_name!());

loop {
trials.next().unwrap()?.await;
let mut txn = TxnRequest::default();

for (index_name, index_version) in indexes {
txn.if_then
.push(txn_op_del(&MarkedDeletedTableIndexIdIdent::new_generic(
tenant,
MarkedDeletedTableIndexId::new(
table_id,
index_name.to_string(),
index_version.to_string(),
),
)));
}

let (succ, _responses) = send_txn(self, txn).await?;

if succ {
return Ok(());
}
}
}
}

async fn get_history_table_metas(
Expand Down Expand Up @@ -4208,3 +4306,25 @@ pub fn mark_index_as_deleted(
serialize_struct(&marked_deleted_index_meta)?,
))
}

/// add __fd_marked_deleted_table_index/<table_id>/<index_name>/<index_version> -> marked_deleted_table_index_meta
pub fn mark_table_index_as_deleted(
tenant: &Tenant,
table_id: u64,
index_name: &str,
index_version: &str,
) -> Result<(String, Vec<u8>), MetaError> {
let marked_deleted_table_index_id_ident = MarkedDeletedTableIndexIdIdent::new_generic(
tenant,
MarkedDeletedTableIndexId::new(table_id, index_name.to_owned(), index_version.to_owned()),
);
let marked_deleted_table_index_meta = MarkedDeletedIndexMeta {
dropped_on: Utc::now(),
index_type: MarkedDeletedIndexType::INVERTED,
};

Ok((
marked_deleted_table_index_id_ident.to_string_key(),
serialize_struct(&marked_deleted_table_index_meta)?,
))
}
Loading

0 comments on commit 4db02ec

Please sign in to comment.