fix: misuse of props and options in CREATE SOURCE (#13762)
Rossil2012 authored Dec 15, 2023
1 parent 028f0ae commit c253bd3
Showing 34 changed files with 522 additions and 202 deletions.
14 changes: 12 additions & 2 deletions dashboard/components/Relations.tsx
@@ -43,7 +43,11 @@ import { Fragment, useEffect, useState } from "react"
 import Title from "../components/Title"
 import extractColumnInfo from "../lib/extractInfo"
 import { Relation, StreamingJob } from "../pages/api/streaming"
-import { Table as RwTable } from "../proto/gen/catalog"
+import {
+  Sink as RwSink,
+  Source as RwSource,
+  Table as RwTable,
+} from "../proto/gen/catalog"
 
 const ReactJson = loadable(() => import("react-json-view"))

@@ -98,7 +102,13 @@ export const primaryKeyColumn: Column<RwTable> = {
     .join(", "),
 }
 
-export const connectorColumn: Column<Relation> = {
+export const connectorColumnSource: Column<RwSource> = {
+  name: "Connector",
+  width: 3,
+  content: (r) => r.withProperties.connector ?? "unknown",
+}
+
+export const connectorColumnSink: Column<RwSink> = {
   name: "Connector",
   width: 3,
   content: (r) => r.properties.connector ?? "unknown",
1 change: 0 additions & 1 deletion dashboard/pages/api/streaming.ts
@@ -39,7 +39,6 @@ export interface Relation {
   name: string
   owner: number
   columns: (ColumnCatalog | Field)[]
-  properties: { [key: string]: string }
 }
 
 export interface StreamingJob extends Relation {
7 changes: 5 additions & 2 deletions dashboard/pages/sinks.tsx
@@ -16,12 +16,15 @@
  */
 
 import {
-  connectorColumn,
+  connectorColumnSink,
   Relations,
   streamingJobColumns,
 } from "../components/Relations"
 import { getSinks } from "./api/streaming"
 
 export default function Sinks() {
-  return Relations("Sinks", getSinks, [connectorColumn, ...streamingJobColumns])
+  return Relations("Sinks", getSinks, [
+    connectorColumnSink,
+    ...streamingJobColumns,
+  ])
 }
4 changes: 2 additions & 2 deletions dashboard/pages/sources.tsx
@@ -17,7 +17,7 @@
 
 import {
   Column,
-  connectorColumn,
+  connectorColumnSource,
   dependentsColumn,
   Relations,
 } from "../components/Relations"
@@ -32,7 +32,7 @@ export default function DataSources() {
   }
 
   return Relations("Sources", getSources, [
-    connectorColumn,
+    connectorColumnSource,
     rowFormatColumn,
     dependentsColumn,
   ])
2 changes: 1 addition & 1 deletion proto/batch_plan.proto
@@ -58,7 +58,7 @@ message ScanRange {
 message SourceNode {
   uint32 source_id = 1;
   repeated plan_common.ColumnCatalog columns = 2;
-  map<string, string> properties = 3;
+  map<string, string> with_properties = 3;
   bytes split = 4;
   catalog.StreamSourceInfo info = 5;
 }
4 changes: 3 additions & 1 deletion proto/catalog.proto
@@ -65,6 +65,8 @@ message StreamSourceInfo {
   // Whether the stream source is a cdc source streaming job.
   // We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72.
   bool cdc_source_job = 13;
+  // Options specified by user in the FORMAT ENCODE clause.
+  map<string, string> format_encode_options = 14;
 }
 
 message Source {
@@ -81,7 +83,7 @@ message Source {
   // specify a primary key, the vector will be empty.
   repeated int32 pk_column_ids = 7;
   // Properties specified by the user in WITH clause.
-  map<string, string> properties = 8;
+  map<string, string> with_properties = 8;
 
   uint32 owner = 9;
 
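
Renaming a protobuf field keeps wire compatibility as long as the tag number (`8` here) is unchanged, but it does rename the generated accessors, which is why every consumer below follows suit. A minimal sketch of what callers see on the prost-generated types; the `schema.registry.url` key is a hypothetical example, not from this diff:

```rust
use risingwave_pb::catalog::{PbSource, StreamSourceInfo};

// WITH-clause properties now live on `Source.with_properties`...
fn connector_name(source: &PbSource) -> Option<&String> {
    source.with_properties.get("connector")
}

// ...while user options from FORMAT ENCODE live on
// `StreamSourceInfo.format_encode_options`.
fn registry_option(info: &StreamSourceInfo) -> Option<&String> {
    info.format_encode_options.get("schema.registry.url")
}
```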
4 changes: 2 additions & 2 deletions proto/stream_plan.proto
@@ -169,7 +169,7 @@ message StreamSource {
   repeated plan_common.ColumnCatalog columns = 4;
   reserved "pk_column_ids";
   reserved 5;
-  map<string, string> properties = 6;
+  map<string, string> with_properties = 6;
   catalog.StreamSourceInfo info = 7;
   string source_name = 8;
   // Streaming rate limit
@@ -184,7 +184,7 @@ message StreamFsFetch {
   repeated plan_common.ColumnCatalog columns = 4;
   reserved "pk_column_ids";
   reserved 5;
-  map<string, string> properties = 6;
+  map<string, string> with_properties = 6;
   catalog.StreamSourceInfo info = 7;
   string source_name = 8;
   // Streaming rate limit
4 changes: 2 additions & 2 deletions src/batch/src/executor/source.rs
@@ -62,11 +62,11 @@ impl BoxedExecutorBuilder for SourceExecutor {
 
         // prepare connector source
         let source_props: HashMap<String, String> =
-            HashMap::from_iter(source_node.properties.clone());
+            HashMap::from_iter(source_node.with_properties.clone());
         let config = ConnectorProperties::extract(source_props).map_err(BatchError::connector)?;
 
         let info = source_node.get_info().unwrap();
-        let parser_config = SpecificParserConfig::new(info, &source_node.properties)?;
+        let parser_config = SpecificParserConfig::new(info, &source_node.with_properties)?;
 
         let columns: Vec<_> = source_node
             .columns
5 changes: 2 additions & 3 deletions src/connector/src/parser/json_parser.rs
@@ -141,12 +141,11 @@ impl JsonParser {
 
 pub async fn schema_to_columns(
     schema_location: &str,
-    use_schema_registry: bool,
+    schema_registry_auth: Option<SchemaRegistryAuth>,
     props: &HashMap<String, String>,
 ) -> anyhow::Result<Vec<ColumnDesc>> {
     let url = handle_sr_list(schema_location)?;
-    let schema_content = if use_schema_registry {
-        let schema_registry_auth = SchemaRegistryAuth::from(props);
+    let schema_content = if let Some(schema_registry_auth) = schema_registry_auth {
         let client = Client::new(url, &schema_registry_auth)?;
         let topic = get_kafka_topic(props)?;
         let resolver = ConfluentSchemaResolver::new(client);
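
The `bool` flag folds into the `Option`: a caller now decides up front whether a schema registry is involved and, if so, builds the auth from the FORMAT ENCODE options. A sketch of the new calling convention, assuming `info` and `with_properties` are in scope at an illustrative call site:

```rust
// Illustrative call site, not part of this diff: the Option carries
// both the "use the registry" decision and the credentials.
let schema_registry_auth = info
    .use_schema_registry
    .then(|| SchemaRegistryAuth::from(&info.format_encode_options));
let columns = schema_to_columns(
    &info.row_schema_location,
    schema_registry_auth,
    &with_properties,
)
.await?;
```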
18 changes: 9 additions & 9 deletions src/connector/src/parser/mod.rs
@@ -884,7 +884,7 @@ pub enum ProtocolProperties {
 
 impl SpecificParserConfig {
     // The validity of (format, encode) is ensured by `extract_format_encode`
-    pub fn new(info: &StreamSourceInfo, props: &HashMap<String, String>) -> Result<Self> {
+    pub fn new(info: &StreamSourceInfo, with_properties: &HashMap<String, String>) -> Result<Self> {
         let source_struct = extract_source_struct(info)?;
         let format = source_struct.format;
         let encode = source_struct.encode;
@@ -925,12 +925,12 @@ impl SpecificParserConfig {
                 config.enable_upsert = true;
             }
             if info.use_schema_registry {
-                config.topic = get_kafka_topic(props)?.clone();
-                config.client_config = SchemaRegistryAuth::from(props);
+                config.topic = get_kafka_topic(with_properties)?.clone();
+                config.client_config = SchemaRegistryAuth::from(&info.format_encode_options);
             } else {
                 config.aws_auth_props = Some(
                     serde_json::from_value::<AwsAuthProps>(
-                        serde_json::to_value(props).unwrap(),
+                        serde_json::to_value(info.format_encode_options.clone()).unwrap(),
                     )
                     .map_err(|e| anyhow::anyhow!(e))?,
                 );
@@ -957,12 +957,12 @@ impl SpecificParserConfig {
                 config.enable_upsert = true;
             }
             if info.use_schema_registry {
-                config.topic = get_kafka_topic(props)?.clone();
-                config.client_config = SchemaRegistryAuth::from(props);
+                config.topic = get_kafka_topic(with_properties)?.clone();
+                config.client_config = SchemaRegistryAuth::from(&info.format_encode_options);
             } else {
                 config.aws_auth_props = Some(
                     serde_json::from_value::<AwsAuthProps>(
-                        serde_json::to_value(props).unwrap(),
+                        serde_json::to_value(info.format_encode_options.clone()).unwrap(),
                     )
                     .map_err(|e| anyhow::anyhow!(e))?,
                 );
@@ -980,8 +980,8 @@ impl SpecificParserConfig {
                 .unwrap(),
                 key_record_name: info.key_message_name.clone(),
                 row_schema_location: info.row_schema_location.clone(),
-                topic: get_kafka_topic(props).unwrap().clone(),
-                client_config: SchemaRegistryAuth::from(props),
+                topic: get_kafka_topic(with_properties).unwrap().clone(),
+                client_config: SchemaRegistryAuth::from(&info.format_encode_options),
                 ..Default::default()
             })
         }
28 changes: 14 additions & 14 deletions src/connector/src/source/base.rs
@@ -364,56 +364,56 @@ pub trait SplitReader: Sized + Send {
 for_all_sources!(impl_connector_properties);
 
 impl ConnectorProperties {
-    pub fn is_new_fs_connector_b_tree_map(props: &BTreeMap<String, String>) -> bool {
-        props
+    pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap<String, String>) -> bool {
+        with_properties
             .get(UPSTREAM_SOURCE_KEY)
             .map(|s| s.eq_ignore_ascii_case(S3_V2_CONNECTOR))
             .unwrap_or(false)
     }
 
-    pub fn is_new_fs_connector_hash_map(props: &HashMap<String, String>) -> bool {
-        props
+    pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
+        with_properties
             .get(UPSTREAM_SOURCE_KEY)
             .map(|s| s.eq_ignore_ascii_case(S3_V2_CONNECTOR))
             .unwrap_or(false)
     }
 
-    pub fn rewrite_upstream_source_key_hash_map(props: &mut HashMap<String, String>) {
-        let connector = props.remove(UPSTREAM_SOURCE_KEY).unwrap();
+    pub fn rewrite_upstream_source_key_hash_map(with_properties: &mut HashMap<String, String>) {
+        let connector = with_properties.remove(UPSTREAM_SOURCE_KEY).unwrap();
         match connector.as_str() {
             S3_V2_CONNECTOR => {
                 tracing::info!(
                     "using new fs source, rewrite connector from '{}' to '{}'",
                     S3_V2_CONNECTOR,
                     S3_CONNECTOR
                 );
-                props.insert(UPSTREAM_SOURCE_KEY.to_string(), S3_CONNECTOR.to_string());
+                with_properties.insert(UPSTREAM_SOURCE_KEY.to_string(), S3_CONNECTOR.to_string());
             }
             _ => {
-                props.insert(UPSTREAM_SOURCE_KEY.to_string(), connector);
+                with_properties.insert(UPSTREAM_SOURCE_KEY.to_string(), connector);
             }
         }
    }
 }
 
 impl ConnectorProperties {
-    pub fn extract(mut props: HashMap<String, String>) -> Result<Self> {
-        if Self::is_new_fs_connector_hash_map(&props) {
-            _ = props
+    pub fn extract(mut with_properties: HashMap<String, String>) -> Result<Self> {
+        if Self::is_new_fs_connector_hash_map(&with_properties) {
+            _ = with_properties
                 .remove(UPSTREAM_SOURCE_KEY)
                 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
             return Ok(ConnectorProperties::S3(Box::new(
-                S3Properties::try_from_hashmap(props)?,
+                S3Properties::try_from_hashmap(with_properties)?,
             )));
         }
 
-        let connector = props
+        let connector = with_properties
             .remove(UPSTREAM_SOURCE_KEY)
             .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
         match_source_name_str!(
             connector.to_lowercase().as_str(),
             PropType,
-            PropType::try_from_hashmap(props).map(ConnectorProperties::from),
+            PropType::try_from_hashmap(with_properties).map(ConnectorProperties::from),
             |other| Err(anyhow!("connector '{}' is not supported", other))
         )
     }
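
`extract` consumes the WITH-clause map, removes the `connector` entry, and deserializes the rest into the matching typed property struct. A minimal usage sketch with placeholder values:

```rust
use std::collections::HashMap;

// Hypothetical WITH clause of `CREATE SOURCE ... WITH (...)`;
// the values are placeholders.
let with_properties: HashMap<String, String> = HashMap::from([
    ("connector".to_owned(), "kafka".to_owned()),
    ("topic".to_owned(), "orders".to_owned()),
    ("properties.bootstrap.server".to_owned(), "localhost:9092".to_owned()),
]);

// `connector` picks the concrete variant; the remaining entries are
// deserialized into that variant's fields.
let props = ConnectorProperties::extract(with_properties)?;
```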
12 changes: 6 additions & 6 deletions src/connector/src/source/external.rs
@@ -44,8 +44,8 @@ pub enum CdcTableType {
 }
 
 impl CdcTableType {
-    pub fn from_properties(properties: &HashMap<String, String>) -> Self {
-        let connector = properties
+    pub fn from_properties(with_properties: &HashMap<String, String>) -> Self {
+        let connector = with_properties
             .get("connector")
             .map(|c| c.to_ascii_lowercase())
             .unwrap_or_default();
@@ -63,12 +63,12 @@ impl CdcTableType {
 
     pub async fn create_table_reader(
         &self,
-        properties: HashMap<String, String>,
+        with_properties: HashMap<String, String>,
         schema: Schema,
     ) -> ConnectorResult<ExternalTableReaderImpl> {
         match self {
             Self::MySql => Ok(ExternalTableReaderImpl::MySql(
-                MySqlExternalTableReader::new(properties, schema).await?,
+                MySqlExternalTableReader::new(with_properties, schema).await?,
             )),
             _ => bail!(ConnectorError::Config(anyhow!(
                 "invalid external table type: {:?}",
@@ -291,13 +291,13 @@ impl ExternalTableReader for MySqlExternalTableReader {
 
 impl MySqlExternalTableReader {
     pub async fn new(
-        properties: HashMap<String, String>,
+        with_properties: HashMap<String, String>,
         rw_schema: Schema,
     ) -> ConnectorResult<Self> {
         tracing::debug!(?rw_schema, "create mysql external table reader");
 
         let config = serde_json::from_value::<ExternalTableConfig>(
-            serde_json::to_value(properties).unwrap(),
+            serde_json::to_value(with_properties).unwrap(),
         )
         .map_err(|e| {
             ConnectorError::Config(anyhow!("fail to extract mysql connector properties: {}", e))
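
Taken together this is a two-step flow for CDC table readers: classify the connector from the WITH properties, then hand the same map to the reader constructor. A sketch assuming `with_properties` and `schema` are already built:

```rust
// Illustrative: `with_properties` is the user's WITH clause,
// `schema` the upstream table's schema.
let table_type = CdcTableType::from_properties(&with_properties);
let reader = table_type
    .create_table_reader(with_properties, schema)
    .await?;
```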
22 changes: 22 additions & 0 deletions src/connector/src/source/mod.rs
@@ -42,3 +42,25 @@ pub use mock_external_table::MockExternalTableReader;
 pub use crate::source::filesystem::{S3_CONNECTOR, S3_V2_CONNECTOR};
 pub use crate::source::nexmark::NEXMARK_CONNECTOR;
 pub use crate::source::pulsar::PULSAR_CONNECTOR;
+
+pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool {
+    const PREFIXES: &[&str] = &[
+        "schema.registry",
+        "schema.location",
+        "message",
+        "key.message",
+        "without_header",
+        "delimiter",
+        // AwsAuthProps
+        "region",
+        "endpoint_url",
+        "access_key",
+        "secret_key",
+        "session_token",
+        "arn",
+        "external_id",
+        "profile",
+    ];
+    PREFIXES.iter().any(|prefix| key.starts_with(prefix))
+        || (key == "endpoint" && !connector.eq_ignore_ascii_case(KINESIS_CONNECTOR))
+}
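
The name says "copy", not "move": the predicate marks keys that should be duplicated into `StreamSourceInfo.format_encode_options` rather than taken out of the WITH properties. A hedged sketch of how a planner-side helper might apply it; the helper itself is illustrative, only `should_copy_to_format_encode_options` comes from this commit:

```rust
use std::collections::{BTreeMap, HashMap};

// Hypothetical helper: copy registry/AWS-related keys out of the
// user's flat option map into a FORMAT ENCODE option map, leaving
// the original WITH properties untouched.
fn copy_format_encode_options(
    with_properties: &HashMap<String, String>,
    connector: &str,
) -> BTreeMap<String, String> {
    with_properties
        .iter()
        .filter(|(key, _)| should_copy_to_format_encode_options(key.as_str(), connector))
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}
```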
9 changes: 4 additions & 5 deletions src/frontend/src/catalog/source_catalog.rs
@@ -22,7 +22,6 @@ use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};
 use super::{ColumnId, ConnectionId, DatabaseId, OwnedByUserCatalog, SchemaId, SourceId};
 use crate::catalog::TableId;
 use crate::user::UserId;
-use crate::WithOptions;
 
 /// This struct `SourceCatalog` is used in frontend.
 /// Compared with `PbSource`, it only maintains information used during optimization.
@@ -36,7 +35,7 @@ pub struct SourceCatalog {
     pub owner: UserId,
     pub info: StreamSourceInfo,
     pub row_id_index: Option<usize>,
-    pub properties: BTreeMap<String, String>,
+    pub with_properties: BTreeMap<String, String>,
     pub watermark_descs: Vec<WatermarkDesc>,
     pub associated_table_id: Option<TableId>,
     pub definition: String,
@@ -61,7 +60,7 @@ impl SourceCatalog {
             row_id_index: self.row_id_index.map(|idx| idx as _),
             columns: self.columns.iter().map(|c| c.to_protobuf()).collect(),
             pk_column_ids: self.pk_col_ids.iter().map(Into::into).collect(),
-            properties: self.properties.clone().into_iter().collect(),
+            with_properties: self.with_properties.clone().into_iter().collect(),
             owner: self.owner,
             info: Some(self.info.clone()),
             watermark_descs: self.watermark_descs.clone(),
@@ -93,7 +92,7 @@ impl From<&PbSource> for SourceCatalog {
             .into_iter()
             .map(Into::into)
             .collect();
-        let with_options = WithOptions::new(prost.properties.clone());
+        let with_properties = prost.with_properties.clone().into_iter().collect();
         let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect();
         let row_id_index = prost.row_id_index.map(|idx| idx as _);
 
@@ -120,7 +119,7 @@ impl From<&PbSource> for SourceCatalog {
             owner,
             info: prost.info.clone().unwrap(),
             row_id_index,
-            properties: with_options.into_inner(),
+            with_properties,
             watermark_descs,
             associated_table_id: associated_table_id.map(|x| x.into()),
             definition: prost.definition.clone(),
@@ -75,7 +75,7 @@ impl SysCatalogReaderImpl {
             Some(ScalarImpl::Int32(source.owner as i32)),
             Some(ScalarImpl::Utf8(
                 source
-                    .properties
+                    .with_properties
                     .get(UPSTREAM_SOURCE_KEY)
                     .cloned()
                     .unwrap_or("".to_string())
