refactor: deprecate old s3 and use use s3_v2 as default #17963

Merged (8 commits) on Aug 16, 2024
Changes from 3 commits
2 changes: 1 addition & 1 deletion e2e_test/s3/fs_parquet_source.py
@@ -63,7 +63,7 @@ def _table():
test_timestamp timestamp,
test_timestamptz timestamptz,
) WITH (
-    connector = 's3_v2',
+    connector = 's3',
match_pattern = '*.parquet',
s3.region_name = '{config['S3_REGION']}',
s3.bucket_name = '{config['S3_BUCKET']}',
2 changes: 1 addition & 1 deletion e2e_test/s3/fs_source_batch.py
@@ -69,7 +69,7 @@ def _encode():
sex int,
mark int,
) WITH (
-    connector = 's3_v2',
+    connector = 's3',
match_pattern = '{prefix}*.{fmt}',
s3.region_name = '{config['S3_REGION']}',
s3.bucket_name = '{config['S3_BUCKET']}',
2 changes: 1 addition & 1 deletion e2e_test/s3/fs_source_v2.py
@@ -70,7 +70,7 @@ def _encode():
sex int,
mark int,
) WITH (
-    connector = 's3_v2',
+    connector = 's3',
match_pattern = '{prefix}*.{fmt}',
s3.region_name = '{config['S3_REGION']}',
s3.bucket_name = '{config['S3_BUCKET']}',
2 changes: 1 addition & 1 deletion integration_tests/kinesis-s3-source/create_source.sql
@@ -16,7 +16,7 @@ CREATE SOURCE ad_click (
bid_id BIGINT,
click_timestamp TIMESTAMPTZ
) WITH (
-    connector = 's3_v2',
+    connector = 's3',
s3.region_name = 'us-east-1',
s3.bucket_name = 'ad-click',
s3.credentials.access = 'test',
22 changes: 11 additions & 11 deletions src/frontend/src/handler/create_source.rs
@@ -52,6 +52,7 @@ use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
+pub use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_connector::source::{
ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
@@ -73,7 +74,7 @@ use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::{DatabaseId, SchemaId};
-use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
+use crate::error::ErrorCode::{self, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
use crate::handler::create_table::{
@@ -88,8 +89,6 @@ use crate::session::SessionImpl;
use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options};
use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions, WithOptionsSecResolved};

-pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
-
/// Map a JSON schema to a relational schema
async fn extract_json_table_schema(
schema_config: &Option<(AstString, bool)>,
@@ -1117,10 +1116,18 @@ pub fn validate_compatibility(
source_schema: &ConnectorSchema,
props: &mut BTreeMap<String, String>,
) -> Result<()> {
-    let connector = props
+    let mut connector = props
        .get_connector()
        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?;

+    if connector == S3_CONNECTOR {
+        // The legacy S3 connector is deprecated; use OPENDAL_S3_CONNECTOR instead.
+        // Do the s3 -> s3_v2 migration here for new jobs.
+        let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
+        *entry = OPENDAL_S3_CONNECTOR.to_string();
+        connector = OPENDAL_S3_CONNECTOR.to_string();
+    }
Comment on lines +1132 to +1138

@fuyufjh (Member) commented on Aug 12, 2024:

Will these lines work for an existing job?

I don't think it's necessary to help users rewrite their SQL from connector='s3_v2' to 's3'. On the other hand, it is necessary to do this for an existing job persisted in Meta.

Author (Contributor):

Only for new jobs. Existing jobs do not go through the frontend again.

Member:

xxchan's proposal #17963 (review) sounds the most reasonable to me: reject s3_v2 in the frontend, but allow the existing ones. Can we do that?

Member:

BTW, I think the changes here are rewriting s3 -> s3_v2, which is necessary.

Author (Contributor):

> rejecting the s3_v2 in frontend, but allow the existing ones. Can we do that?

Technically, yes. My concern is from the product side: rejecting a higher version while accepting a lower one seems strange. And if we later evolve a new s3 source, would we call it s3_v2 or s3_v3?

Member:

Leaving s3_v2 as an alias sounds acceptable to me, but please make sure all the docs & test cases are updated to s3.
let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
    .get(&connector)
    .ok_or_else(|| {
@@ -1150,13 +1157,6 @@
    }
}

-    if connector == S3_CONNECTOR {
-        return Err(RwError::from(Deprecated(
-            S3_CONNECTOR.to_string(),
-            OPENDAL_S3_CONNECTOR.to_string(),
-        )));
-    }
-
let compatible_encodes = compatible_formats
    .get(&source_schema.format)
    .ok_or_else(|| {
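Net effect of the diff above: instead of returning a hard `Deprecated` error for `connector = 's3'`, the frontend now silently rewrites the deprecated name to the OpenDAL-based connector inside the user's WITH options, so everything downstream (including what gets persisted in Meta) already carries the new name. A minimal standalone sketch of that normalization follows; the string values "s3" and "s3_v2" are inferred from the "s3 -> s3_v2 migration" comment in the diff, not copied from RisingWave's actual constants:

```rust
use std::collections::BTreeMap;

// Names mirroring the constants in the diff; values are assumptions.
const UPSTREAM_SOURCE_KEY: &str = "connector";
const S3_CONNECTOR: &str = "s3";
const OPENDAL_S3_CONNECTOR: &str = "s3_v2";

/// Rewrite the deprecated `s3` connector name to the OpenDAL-based one
/// in place, returning the effective connector name (None if the
/// `connector` field is missing entirely).
fn normalize_connector(props: &mut BTreeMap<String, String>) -> Option<String> {
    let connector = props.get(UPSTREAM_SOURCE_KEY)?.clone();
    if connector == S3_CONNECTOR {
        // Mutate the options map itself, not just a local variable, so
        // anything persisted later sees the migrated name.
        props.insert(
            UPSTREAM_SOURCE_KEY.to_string(),
            OPENDAL_S3_CONNECTOR.to_string(),
        );
        return Some(OPENDAL_S3_CONNECTOR.to_string());
    }
    Some(connector)
}

fn main() {
    let mut props =
        BTreeMap::from([(UPSTREAM_SOURCE_KEY.to_string(), "s3".to_string())]);
    assert_eq!(normalize_connector(&mut props).as_deref(), Some("s3_v2"));
    assert_eq!(props[UPSTREAM_SOURCE_KEY], "s3_v2");

    // Non-deprecated connectors pass through untouched.
    let mut other =
        BTreeMap::from([(UPSTREAM_SOURCE_KEY.to_string(), "kafka".to_string())]);
    assert_eq!(normalize_connector(&mut other).as_deref(), Some("kafka"));
    println!("ok");
}
```

Rewriting the map in place is what makes the migration transparent for new jobs, which is the point the review thread above is probing: existing jobs never re-enter the frontend, so only newly created sources take this path.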