diff --git a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result index 3e03f5a511c0a..6a979eaa4f8b7 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result +++ b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result @@ -1 +1 @@ -{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file +{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":7,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_type":"_doc","_id":"1_1-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":1,"v2":2,"v3":"1-2"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java index 73f0799e44d1d..4d274a2ef20b6 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java @@ -39,8 +39,8 @@ public static SinkFactory getSinkFactory(String sinkName) { return new FileSinkFactory(); case "jdbc": return new JDBCSinkFactory(); - case "elasticsearch": - case "opensearch": + case "elasticsearch_v1": + case "opensearch_v1": return new EsSinkFactory(); case "cassandra": return new CassandraFactory(); diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index d2873fac9d216..1c4392d0b7afd 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -59,7 +59,7 @@ public void testEsSink(ElasticsearchContainer container, String username, String .withDelimiter("$") .withUsername(username) .withPassword(password); - config.setConnector("elasticsearch"); + config.setConnector("elasticsearch_v1"); EsSink sink = new EsSink(config, getTestTableSchema()); sink.write( Iterators.forArray( diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 5f9c574cde6a2..ab86b6c86463c 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -152,12 +152,12 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { this.config = config; this.requestTracker = new RequestTracker(); // ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever. - if (config.getConnector().equals("elasticsearch")) { + if (config.getConnector().equals("elasticsearch_v1")) { ElasticRestHighLevelClientAdapter client = new ElasticRestHighLevelClientAdapter(host, config); this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client, config); - } else if (config.getConnector().equals("opensearch")) { + } else if (config.getConnector().equals("opensearch_v1")) { OpensearchRestHighLevelClientAdapter client = new OpensearchRestHighLevelClientAdapter(host, config); this.bulkProcessor = diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index 5108f4eab4787..f601101b21aec 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -74,7 +74,7 @@ public void validate( // 2. check connection try { - if (config.getConnector().equals("elasticsearch")) { + if (config.getConnector().equals("elasticsearch_v1")) { ElasticRestHighLevelClientAdapter esClient = new ElasticRestHighLevelClientAdapter(host, config); if (!esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) { @@ -83,7 +83,7 @@ public void validate( .asRuntimeException(); } esClient.close(); - } else if (config.getConnector().equals("opensearch")) { + } else if (config.getConnector().equals("opensearch_v1")) { OpensearchRestHighLevelClientAdapter opensearchClient = new OpensearchRestHighLevelClientAdapter(host, config); if (!opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) { diff --git a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_common.rs b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_common.rs index 5b88d0d75e5d0..f0acd849b58a1 100644 --- a/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_common.rs +++ b/src/connector/src/sink/elasticsearch_opensearch/elasticsearch_opensearch_common.rs @@ -347,6 +347,16 @@ pub fn validate_config(config: &ElasticSearchOpenSearchConfig, schema: &Schema) ))); } } + + if let Some(routing_column) = &config.routing_column { + let filed = schema.fields().get(*routing_column).unwrap(); + if filed.data_type() != DataType::Varchar { + return Err(SinkError::Config(anyhow!( + "please ensure the data type of {} is varchar.", + routing_column + ))); + } + } Ok(()) } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 6994557278288..082785c4e1a8c 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -257,9 +257,22 @@ ElasticSearchOpenSearchConfig: field_type: usize comments: It is used for dynamic index, if it is be set, the value of this column will be used as the index. It and `index` can only set one required: false - - name: max_task_num + - name: routing_column field_type: usize + comments: It is used for dynamic route, if it is be set, the value of this column will be used as the route required: false + - name: retry_on_conflict + field_type: i32 + required: true + - name: batch_num_messages + field_type: usize + required: true + - name: batch_size_kb + field_type: usize + required: true + - name: concurrent_requests + field_type: usize + required: true FsConfig: fields: - name: fs.path