Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support vacuum inverted index #17291

Merged
merged 5 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use databend_common_meta_app::schema::GcDroppedTableReq;
use databend_common_meta_app::schema::GetDatabaseReq;
use databend_common_meta_app::schema::GetIndexReply;
use databend_common_meta_app::schema::GetMarkedDeletedIndexesReply;
use databend_common_meta_app::schema::GetMarkedDeletedTableIndexesReply;
use databend_common_meta_app::schema::GetTableCopiedFileReply;
use databend_common_meta_app::schema::GetTableCopiedFileReq;
use databend_common_meta_app::schema::GetTableReq;
Expand Down Expand Up @@ -165,12 +166,18 @@ pub trait SchemaApi: Send + Sync {
name_ident: &IndexNameIdent,
) -> Result<Option<GetIndexReply>, MetaError>;

async fn get_marked_deleted_indexes(
/// Lists index entries that have been marked as deleted for `tenant`.
///
/// NOTE(review): the implementation is not visible in this view; presumably
/// `table_id = Some(id)` restricts the listing to that table and `None`
/// lists across all tables of the tenant — confirm against the impl.
async fn list_marked_deleted_indexes(
&self,
tenant: &Tenant,
table_id: Option<u64>,
) -> Result<GetMarkedDeletedIndexesReply, MetaError>;

/// Lists the marked-deleted table index records stored for `tenant`.
///
/// The reply groups the records by table id; each record carries the index
/// name, the index version and the stored marker metadata.
///
/// `table_id` selects the key prefix that is listed: with `Some(id)` the
/// listing is built from that table id, with `None` a shorter prefix is used.
/// NOTE(review): presumably `None` means "all tables of the tenant" — confirm.
async fn list_marked_deleted_table_indexes(
&self,
tenant: &Tenant,
table_id: Option<u64>,
) -> Result<GetMarkedDeletedTableIndexesReply, MetaError>;

async fn update_index(
&self,
id_ident: IndexIdIdent,
Expand Down Expand Up @@ -402,4 +409,11 @@ pub trait SchemaApi: Send + Sync {
table_id: u64,
index_ids: &[u64],
) -> Result<(), MetaTxnError>;

/// Deletes the marked-deleted table index records of `table_id` for `tenant`.
///
/// Each element of `indexes` is an `(index_name, index_version)` pair that
/// identifies one record to delete. All deletions are submitted together in
/// a single transaction.
async fn remove_marked_deleted_table_indexes(
&self,
tenant: &Tenant,
table_id: u64,
indexes: &[(String, String)],
) -> Result<(), MetaTxnError>;
}
126 changes: 123 additions & 3 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ use databend_common_meta_app::schema::index_name_ident::IndexName;
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
use databend_common_meta_app::schema::marked_deleted_index_id::MarkedDeletedIndexId;
use databend_common_meta_app::schema::marked_deleted_index_ident::MarkedDeletedIndexIdIdent;
use databend_common_meta_app::schema::marked_deleted_table_index_id::MarkedDeletedTableIndexId;
use databend_common_meta_app::schema::marked_deleted_table_index_ident::MarkedDeletedTableIndexIdIdent;
use databend_common_meta_app::schema::table_niv::TableNIV;
use databend_common_meta_app::schema::CatalogIdToNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
Expand Down Expand Up @@ -113,6 +115,7 @@ use databend_common_meta_app::schema::GcDroppedTableReq;
use databend_common_meta_app::schema::GetDatabaseReq;
use databend_common_meta_app::schema::GetIndexReply;
use databend_common_meta_app::schema::GetMarkedDeletedIndexesReply;
use databend_common_meta_app::schema::GetMarkedDeletedTableIndexesReply;
use databend_common_meta_app::schema::GetTableCopiedFileReply;
use databend_common_meta_app::schema::GetTableCopiedFileReq;
use databend_common_meta_app::schema::GetTableReq;
Expand Down Expand Up @@ -847,7 +850,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn get_marked_deleted_indexes(
async fn list_marked_deleted_indexes(
&self,
tenant: &Tenant,
table_id: Option<u64>,
Expand Down Expand Up @@ -2538,9 +2541,18 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// If the column ids and options do not change,
// use the old index version, otherwise create a new index version.
let mut old_version = None;
let mut mark_delete_op = None;
if let Some(old_index) = indexes.get(&req.name) {
if old_index.column_ids == req.column_ids && old_index.options == req.options {
old_version = Some(old_index.version.clone());
} else {
let (m_key, m_value) = mark_table_index_as_deleted(
&req.tenant,
req.table_id,
&req.name,
&old_index.version,
)?;
mark_delete_op = Some(TxnOp::put(m_key, m_value));
}
}
let version = old_version.unwrap_or(Uuid::new_v4().simple().to_string());
Expand All @@ -2554,14 +2566,18 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
};
indexes.insert(req.name.clone(), index);

let txn_req = TxnRequest::new(
let mut txn_req = TxnRequest::new(
//
vec![txn_cond_eq_seq(&tbid, tb_meta_seq)],
vec![
txn_op_put_pb(&tbid, &table_meta, None)?, // tb_id -> tb_meta
],
);

if let Some(mark_delete_op) = mark_delete_op {
txn_req.if_then.push(mark_delete_op);
}

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
Expand Down Expand Up @@ -2607,7 +2623,12 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
UnknownError::<IndexName>::new(req.name.clone(), "drop table index"),
)));
}
indexes.remove(&req.name);
let Some(index) = indexes.remove(&req.name) else {
return Ok(());
};

