diff --git a/Cargo.lock b/Cargo.lock index e481583bffea..2238b680e6bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1853,6 +1853,7 @@ dependencies = [ "arrow-flight", "async-stream", "async-trait", + "base64 0.21.3", "chrono", "common-catalog", "common-error", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 98aa010a4d65..f2ba42293209 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -12,6 +12,7 @@ api = { workspace = true } arrow-flight.workspace = true async-stream.workspace = true async-trait.workspace = true +base64 = "0.21" common-catalog = { workspace = true } common-error = { workspace = true } common-grpc-expr.workspace = true diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index aed5fa1131b3..0dfa5f507156 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -21,6 +21,8 @@ use api::v1::meta::{ SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, TruncateTableTask as PbTruncateTableTask, }; use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr}; +use base64::engine::general_purpose; +use base64::Engine as _; use prost::Message; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -287,7 +289,8 @@ impl Serialize for CreateTableTask { table_info, }; let buf = pb.encode_to_vec(); - serializer.serialize_bytes(&buf) + let encoded = general_purpose::STANDARD_NO_PAD.encode(buf); + serializer.serialize_str(&encoded) } } @@ -296,7 +299,10 @@ impl<'de> Deserialize<'de> for CreateTableTask { where D: serde::Deserializer<'de>, { - let buf = Vec::::deserialize(deserializer)?; + let encoded = String::deserialize(deserializer)?; + let buf = general_purpose::STANDARD_NO_PAD + .decode(encoded) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf) .map_err(|err| serde::de::Error::custom(err.to_string()))?; @@ -353,7 +359,8 @@ impl Serialize for AlterTableTask { alter_table: Some(self.alter_table.clone()), }; let buf = pb.encode_to_vec(); - serializer.serialize_bytes(&buf) + let encoded = general_purpose::STANDARD_NO_PAD.encode(buf); + serializer.serialize_str(&encoded) } } @@ -362,7 +369,10 @@ impl<'de> Deserialize<'de> for AlterTableTask { where D: serde::Deserializer<'de>, { - let buf = Vec::::deserialize(deserializer)?; + let encoded = String::deserialize(deserializer)?; + let buf = general_purpose::STANDARD_NO_PAD + .decode(encoded) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf) .map_err(|err| serde::de::Error::custom(err.to_string()))?; @@ -425,12 +435,12 @@ impl TryFrom for TruncateTableTask { mod tests { use std::sync::Arc; - use api::v1::CreateTableExpr; + use api::v1::{AlterExpr, CreateTableExpr}; use datatypes::schema::SchemaBuilder; use table::metadata::RawTableInfo; use table::test_util::table_info::test_table_info; - use super::CreateTableTask; + use super::{AlterTableTask, CreateTableTask}; #[test] fn test_basic_ser_de_create_table_task() { @@ -447,4 +457,16 @@ mod tests { let de = serde_json::from_slice(&output).unwrap(); assert_eq!(task, de); } + + #[test] + fn test_basic_ser_de_alter_table_task() { + let task = AlterTableTask { + alter_table: AlterExpr::default(), + }; + + let output = serde_json::to_vec(&task).unwrap(); + + let de = serde_json::from_slice(&output).unwrap(); + assert_eq!(task, de); + } }