Skip to content

Commit

Permalink
feat(sink): support flink dynamic table sink and http sink (#13280)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Dec 6, 2023
1 parent 3733e7c commit 5af41d3
Show file tree
Hide file tree
Showing 29 changed files with 2,402 additions and 12 deletions.
22 changes: 10 additions & 12 deletions e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
# CDC source basic test

# enable cdc backfill in ci
statement ok
set cdc_backfill='true';
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = 'mysql',
port = '3306',
username = 'root',
password = '123456',
database.name = 'my@db',
server.id = '2'
);

statement ok
create table products ( id INT,
name STRING,
description STRING,
PRIMARY KEY (id)
) with (
connector = 'mysql-cdc',
hostname = 'mysql',
port = '3306',
username = 'root',
password = '123456',
database.name = 'my@db',
table.name = 'products',
server.id = '5085'
);
) FROM mysql_mydb TABLE '[email protected]';


statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ public class CommonSinkConfig {
@JsonProperty(value = "primary_key")
protected String primaryKey;

public CommonSinkConfig() {}

public CommonSinkConfig(String connector, Boolean forceAppendOnly, String primaryKey) {
this.connector = connector;
this.forceAppendOnly = forceAppendOnly;
this.primaryKey = primaryKey;
}

public String getConnector() {
return connector;
}
Expand Down
8 changes: 8 additions & 0 deletions java/connector-node/risingwave-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,13 @@
<artifactId>risingwave-sink-cassandra</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-mock-flink-runtime</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-mock-flink-http-sink</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import static io.grpc.Status.*;

import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.mock.flink.http.HttpFlinkMockSinkFactory;
import com.risingwave.mock.flink.runtime.FlinkDynamicAdapterFactory;
import com.risingwave.proto.ConnectorServiceProto;
import java.util.Optional;

Expand Down Expand Up @@ -45,6 +47,8 @@ public static SinkFactory getSinkFactory(String sinkName) {
return new EsSinkFactory();
case "cassandra":
return new CassandraFactory();
case "http":
return new FlinkDynamicAdapterFactory(new HttpFlinkMockSinkFactory());
default:
throw UNIMPLEMENTED
.withDescription("unknown sink type: " + sinkName)
Expand Down
15 changes: 15 additions & 0 deletions java/connector-node/risingwave-connector-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,20 @@
<artifactId>risingwave-sink-cassandra</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-mock-flink-runtime</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-mock-flink-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-mock-flink-http-sink</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 5af41d3

Please sign in to comment.