let (m_key, m_value) =
mark_table_index_as_deleted(&req.tenant, req.table_id, &req.name, &index.version)?;

let txn_req = TxnRequest::new(
vec![
Expand All @@ -2616,6 +2637,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
],
vec![
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
TxnOp::put(m_key, m_value),
],
);

Expand All @@ -2628,6 +2650,48 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

/// Lists marked-deleted table index records for `tenant`, grouped by table id.
///
/// A placeholder key is built only to derive the listing prefix: the
/// `"dummy"` name/version segments (and, when no table is given, the `0`
/// table id) are trimmed off by the directory level.
/// NOTE(review): assumes `DirName::new_with_level(_, n)` drops the last `n`
/// key segments — confirm against the kvapi definition.
///
/// Each listed entry contributes `(index_name, index_version, meta)` to the
/// vector stored under its table id in the reply.
#[logcall::logcall]
#[fastrace::trace]
async fn list_marked_deleted_table_indexes(
    &self,
    tenant: &Tenant,
    table_id: Option<u64>,
) -> Result<GetMarkedDeletedTableIndexesReply, MetaError> {
    // Both cases build the same placeholder key; only the table id and the
    // number of trailing segments to strip differ.
    let (prefix_table_id, level) = match table_id {
        Some(id) => (id, 2),
        None => (0, 3),
    };
    let placeholder = MarkedDeletedTableIndexId::new(
        prefix_table_id,
        "dummy".to_string(),
        "dummy".to_string(),
    );
    let dir = DirName::new_with_level(
        MarkedDeletedTableIndexIdIdent::new_generic(tenant, placeholder),
        level,
    );

    let mut table_indexes = HashMap::new();
    for (key, seq_value) in self.list_pb_vec(&dir).await? {
        let id = key.name();
        let record = (
            id.index_name.clone(),
            id.index_version.clone(),
            seq_value.data,
        );
        table_indexes
            .entry(id.table_id)
            .or_insert_with(Vec::new)
            .push(record);
    }

    Ok(GetMarkedDeletedTableIndexesReply { table_indexes })
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_drop_table_infos(
Expand Down Expand Up @@ -3175,6 +3239,40 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}
}

/// Deletes the marked-deleted records for the given `(index_name,
/// index_version)` pairs of `table_id`.
///
/// One delete operation per pair is placed into a single transaction with no
/// conditions. The transaction is re-submitted under the `txn_backoff`
/// schedule until it reports success.
#[logcall::logcall]
#[fastrace::trace]
async fn remove_marked_deleted_table_indexes(
    &self,
    tenant: &Tenant,
    table_id: u64,
    indexes: &[(String, String)],
) -> Result<(), MetaTxnError> {
    let mut trials = txn_backoff(None, func_name!());

    loop {
        trials.next().unwrap()?.await;

        // The request is rebuilt for every attempt because `send_txn`
        // consumes it.
        let mut txn = TxnRequest::default();
        txn.if_then.extend(indexes.iter().map(|(name, version)| {
            let key = MarkedDeletedTableIndexIdIdent::new_generic(
                tenant,
                MarkedDeletedTableIndexId::new(table_id, name.to_string(), version.to_string()),
            );
            txn_op_del(&key)
        }));

        let (succ, _responses) = send_txn(self, txn).await?;
        if succ {
            return Ok(());
        }
    }
}
}

async fn get_history_table_metas(
Expand Down Expand Up @@ -4208,3 +4306,25 @@ pub fn mark_index_as_deleted(
serialize_struct(&marked_deleted_index_meta)?,
))
}

/// Builds the key/value pair that records a table index version as deleted.
///
/// Key layout:
/// `__fd_marked_deleted_table_index/<table_id>/<index_name>/<index_version>`.
/// The value is a serialized `MarkedDeletedIndexMeta` stamped with the
/// current UTC time and the `INVERTED` index type.
///
/// Nothing is written here; the caller puts the returned pair into a
/// transaction.
pub fn mark_table_index_as_deleted(
    tenant: &Tenant,
    table_id: u64,
    index_name: &str,
    index_version: &str,
) -> Result<(String, Vec<u8>), MetaError> {
    let id = MarkedDeletedTableIndexId::new(
        table_id,
        index_name.to_owned(),
        index_version.to_owned(),
    );
    let key = MarkedDeletedTableIndexIdIdent::new_generic(tenant, id).to_string_key();

    let meta = MarkedDeletedIndexMeta {
        dropped_on: Utc::now(),
        index_type: MarkedDeletedIndexType::INVERTED,
    };
    let value = serialize_struct(&meta)?;

    Ok((key, value))
}
Loading
Loading