feat: introduce include clause to add additional connector columns #13707

Merged
merged 60 commits into from
Dec 20, 2023
648dce1
stash
tabVersion Nov 29, 2023
16f451a
Merge remote-tracking branch 'origin' into tab/include-opts
tabVersion Nov 29, 2023
5b65a4e
fix compilation
xxchan Nov 29, 2023
37d8192
stash
tabVersion Nov 29, 2023
05147aa
Merge branch 'tab/include-opts' of https://github.com/singularity-dat…
tabVersion Nov 29, 2023
d7c39c4
feat: Refactor source and table creation handling
tabVersion Nov 29, 2023
922fa1c
add new field in ColumnDesc
tabVersion Nov 30, 2023
7b268cc
bind_source_pk
tabVersion Nov 30, 2023
842d722
add license
tabVersion Nov 30, 2023
bbf74fb
Merge branch 'main' into tab/include-opts
tabVersion Dec 2, 2023
c31f624
feat: Implement additional column validation for various formats
tabVersion Dec 4, 2023
981a74c
stash
tabVersion Dec 4, 2023
90c1d92
Merge remote-tracking branch 'origin' into tab/include-opts
tabVersion Dec 4, 2023
ebfe20b
format
tabVersion Dec 4, 2023
30c5768
stash
tabVersion Dec 5, 2023
0ce99cc
fix
tabVersion Dec 5, 2023
c67f138
compatible with prev version
tabVersion Dec 6, 2023
cc94cc8
Merge remote-tracking branch 'origin' into tab/include-opts
tabVersion Dec 6, 2023
f4c4e44
merge fix
tabVersion Dec 6, 2023
4f6bca2
change parser trait, stash
tabVersion Dec 7, 2023
2d0ad74
rerun
tabVersion Dec 7, 2023
6fb5922
stash
tabVersion Dec 10, 2023
383e844
Revert "change parser trait, stash"
tabVersion Dec 10, 2023
64e6656
fix
tabVersion Dec 10, 2023
b2cafab
stash
tabVersion Dec 10, 2023
ac03436
refactor: Refactor and standardize the `access_field` function and im…
tabVersion Dec 11, 2023
fece45b
Refactor parsing logic and imports in connector code
tabVersion Dec 11, 2023
74c3f86
stash
tabVersion Dec 11, 2023
67a14e5
Merge branch 'main' into tab/include-opts
tabVersion Dec 12, 2023
1a9653f
fix
tabVersion Dec 12, 2023
f45a0a9
fix
tabVersion Dec 12, 2023
7650a9f
fix e2e
tabVersion Dec 13, 2023
104a478
fix e2e
tabVersion Dec 13, 2023
4926b84
format
tabVersion Dec 13, 2023
00b1ae8
fix test
tabVersion Dec 13, 2023
33c1354
fix test
tabVersion Dec 14, 2023
32f13eb
Merge branch 'main' into tab/include-opts
tabVersion Dec 14, 2023
c71f49b
pre delete json parser
tabVersion Dec 14, 2023
30f9516
remove unwrap array for kafka
tabVersion Dec 15, 2023
4c05e90
feat: Refactor handling of exempted connectors and addition columns
tabVersion Dec 15, 2023
8c2ca18
fix
tabVersion Dec 15, 2023
0377f84
fix
tabVersion Dec 15, 2023
131faf5
fix
tabVersion Dec 15, 2023
285e3a2
fix
tabVersion Dec 16, 2023
6c797b7
fix
tabVersion Dec 16, 2023
4f8600c
format
tabVersion Dec 16, 2023
ec06a13
fix broker addr
tabVersion Dec 16, 2023
3c9b563
refactor: Refactor primary key definitions in test database
tabVersion Dec 18, 2023
bdaabc4
Merge remote-tracking branch 'origin' into tab/include-opts
tabVersion Dec 18, 2023
4c70a6b
fix
tabVersion Dec 18, 2023
9904da0
remove legacy avro behavior
tabVersion Dec 18, 2023
a4c6ba6
refactor: Refactor additional columns handling and imports
tabVersion Dec 19, 2023
4da8dbf
change additionalColumn_type to Normal rather than Unspecified
tabVersion Dec 19, 2023
d38e038
add version for column_desc
tabVersion Dec 19, 2023
334f6c0
Merge remote-tracking branch 'origin' into tab/include-opts
tabVersion Dec 19, 2023
be6d28e
resolve comments
tabVersion Dec 19, 2023
c2c3642
resolve comments
tabVersion Dec 19, 2023
412e4bc
refactor: Refactor error messages and handling for source creation wi…
tabVersion Dec 19, 2023
8e72517
fix
tabVersion Dec 19, 2023
46df0fc
Merge branch 'main' into tab/include-opts
tabVersion Dec 20, 2023
compatible with prev version
tabVersion committed Dec 6, 2023
commit c67f13899fcdf383287773204421cd48985a095f
4 changes: 4 additions & 0 deletions src/connector/src/source/manager.rs
@@ -33,6 +33,10 @@ pub struct SourceColumnDesc {

// `is_pk` is used to indicate whether the column is part of the primary key columns.
pub is_pk: bool,

// `additional_column_type` and `column_type` are orthogonal:
// `additional_column_type` indicates which part of the message the column comes from;
// `column_type` indicates the type of the column and is only used in CDC scenarios.
pub additional_column_type: AdditionalColumnType,
}
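
The orthogonality described in the comment can be illustrated with a minimal, self-contained sketch. The types below are simplified stand-ins written for this example, not the actual `risingwave_connector` definitions: the `ColumnType` variants, the `name` field, and the helpers `col` and `reads_from_key` are assumptions. Only `Key` and `Normal` are taken from names that appear in this PR.

```rust
// Simplified stand-ins for illustration; NOT the real RisingWave types.

/// Which part of the message a column is read from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdditionalColumnType {
    Normal, // read from the message payload
    Key,    // read from the message key
}

/// Role of the column (hypothetical variants for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnType {
    Normal,
    Offset,
}

#[derive(Debug, Clone)]
struct SourceColumnDesc {
    name: String,
    is_pk: bool,
    column_type: ColumnType,
    additional_column_type: AdditionalColumnType,
}

/// Hypothetical constructor to keep the example short.
fn col(name: &str, is_pk: bool, ct: ColumnType, act: AdditionalColumnType) -> SourceColumnDesc {
    SourceColumnDesc {
        name: name.to_string(),
        is_pk,
        column_type: ct,
        additional_column_type: act,
    }
}

/// Hypothetical helper: the decision depends only on `additional_column_type`,
/// never on `column_type`, which is what "orthogonal" means here.
fn reads_from_key(c: &SourceColumnDesc) -> bool {
    c.additional_column_type == AdditionalColumnType::Key
}

fn main() {
    let cols = vec![
        col("_rw_key", true, ColumnType::Normal, AdditionalColumnType::Key),
        col("payload", false, ColumnType::Normal, AdditionalColumnType::Normal),
        col("offset", false, ColumnType::Offset, AdditionalColumnType::Normal),
    ];
    for c in &cols {
        println!(
            "{} (pk: {}, {:?}): from key = {}",
            c.name,
            c.is_pk,
            c.column_type,
            reads_from_key(c)
        );
    }
}
```

Keeping the two fields separate means a consumer can decide where to read a column from without inspecting its role, and vice versa.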

27 changes: 26 additions & 1 deletion src/stream/src/from_proto/source/trad_source.rs
@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::{ColumnId, Schema, TableId};
use risingwave_common::catalog::{ColumnId, Schema, TableId, DEFAULT_KEY_COLUMN_NAME};
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::source::external::{CdcTableType, SchemaTableName};
use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts};
use risingwave_pb::data::data_type::TypeName as PbTypeName;
use risingwave_pb::plan_common::{AdditionalColumnType, FormatType, PbEncodeType};
use risingwave_pb::stream_plan::SourceNode;
use risingwave_source::source_desc::SourceDescBuilder;
use risingwave_storage::panic_store::PanicStateStore;
@@ -53,6 +55,29 @@ impl ExecutorBuilder for SourceExecutorBuilder {
let source_name = source.source_name.clone();
let source_info = source.get_info()?;

let mut source_columns = source.columns.clone();

{
// compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
// For upsert sources encoded as Avro or Protobuf, overwrite the `_rw_key` column's `ColumnDesc.additional_column_type` with `Key`.
if source_info.format() == FormatType::Upsert
&& (source_info.row_encode() == PbEncodeType::Avro
|| source_info.row_encode() == PbEncodeType::Protobuf)
{
// Use a `for` loop: iterator adaptors such as `map` are lazy, so a
// discarded `iter_mut().map(..)` would never run its closure.
for c in source_columns.iter_mut() {
if let Some(desc) = c.column_desc.as_mut() {
let is_bytea = desc
.get_column_type()
.map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
.unwrap();
if desc.name == DEFAULT_KEY_COLUMN_NAME && is_bytea {
desc.additional_column_type = AdditionalColumnType::Key as i32;
}
}
}
}
}

let source_desc_builder = SourceDescBuilder::new(
source.columns.clone(),
params.env.source_metrics(),
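
A note on in-place rewrites of this kind: Rust iterator adaptors such as `map` are lazy, so a mutation written inside a `map` closure only happens if the iterator is actually consumed. The sketch below shows the difference on a hypothetical `Desc` struct. `Desc`, `KEY_NAME`, `mark_lazy`, `mark_eager`, and `sample` are illustrative names and not part of the RisingWave code base.

```rust
// Hypothetical, simplified column descriptor used only for this sketch.
#[derive(Debug, Clone, PartialEq)]
struct Desc {
    name: String,
    is_key: bool,
}

// Stand-in for the key column name referenced in the comment above.
const KEY_NAME: &str = "_rw_key";

/// Builds a `Map` adaptor and drops it without consuming it.
/// The closure never runs, so `cols` is left unchanged.
fn mark_lazy(cols: &mut [Desc]) {
    let _ = cols.iter_mut().map(|c| {
        if c.name == KEY_NAME {
            c.is_key = true;
        }
    });
}

/// Drives the iteration with a `for` loop, so the mutation is applied.
fn mark_eager(cols: &mut [Desc]) {
    for c in cols.iter_mut() {
        if c.name == KEY_NAME {
            c.is_key = true;
        }
    }
}

/// Two sample columns: the key column and an ordinary one.
fn sample() -> Vec<Desc> {
    vec![
        Desc { name: KEY_NAME.to_string(), is_key: false },
        Desc { name: "payload".to_string(), is_key: false },
    ]
}

fn main() {
    let mut a = sample();
    mark_lazy(&mut a);
    println!("after mark_lazy:  {:?}", a);

    let mut b = sample();
    mark_eager(&mut b);
    println!("after mark_eager: {:?}", b);
}
```

The same consideration applies to which collection is passed on afterwards: a rewritten copy only matters if the downstream builder receives that copy rather than a fresh clone of the original.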