Skip to content

Commit

Permalink
feat: alter database ttl (#5035)
Browse files Browse the repository at this point in the history
* feat: alter databaset ttl

* fix: make clippy happy

* feat: add unset database option

* fix: happy ci

* fix: happy clippy

* chore: fmt toml

* fix: fix header

* refactor: introduce `AlterDatabaseKind`

* chore: apply suggestions from CR

* refactor: add unset database option support

* test: add unit tests

* test: add sqlness tests

* feat: invalidate schema name value cache

* Apply suggestions from code review

* chore: fmt

* chore: update error messages

* test: add more test cases

* test: add more test cases

* Apply suggestions from code review

* chore: apply suggestions from CR

---------

Co-authored-by: WenyXu <[email protected]>
  • Loading branch information
CookiePieWw and WenyXu authored Nov 21, 2024
1 parent 3029b47 commit 5f8d849
Show file tree
Hide file tree
Showing 41 changed files with 1,320 additions and 186 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0b90ddc7eb2e99ce15d1d62c5d41f76a139c5c28" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
hex = "0.4"
humantime = "2.1"
humantime-serde = "1.1"
Expand Down
3 changes: 2 additions & 1 deletion src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,14 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
match request.expr {
Some(Expr::CreateDatabase(_)) => "ddl.create_database",
Some(Expr::CreateTable(_)) => "ddl.create_table",
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::AlterTable(_)) => "ddl.alter_table",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
Some(Expr::CreateView(_)) => "ddl.create_view",
Some(Expr::DropView(_)) => "ddl.drop_view",
Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
None => "ddl.empty",
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl InformationSchemaSchemataBuilder {
.context(TableMetadataManagerSnafu)?
// information_schema is not available from this
// table_metadata_manager and we return None
.map(|schema_opts| format!("{schema_opts}"))
.map(|schema_opts| format!("{}", schema_opts.into_inner()))
} else {
None
};
Expand Down
6 changes: 3 additions & 3 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
AlterTableExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
QueryRequest, RequestHeader,
};
use arrow_flight::Ticket;
Expand Down Expand Up @@ -211,9 +211,9 @@ impl Database {
.await
}

pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
expr: Some(DdlExpr::AlterTable(expr)),
}))
.await
}
Expand Down
18 changes: 9 additions & 9 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::as_fulltext_option;
use api::v1::{
column_def, AddColumnLocation as Location, AlterExpr, Analyzer, CreateTableExpr, DropColumns,
ModifyColumnTypes, RenameTable, SemanticType,
column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema};
Expand All @@ -36,8 +36,8 @@ use crate::error::{
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;

/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
/// Convert an [`AlterTableExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
Expand Down Expand Up @@ -203,7 +203,7 @@ mod tests {

#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
Expand Down Expand Up @@ -244,7 +244,7 @@ mod tests {

#[test]
fn test_alter_expr_with_location_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
Expand Down Expand Up @@ -321,7 +321,7 @@ mod tests {

#[test]
fn test_modify_column_type_expr() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
Expand Down Expand Up @@ -355,7 +355,7 @@ mod tests {

#[test]
fn test_drop_column_expr() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::{ClusterId, DatanodeId};

pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
Expand Down
248 changes: 248 additions & 0 deletions src/common/meta/src/ddl/alter_database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::tracing::info;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;

use super::utils::handle_retry_error;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{Result, SchemaNotFoundSnafu};
use crate::instruction::CacheIdent;
use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::rpc::ddl::UnsetDatabaseOption::{self};
use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
use crate::ClusterId;

pub struct AlterDatabaseProcedure {
pub context: DdlContext,
pub data: AlterDatabaseData,
}

fn build_new_schema_value(
mut value: SchemaNameValue,
alter_kind: &AlterDatabaseKind,
) -> Result<SchemaNameValue> {
match alter_kind {
AlterDatabaseKind::SetDatabaseOptions(options) => {
for option in options.0.iter() {
match option {
SetDatabaseOption::Ttl(ttl) => {
if ttl.is_zero() {
value.ttl = None;
} else {
value.ttl = Some(*ttl);
}
}
}
}
}
AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
for key in keys.0.iter() {
match key {
UnsetDatabaseOption::Ttl => value.ttl = None,
}
}
}
}
Ok(value)
}

impl AlterDatabaseProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";

pub fn new(
cluster_id: ClusterId,
task: AlterDatabaseTask,
context: DdlContext,
) -> Result<Self> {
Ok(Self {
context,
data: AlterDatabaseData::new(task, cluster_id)?,
})
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self { context, data })
}

pub async fn on_prepare(&mut self) -> Result<Status> {
let value = self
.context
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
.await?;

ensure!(
value.is_some(),
SchemaNotFoundSnafu {
table_schema: self.data.schema(),
}
);

self.data.schema_value = value;
self.data.state = AlterDatabaseState::UpdateMetadata;

Ok(Status::executing(true))
}

pub async fn on_update_metadata(&mut self) -> Result<Status> {
let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());

// Safety: schema_value is not None.
let current_schema_value = self.data.schema_value.as_ref().unwrap();

let new_schema_value = build_new_schema_value(
current_schema_value.get_inner_ref().clone(),
&self.data.kind,
)?;

self.context
.table_metadata_manager
.schema_manager()
.update(schema_name, current_schema_value, &new_schema_value)
.await?;

info!("Updated database metadata for schema {schema_name}");
self.data.state = AlterDatabaseState::InvalidateSchemaCache;
Ok(Status::executing(true))
}

pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
let cache_invalidator = &self.context.cache_invalidator;
cache_invalidator
.invalidate(
&Context::default(),
&[CacheIdent::SchemaName(SchemaName {
catalog_name: self.data.catalog().to_string(),
schema_name: self.data.schema().to_string(),
})],
)
.await?;

Ok(Status::done())
}
}

#[async_trait]
impl Procedure for AlterDatabaseProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
AlterDatabaseState::Prepare => self.on_prepare().await,
AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
}
.map_err(handle_retry_error)
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let catalog = self.data.catalog();
let schema = self.data.schema();

let lock_key = vec![
CatalogLock::Read(catalog).into(),
SchemaLock::write(catalog, schema).into(),
];

LockKey::new(lock_key)
}
}

#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterDatabaseState {
Prepare,
UpdateMetadata,
InvalidateSchemaCache,
}

/// The data of alter database procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterDatabaseData {
cluster_id: ClusterId,
state: AlterDatabaseState,
kind: AlterDatabaseKind,
catalog_name: String,
schema_name: String,
schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
}

impl AlterDatabaseData {
pub fn new(task: AlterDatabaseTask, cluster_id: ClusterId) -> Result<Self> {
Ok(Self {
cluster_id,
state: AlterDatabaseState::Prepare,
kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
catalog_name: task.alter_expr.catalog_name,
schema_name: task.alter_expr.schema_name,
schema_value: None,
})
}

pub fn catalog(&self) -> &str {
&self.catalog_name
}

pub fn schema(&self) -> &str {
&self.schema_name
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use crate::ddl::alter_database::build_new_schema_value;
use crate::key::schema_name::SchemaNameValue;
use crate::rpc::ddl::{
AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
UnsetDatabaseOptions,
};

#[test]
fn test_build_new_schema_value() {
let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
SetDatabaseOption::Ttl(Duration::from_secs(10)),
]));
let current_schema_value = SchemaNameValue::default();
let new_schema_value =
build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));

let unset_ttl_alter_kind =
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
UnsetDatabaseOption::Ttl,
]));
let new_schema_value =
build_new_schema_value(current_schema_value, &unset_ttl_alter_kind).unwrap();
assert_eq!(new_schema_value.ttl, None);
}
}
Loading

0 comments on commit 5f8d849

Please sign in to comment.