Skip to content

Commit

Permalink
feat(meta): support time travel query on a per-table basis
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Sep 9, 2024
1 parent becb896 commit 28f9770
Show file tree
Hide file tree
Showing 13 changed files with 304 additions and 44 deletions.
1 change: 1 addition & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,7 @@ message CancelCompactTaskResponse {

message GetVersionByEpochRequest {
uint64 epoch = 1;
uint32 table_id = 2;
}

message GetVersionByEpochResponse {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model_v2/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod m20240702_080451_system_param_value;
mod m20240702_084927_unnecessary_fk;
mod m20240726_063833_auto_schema_change;
mod m20240806_143329_add_rate_limit_to_source_catalog;
mod m20240820_081248_add_time_travel_per_table_epoch;

pub struct Migrator;

Expand All @@ -45,6 +46,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240702_084927_unnecessary_fk::Migration),
Box::new(m20240726_063833_auto_schema_change::Migration),
Box::new(m20240806_143329_add_rate_limit_to_source_catalog::Migration),
Box::new(m20240820_081248_add_time_travel_per_table_epoch::Migration),
]
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

const TABLE_NAME: &str = "hummock_epoch_to_version";

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// modify PK
match manager.get_database_backend() {
sea_orm::DatabaseBackend::MySql => {
manager
.alter_table(
Table::alter()
.table(HummockEpochToVersion::Table)
.add_column(
ColumnDef::new(HummockEpochToVersion::TableId).big_integer(),
)
.to_owned(),
)
.await?;
manager
.get_connection()
.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::MySql,
format!(
"ALTER TABLE {TABLE_NAME}
DROP PRIMARY KEY,
ADD PRIMARY KEY (epoch, table_id)"
),
))
.await?;
}
sea_orm::DatabaseBackend::Postgres => {
manager
.alter_table(
Table::alter()
.table(HummockEpochToVersion::Table)
.add_column(
ColumnDef::new(HummockEpochToVersion::TableId).big_integer(),
)
.to_owned(),
)
.await?;
manager
.get_connection()
.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
format!("ALTER TABLE {TABLE_NAME} DROP CONSTRAINT {TABLE_NAME}_pkey"),
))
.await?;
manager
.get_connection()
.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
format!("ALTER TABLE {TABLE_NAME} ADD PRIMARY KEY (epoch, table_id)"),
))
.await?;
}
sea_orm::DatabaseBackend::Sqlite => {
// sqlite is not for prod usage, so recreating the table is fine.
manager
.drop_table(
sea_orm_migration::prelude::Table::drop()
.table(HummockEpochToVersion::Table)
.if_exists()
.cascade()
.to_owned(),
)
.await?;

manager
.create_table(
Table::create()
.table(HummockEpochToVersion::Table)
.if_not_exists()
.col(
ColumnDef::new(HummockEpochToVersion::Epoch)
.big_integer()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(HummockEpochToVersion::TableId)
.big_integer()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(HummockEpochToVersion::VersionId)
.big_integer()
.not_null(),
)
.to_owned(),
)
.await?;
}
}
Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// The downgrade for MySql and Postgres may not work due to PK confliction.
match manager.get_database_backend() {
sea_orm::DatabaseBackend::MySql => {
manager
.get_connection()
.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::MySql,
format!("ALTER TABLE {TABLE_NAME} DROP PRIMARY KEY"),
))
.await?;
manager
.alter_table(
Table::alter()
.table(HummockEpochToVersion::Table)
.drop_column(HummockEpochToVersion::TableId)
.to_owned(),
)
.await?;
manager
.get_connection()
.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::MySql,
format!("ALTER TABLE {TABLE_NAME} ADD PRIMARY KEY (epoch)"),
))
.await?;
}
sea_orm::DatabaseBackend::Postgres => {
manager
.get_connection()
.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
format!("ALTER TABLE {TABLE_NAME} DROP CONSTRAINT {TABLE_NAME}_pkey"),
))
.await?;
manager
.alter_table(
Table::alter()
.table(HummockEpochToVersion::Table)
.drop_column(HummockEpochToVersion::TableId)
.to_owned(),
)
.await?;
manager
.get_connection()
.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
format!(
"ALTER TABLE {TABLE_NAME} ADD PRIMARY KEY (epoch)"
),
))
.await?;
}
sea_orm::DatabaseBackend::Sqlite => {
manager
.drop_table(
sea_orm_migration::prelude::Table::drop()
.table(HummockEpochToVersion::Table)
.if_exists()
.cascade()
.to_owned(),
)
.await?;

manager
.create_table(
Table::create()
.table(HummockEpochToVersion::Table)
.if_not_exists()
.col(
ColumnDef::new(HummockEpochToVersion::Epoch)
.big_integer()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(HummockEpochToVersion::VersionId)
.big_integer()
.not_null(),
)
.to_owned(),
)
.await?;
}
}

Ok(())
}
}

#[derive(DeriveIden)]
enum HummockEpochToVersion {
Table,
Epoch,
TableId,
VersionId,
}
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/hummock_epoch_to_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use crate::{Epoch, HummockVersionId};
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub epoch: Epoch,
#[sea_orm(primary_key, auto_increment = false)]
pub table_id: i64,
pub version_id: HummockVersionId,
}

Expand Down
7 changes: 5 additions & 2 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,11 @@ impl HummockManagerService for HummockServiceImpl {
&self,
request: Request<GetVersionByEpochRequest>,
) -> Result<Response<GetVersionByEpochResponse>, Status> {
let GetVersionByEpochRequest { epoch } = request.into_inner();
let version = self.hummock_manager.epoch_to_version(epoch).await?;
let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
let version = self
.hummock_manager
.epoch_to_version(epoch, table_id)
.await?;
Ok(Response::new(GetVersionByEpochResponse {
version: Some(version.to_protobuf()),
}))
Expand Down
19 changes: 18 additions & 1 deletion src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl HummockManager {
is_visible_table_committed_epoch,
new_compaction_group,
commit_sstables,
new_table_ids,
&new_table_ids,
new_table_watermarks,
change_log_delta,
);
Expand Down Expand Up @@ -289,6 +289,21 @@ impl HummockManager {
.values()
.map(|g| (g.group_id, g.parent_group_id))
.collect();
let time_travel_tables_to_commit =
new_table_ids
.iter()
.chain(tables_to_commit.iter().filter_map(|table_id| {
let Some(info) = version
.latest_version()
.state_table_info
.info()
.get(table_id)
else {
tracing::warn!("state table info not found for {table_id}");
return None;
};
Some((table_id, &info.compaction_group_id))
}));
let mut txn = sql_store.conn.begin().await?;
let version_snapshot_sst_ids = self
.write_time_travel_metadata(
Expand All @@ -297,6 +312,8 @@ impl HummockManager {
time_travel_delta,
&group_parents,
&versioning.last_time_travel_snapshot_sst_ids,
time_travel_tables_to_commit,
committed_epoch,
)
.await?;
commit_multi_var_with_provided_txn!(
Expand Down
Loading

0 comments on commit 28f9770

Please sign in to comment.