Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Jul 26, 2024
1 parent 18cf195 commit 316b035
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 22 deletions.
11 changes: 11 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,16 @@ message ApplyThrottleResponse {
common.Status status = 1;
}

message AlterConnectorConfigRequest {
uint32 source_id = 1;
bool require_clean_state = 2;
map<string, string> config = 3;
}

message AlterConnectorConfigResponse {
common.Status status = 1;
}

message RecoverRequest {}

message RecoverResponse {}
Expand All @@ -301,6 +311,7 @@ service StreamManagerService {
rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
rpc Recover(RecoverRequest) returns (RecoverResponse);
rpc AlterConnectorConfig(AlterConnectorConfigRequest) returns (AlterConnectorConfigResponse);
}

// Below for cluster service.
Expand Down
25 changes: 16 additions & 9 deletions src/frontend/src/handler/alter_table_connector_attr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_connector::bail_invalid_option_error;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_sqlparser::ast::{ObjectName, Value, WithProperties};

Expand All @@ -31,11 +31,17 @@ pub fn handle_alter_table_connector_attr(
let original_table = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

if original_table.associated_source_id.is_none() {
bail_invalid_option_error!("Cannot alter a table without connector")
bail!("Cannot alter a table without connector")
}
let clean_state_flag = check_attr(&mut new_attr)?;

return PgResponse::empty_result(StatementType::ALTER_TABLE);
if clean_state_flag {
bail_not_implemented!(
"alter connector params requiring clean its state is not implemented yet, {} should be set to false", REQUIRE_CLEAN_STATE
);
}

Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

fn check_attr(attr: &mut WithProperties) -> crate::error::Result<bool> {
Expand All @@ -49,7 +55,7 @@ fn check_attr(attr: &mut WithProperties) -> crate::error::Result<bool> {
.real_value()
.eq_ignore_ascii_case(UPSTREAM_SOURCE_KEY)
}) {
bail_invalid_option_error!("cannot alter attribute {}", UPSTREAM_SOURCE_KEY);
bail!("cannot alter attribute {}", UPSTREAM_SOURCE_KEY);
}

for idx in 0..attr.0.len() {
Expand All @@ -62,18 +68,19 @@ fn check_attr(attr: &mut WithProperties) -> crate::error::Result<bool> {
if let Value::Boolean(b) = &attr.0[idx].value {
clean_state_flag = *b
} else {
bail_invalid_option_error!("{} should be a boolean", REQUIRE_CLEAN_STATE);
bail!("{} should be a boolean", REQUIRE_CLEAN_STATE);
}

attr.0.remove(idx);
break;
}
attr.0.remove(idx);
break;
}
if !contain_clean_state_flag {
bail_invalid_option_error!(
bail!(
"{} should be contained in the WITH clause",
REQUIRE_CLEAN_STATE
)
}

return Ok(clean_state_flag);
Ok(clean_state_flag)
}
2 changes: 1 addition & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ pub async fn handle(
}
Statement::AlterTable {
name,
operation: AlterTableOperation::AlterConnectorAttr { attr },
operation: AlterTableOperation::AlterConnectorConfig { config: attr },
} => handle_alter_table_connector_attr(handler_args, name, attr),
Statement::AlterIndex {
name,
Expand Down
6 changes: 6 additions & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,4 +423,10 @@ impl StreamManagerService for StreamServiceImpl {
.await;
Ok(Response::new(RecoverResponse {}))
}

async fn alter_connector_config(
&self,
request: Request<AlterConnectorConfigRequest>,
) -> Result<Response<AlterConnectorConfigResponse>, Status> {
}
}
7 changes: 5 additions & 2 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ pub enum AlterTableOperation {
SetStreamingRateLimit {
rate_limit: i32,
},
AlterConnectorAttr {
attr: WithProperties,
AlterConnectorConfig {
config: WithProperties,
},
}

Expand Down Expand Up @@ -296,6 +296,9 @@ impl fmt::Display for AlterTableOperation {
AlterTableOperation::SetStreamingRateLimit { rate_limit } => {
write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
}
AlterTableOperation::AlterConnectorConfig { config } => {
write!(f, " CONNECTOR CONFIG {}", config)
}
}
}
}
Expand Down
19 changes: 9 additions & 10 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3094,8 +3094,15 @@ impl Parser<'_> {
cascade,
}
} else if self.parse_keyword(Keyword::ALTER) {
if self.parse_keyword(Keyword::COLUMN) {
// ALTER COLUMN
if self.parse_keyword(Keyword::CONNECTOR) {
// ALTER CONNECTOR WITH (...)
let config = self.parse_with_properties()?;
AlterTableOperation::AlterConnectorConfig {
config: WithProperties(config),
}
} else {
// ALTER [ COLUMN ] column_name SET/DROP NOT NULL
let _ = self.parse_keyword(Keyword::COLUMN);
let column_name = self.parse_identifier_non_reserved()?;
let op = if self.parse_keywords(&[Keyword::SET, Keyword::NOT, Keyword::NULL]) {
AlterColumnOperation::SetNotNull {}
Expand Down Expand Up @@ -3123,14 +3130,6 @@ impl Parser<'_> {
);
};
AlterTableOperation::AlterColumn { column_name, op }
} else if self.parse_keyword(Keyword::CONNECTOR) {
// ALTER CONNECTOR WITH (...)
let attr = self.parse_with_properties()?;
AlterTableOperation::AlterConnectorAttr {
attr: WithProperties(attr),
}
} else {
return self.expected("COLUMN or CONNECTOR after ALTER");
}
} else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) {
AlterTableOperation::RefreshSchema
Expand Down

0 comments on commit 316b035

Please sign in to comment.