From fd59a73358b9b214b6c4972f197c14982692f6a7 Mon Sep 17 00:00:00 2001 From: gaoliang Date: Wed, 26 Jul 2023 11:06:40 +0800 Subject: [PATCH] [feature-#1775][connector][http] http supports offline mode --- .../chunjun-connector-http/pom.xml | 28 ++++ .../connector/http/client/HttpClient.java | 44 +++++- .../http/client/OfflineJsonResponseParse.java | 81 +++++++++++ .../connector/http/common/ConstantValue.java | 1 + .../connector/http/common/HttpRestConfig.java | 11 ++ .../http/inputformat/HttpInputFormat.java | 1 + .../http/table/HttpDynamicTableFactory.java | 26 +++- .../http/table/HttpDynamicTableSource.java | 7 +- .../connector/http/table/HttpOptions.java | 36 +++++ .../http/offline_http_array_page_print.sql | 126 ++++++++++++++++++ .../sql/http/offline_http_array_print.sql | 80 +++++++++++ .../http/offline_http_single_page_print.sql | 76 +++++++++++ .../sql/http/offline_http_single_print.sql | 51 +++++++ .../http/http-offline-source.md" | 126 ++++++++++++++++++ 14 files changed, 680 insertions(+), 14 deletions(-) create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/OfflineJsonResponseParse.java create mode 100644 chunjun-examples/sql/http/offline_http_array_page_print.sql create mode 100644 chunjun-examples/sql/http/offline_http_array_print.sql create mode 100644 chunjun-examples/sql/http/offline_http_single_page_print.sql create mode 100644 chunjun-examples/sql/http/offline_http_single_print.sql create mode 100644 "docs/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/http/http-offline-source.md" diff --git a/chunjun-connectors/chunjun-connector-http/pom.xml b/chunjun-connectors/chunjun-connector-http/pom.xml index 1408c7eb3c..d117b09da0 100644 --- a/chunjun-connectors/chunjun-connector-http/pom.xml +++ b/chunjun-connectors/chunjun-connector-http/pom.xml @@ -48,6 +48,30 @@ javacsv 2.0 + + + com.jayway.jsonpath + json-path + 2.8.0 + + + net.minidev + json-smart + + + + + + net.minidev + 
json-smart + 2.4.10 + + + slf4j-api + org.slf4j + 1.7.36 + provided + @@ -56,6 +80,10 @@ org.apache.maven.plugins maven-antrun-plugin + + org.apache.maven.plugins + maven-shade-plugin + diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java index 8cb9ad959d..51e182797c 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java @@ -26,6 +26,7 @@ import com.dtstack.chunjun.util.GsonUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpUriRequest; @@ -41,9 +42,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static com.dtstack.chunjun.connector.http.common.ConstantValue.CSV_DECODE; -import static com.dtstack.chunjun.connector.http.common.ConstantValue.TEXT_DECODE; -import static com.dtstack.chunjun.connector.http.common.ConstantValue.XML_DECODE; +import static com.dtstack.chunjun.connector.http.common.ConstantValue.*; @Slf4j public class HttpClient { @@ -134,6 +133,9 @@ public void initPosition(HttpRequestParam requestParam, String response) { } public void execute() { + if (restConfig.getLimitRequestTime() < restConfig.getRequestTime()) { + return; + } if (!running) { return; @@ -182,6 +184,9 @@ public void execute() { first = false; requestRetryTime = 3; requestNumber++; + // 子类和父类使用同一个对象,可以向上汇报请求次数进度,以便及时触发finish + Integer requestTime = restConfig.getRequestTime(); + restConfig.setRequestTime(++requestTime); } public void doExecute(int retryTime) { @@ -204,6 +209,21 @@ 
public void doExecute(int retryTime) { String responseValue; int responseStatus; try { + Map requestParam = currentParam.getParam(); + Map requestBody = currentParam.getBody(); + if (StringUtils.isNotBlank(restConfig.getPageParamName())) { + Integer pagePosition = + restConfig.getStartIndex() + + restConfig.getStep() * restConfig.getRequestTime(); + if (pagePosition > restConfig.getEndIndex()) { + return; + } + if ("get".equals(restConfig.getRequestMode())) { + requestParam.put(restConfig.getPageParamName(), pagePosition); + } else { + requestBody.put(restConfig.getPageParamName(), pagePosition); + } + } HttpUriRequest request = HttpUtil.getRequest( @@ -221,7 +241,8 @@ public void doExecute(int retryTime) { return; } - responseValue = EntityUtils.toString(httpResponse.getEntity()); + // utf-8:支持中文 + responseValue = EntityUtils.toString(httpResponse.getEntity(), "utf-8"); responseStatus = httpResponse.getStatusLine().getStatusCode(); } catch (Throwable e) { // 只要本次请求中出现了异常 都会进行重试,如果重试次数达到了就真正结束任务 @@ -264,9 +285,16 @@ public void doExecute(int retryTime) { } } - responseParse.parse(responseValue, responseStatus, HttpRequestParam.copy(currentParam)); - while (responseParse.hasNext()) { - processData(responseParse.next()); + if (StringUtils.isBlank(responseValue)) { + reachEnd = true; + running = false; + } else { + responseParse.parse( + responseValue, responseStatus, HttpRequestParam.copy(currentParam)); + while (responseParse.hasNext()) { + // 一条一条数据的增加 + processData(responseParse.next()); + } } if (-1 != restConfig.getCycles() && requestNumber >= restConfig.getCycles()) { @@ -342,6 +370,8 @@ protected ResponseParse getResponseParse(AbstractRowConverter converter) { return new XmlResponseParse(restConfig, converter); case TEXT_DECODE: return new TextResponseParse(restConfig, converter); + case OFFLINE_JSON_DECODE: + return new OfflineJsonResponseParse(restConfig, converter); default: return new JsonResponseParse(restConfig, converter); } diff --git 
a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/OfflineJsonResponseParse.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/OfflineJsonResponseParse.java new file mode 100644 index 0000000000..77ce92b4c1 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/OfflineJsonResponseParse.java @@ -0,0 +1,81 @@ +package com.dtstack.chunjun.connector.http.client; + +import com.dtstack.chunjun.connector.http.common.HttpRestConfig; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.util.GsonUtil; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.jayway.jsonpath.JsonPath; +import org.apache.commons.lang3.StringUtils; + +import java.util.Iterator; +import java.util.Map; + +/** @Description 离线任务 @Author lianggao @Date 2023/6/1 下午5:50 */ +public class OfflineJsonResponseParse extends ResponseParse { + + private String responseValue; + private HttpRequestParam requestParam; + private final Gson gson; + private Iterator iterator; + /** true:single true:array false:single false:array */ + String parserFlag; + + String jsonPath; + + public OfflineJsonResponseParse(HttpRestConfig config, AbstractRowConverter converter) { + super(config, converter); + this.gson = GsonUtil.GSON; + this.jsonPath = config.getJsonPath(); + this.parserFlag = !StringUtils.isBlank(jsonPath) + ":" + config.getReturnedDataType(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ResponseValue next() throws Exception { + String rowJson = gson.toJson(iterator.next()); + Map data = + DefaultRestHandler.gson.fromJson(rowJson, GsonUtil.gsonMapTypeToken); + return new ResponseValue(converter.toInternal(data), requestParam, responseValue); + } + + @Override + public void parse(String responseValue, int responseStatus, 
HttpRequestParam requestParam) { + this.responseValue = responseValue; + this.requestParam = requestParam; + runParseJson(); + } + + public void runParseJson() { + switch (parserFlag) { + case "false:array": + iterator = JsonPath.read(responseValue, "$"); + break; + case "true:single": + Object read = JsonPath.read(responseValue, jsonPath); + iterator = Lists.newArrayList(read).iterator(); + break; + case "true:array": + Object eval = JsonPath.read(responseValue, jsonPath); + if (eval instanceof net.minidev.json.JSONArray) { + iterator = ((net.minidev.json.JSONArray) eval).iterator(); + } else { + // 如果为null 则直接报错,返回解析错误的数据 + if (eval == null) { + throw new RuntimeException( + "response parsing is incorrect Please check the conf ,get response is" + + responseValue); + } + iterator = Lists.newArrayList(eval).iterator(); + } + break; + default: + Lists.newArrayList(responseValue); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/ConstantValue.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/ConstantValue.java index b4d207469f..3de7cd9a4b 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/ConstantValue.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/ConstantValue.java @@ -35,6 +35,7 @@ public class ConstantValue { public static final String TEXT_DECODE = "text"; public static final String DEFAULT_DECODE = "json"; + public static final String OFFLINE_JSON_DECODE = "offline-json"; public static final String PREFIX = "${"; public static final String SUFFIX = "}"; diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpRestConfig.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpRestConfig.java index 
7c20bec1dc..c12ce83e46 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpRestConfig.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpRestConfig.java @@ -83,6 +83,17 @@ public class HttpRestConfig extends CommonConfig { /** 请求的超时时间 单位毫秒 */ private long timeOut = 10000; + private String returnedDataType; + private String jsonPath; + + private String pageParamName; + private Integer endIndex; + private Integer startIndex; + private Integer Step; + + private Integer limitRequestTime = 1; + private Integer requestTime = 0; + public String getFieldTypes() { return fieldTypes; } diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/inputformat/HttpInputFormat.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/inputformat/HttpInputFormat.java index 5c4117c1fc..e10f8aaab3 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/inputformat/HttpInputFormat.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/inputformat/HttpInputFormat.java @@ -82,6 +82,7 @@ protected void openInternal(InputSplit inputSplit) { protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException { ResponseValue value = myHttpClient.takeEvent(); if (null == value) { + reachEnd = httpRestConfig.getLimitRequestTime().equals(httpRestConfig.getRequestTime()); return null; } if (value.isNormal()) { diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpDynamicTableFactory.java index c1ec98ed06..78f3edf576 100644 --- 
a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpDynamicTableFactory.java @@ -35,6 +35,7 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import org.apache.commons.lang3.StringUtils; import java.util.HashSet; import java.util.List; @@ -69,7 +70,12 @@ public Set> optionalOptions() { options.add(HttpOptions.DELAY); options.add(HttpOptions.DATA_SUBJECT); options.add(HttpOptions.CYCLES); - + options.add(HttpOptions.RETURNEDDATATYPE); + options.add(HttpOptions.JSONPATH); + options.add(HttpOptions.PAGEPARAMNAME); + options.add(HttpOptions.STEP); + options.add(HttpOptions.STARTINDEX); + options.add(HttpOptions.ENDINDEX); return options; } @@ -141,6 +147,24 @@ private HttpRestConfig getRestapiConf(ReadableConfig config) { httpRestConfig.setRequestMode(config.get(HttpOptions.METHOD)); httpRestConfig.setDataSubject(config.get(HttpOptions.DATA_SUBJECT)); httpRestConfig.setCycles(config.get(HttpOptions.CYCLES)); + + httpRestConfig.setReturnedDataType(config.get(HttpOptions.RETURNEDDATATYPE)); + httpRestConfig.setJsonPath(config.get(HttpOptions.JSONPATH)); + + if (StringUtils.isNotBlank(config.get(HttpOptions.PAGEPARAMNAME))) { + httpRestConfig.setPageParamName(config.get(HttpOptions.PAGEPARAMNAME)); + httpRestConfig.setStep(config.get(HttpOptions.STEP)); + httpRestConfig.setStartIndex(config.get(HttpOptions.STARTINDEX)); + httpRestConfig.setEndIndex(config.get(HttpOptions.ENDINDEX)); + + Integer limitRequestTime = + (config.get(HttpOptions.ENDINDEX) - config.get(HttpOptions.STARTINDEX)) + / config.get(HttpOptions.STEP) + + 1; + httpRestConfig.setLimitRequestTime(limitRequestTime); + httpRestConfig.setCycles(limitRequestTime); + } + httpRestConfig.setParam( gson.fromJson( config.get(HttpOptions.PARAMS), diff --git 
a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpDynamicTableSource.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpDynamicTableSource.java index 7f0def8ea8..7a22d9f2ba 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpDynamicTableSource.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpDynamicTableSource.java @@ -114,11 +114,6 @@ public String asSummaryString() { @Override public ChangelogMode getChangelogMode() { - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.UPDATE_BEFORE) - .addContainedKind(RowKind.UPDATE_AFTER) - .addContainedKind(RowKind.DELETE) - .build(); + return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build(); } } diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpOptions.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpOptions.java index 0bfaf10eb5..5752243d29 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpOptions.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/table/HttpOptions.java @@ -76,4 +76,40 @@ public class HttpOptions { .longType() .defaultValue(1L) .withDescription("request cycle"); + + public static final ConfigOption RETURNEDDATATYPE = + ConfigOptions.key("returned-data-type") + .stringType() + .defaultValue("single") + .withDescription("The data structure returned is single or array"); + + public static final ConfigOption JSONPATH = + ConfigOptions.key("json-path") + .stringType() + .defaultValue("") + .withDescription("json Path"); + + public static final ConfigOption PAGEPARAMNAME = + 
ConfigOptions.key("page-param-name") + .stringType() + .defaultValue("") + .withDescription("Pagination request page name,for example: pageName"); + + public static final ConfigOption STARTINDEX = + ConfigOptions.key("start-index") + .intType() + .defaultValue(1) + .withDescription("The initial page number of multiple requests"); + + public static final ConfigOption ENDINDEX = + ConfigOptions.key("end-index") + .intType() + .defaultValue(1) + .withDescription("The final page number of multiple requests"); + + public static final ConfigOption STEP = + ConfigOptions.key("step") + .intType() + .defaultValue(1) + .withDescription("The step size of the requested page number"); } diff --git a/chunjun-examples/sql/http/offline_http_array_page_print.sql b/chunjun-examples/sql/http/offline_http_array_page_print.sql new file mode 100644 index 0000000000..9385f3c5fb --- /dev/null +++ b/chunjun-examples/sql/http/offline_http_array_page_print.sql @@ -0,0 +1,126 @@ +CREATE TABLE source +( + category varchar, + author varchar, + title varchar, + price double +) WITH ( + 'connector' = 'http-x' + ,'url' = 'http://localhost:8088/api/arraypage' + ,'intervalTime'= '3000' + ,'method'='get' --请求方式:get 、post + ,'decode'='offline-json' -- 数据格式:只支持json模式 + ,'header'='[ {"key":"headerName1","value":"headerValue1"},{"key":"headerName2","value":"headerValue2"}]' -- 请求header + ,'body'='[ {"key":"bodyName1","value":"bodyValue1"} ]' -- 请求体 + ,'params'='[ {"key":"paramsKey1","value":"paramsValue1"} ]' -- 请求参数:用于拼接url + ,'returned-data-type'='array' -- 返回数据类型:single:单条数据;array:数组数据 + ,'json-path'='$.store.book' -- json路径:jsonpath用于解析指定层的json数据 + -- 以下4个参数要同时存在: + ,'page-param-name'='pagenum' -- 多次请求参数1:分页参数名:例如:pageNum + ,'start-index'='1' -- 多次请求参数2:开始的位置 + ,'end-index'='10000' -- 多次请求参数3:结束的位置 + ,'step'='1' -- 多次请求参数4:步长:默认值为1 + ); + +CREATE TABLE sink +( + category varchar, + author varchar, + title varchar, + price double +) WITH ( + 'connector' = 'print' + ); + + +insert into sink +select * 
+from source ; + +-- print说明: + +-- 数据原型 +-- { +-- "store": { +-- "book": [{ +-- "category": "reference", +-- "author": "Nigel Rees", +-- "title": "Sayings of the Century", +-- "price": 8.95 +-- }, { +-- "category": "fiction", +-- "author": "Evelyn Waugh", +-- "title": "Sword of Honour", +-- "price": 12.99 +-- }, { +-- "category": "fiction", +-- "author": "Herman Melville", +-- "title": "Moby Dick", +-- "isbn": "0-553-21311-3", +-- "price": 8.99 +-- }, { +-- "category": "fiction", +-- "author": "J. R. R. Tolkien", +-- "title": "The Lord of the Rings", +-- "isbn": "0-395-19395-8", +-- "price": 22.99 +-- } +-- ], +-- "bicycle": { +-- "color": "red", +-- "price": 19.95 +-- } +-- } +-- } + +-- $.store.book 选择的是book数组下的多条数据 +-- +-- source表的字段名要对应起来:category、author、title、price +-- 7> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 6> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 10> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 11> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 8> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 12> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 12> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 1> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 3> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 9> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 5> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 12> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 1> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 2> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 3> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 8> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 11> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 10> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 7> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 6> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 2> 
+I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 4> +I[reference, Nigel Rees, Sayings of the Century, 8.95] +-- 3> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 1> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 10> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 12> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 2> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 4> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 5> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 3> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 1> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 2> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 4> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 5> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 9> +I[fiction, Evelyn Waugh, Sword of Honour, 12.99] +-- 7> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 4> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 10> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 6> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 9> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 8> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 7> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 11> +I[fiction, Herman Melville, Moby Dick, 8.99] +-- 5> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 9> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 8> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 6> +I[fiction, J. R. R. Tolkien, The Lord of the Rings, 22.99] +-- 11> +I[fiction, J. R. R. 
Tolkien, The Lord of the Rings, 22.99] diff --git a/chunjun-examples/sql/http/offline_http_array_print.sql b/chunjun-examples/sql/http/offline_http_array_print.sql new file mode 100644 index 0000000000..c821eed7f0 --- /dev/null +++ b/chunjun-examples/sql/http/offline_http_array_print.sql @@ -0,0 +1,80 @@ +-- 请求一次 解析一次请求的多条数据 +CREATE TABLE source +( + axis varchar, + `value` integer, + createTime varchar +) WITH ( + 'connector' = 'http-x' + ,'url' = 'http://localhost:8088/api/array' + ,'intervalTime'= '3000' + ,'method'='get' --请求方式:get 、post + ,'decode'='offline-json' -- 数据格式:只支持json模式 + ,'header'='[ {"key":"headerName1","value":"headerValue1"},{"key":"headerName2","value":"headerValue2"}]' -- 请求header + ,'body'='[ {"key":"bodyName1","value":"bodyValue1"} ]' -- 请求体 + ,'params'='[ {"key":"paramsKey1","value":"paramsValue1"} ]' -- 请求参数:用于拼接url + ,'returned-data-type'='array' -- 返回数据类型:single:单条数据;array:数组数据 + ,'json-path'='$' -- json路径:jsonpath用于解析指定层的json数据 + ); + +CREATE TABLE sink +( + axis varchar, + `value` integer, + createTime varchar +) WITH ( + 'connector' = 'print' + ); + + +insert into sink +select * +from source ; + +-- 测试用例说明 +-- $ 选择的是book数组下的多条数据 +-- +-- +I[series1, 9191352, 2023-01-04 00:07:20] +-- +I[series1, 6645322, 2023-01-04 00:14:47] +-- +I[series1, 2078369, 2023-01-04 00:22:13] +-- +I[series1, 7325410, 2023-01-04 00:29:30] +-- +I[series1, 7448456, 2023-01-04 00:37:04] +-- +I[series1, 5808077, 2023-01-04 00:44:30] +-- +I[series1, 5625821, 2023-01-04 00:52:06] +-- { +-- "axis": "series1", +-- "value": 9191352, +-- "createTime": "2023-01-04 00:07:20" +-- }, +-- { +-- "axis": "series1", +-- "value": 6645322, +-- "createTime": "2023-01-04 00:14:47" +-- }, +-- { +-- "axis": "series1", +-- "value": 2078369, +-- "createTime": "2023-01-04 00:22:13" +-- }, +-- { +-- "axis": "series1", +-- "value": 7325410, +-- "createTime": "2023-01-04 00:29:30" +-- }, +-- { +-- "axis": "series1", +-- "value": 7448456, +-- "createTime": "2023-01-04 00:37:04" +-- }, +-- { 
+-- "axis": "series1", +-- "value": 5808077, +-- "createTime": "2023-01-04 00:44:30" +-- }, +-- { +-- "axis": "series1", +-- "value": 5625821, +-- "createTime": "2023-01-04 00:52:06" +-- } +-- ] + diff --git a/chunjun-examples/sql/http/offline_http_single_page_print.sql b/chunjun-examples/sql/http/offline_http_single_page_print.sql new file mode 100644 index 0000000000..bc2ee57f45 --- /dev/null +++ b/chunjun-examples/sql/http/offline_http_single_page_print.sql @@ -0,0 +1,76 @@ +-- 请求多次 每次请求都为一条数据 +CREATE TABLE source +( + color varchar, + price double + +) WITH ( + 'connector' = 'http-x' + ,'url' = 'http://localhost:8088/api/arraypage' + ,'intervalTime'= '3000' + ,'method'='get' --请求方式:get 、post + ,'decode'='offline-json' -- 数据格式:只支持json模式 + ,'header'='[ {"key":"headerName1","value":"headerValue1"},{"key":"headerName2","value":"headerValue2"}]' -- 请求header + ,'body'='[ {"key":"bodyName1","value":"bodyValue1"} ]' -- 请求体 + ,'params'='[ {"key":"paramsKey1","value":"paramsValue1"} ]' -- 请求参数:用于拼接url + ,'returned-data-type'='single' -- 返回数据类型:single:单条数据;array:数组数据 + ,'json-path'='$.store.bicycle' -- json路径:jsonpath用于解析指定层的json数据 + -- 以下4个参数要同时存在: + ,'page-param-name'='pagenum' -- 多次请求参数1:分页参数名:例如:pageNum + ,'start-index'='1' -- 多次请求参数2:开始的位置 + ,'end-index'='40000' -- 多次请求参数3:结束的位置 + ,'step'='1' -- 多次请求参数4:步长:默认值为1 + ); + +CREATE TABLE sink +( + color varchar, + price double +) WITH ( + 'connector' = 'print' + ); + + +insert into sink +select * +from source ; + +-- $.store.book 选择的是book数组下的多条数据 +-- +-- source表的字段名要对应起来:category、author、title、price +-- +I[red, 19.95] +-- +I[red, 19.95] +-- +I[red, 19.95] +-- +I[red, 19.95] +-- { +-- "store": { +-- "book": [{ +-- "category": "reference", +-- "author": "Nigel Rees", +-- "title": "Sayings of the Century", +-- "price": 8.95 +-- }, { +-- "category": "fiction", +-- "author": "Evelyn Waugh", +-- "title": "Sword of Honour", +-- "price": 12.99 +-- }, { +-- "category": "fiction", +-- "author": "Herman Melville", +-- "title": "Moby 
Dick", +-- "isbn": "0-553-21311-3", +-- "price": 8.99 +-- }, { +-- "category": "fiction", +-- "author": "J. R. R. Tolkien", +-- "title": "The Lord of the Rings", +-- "isbn": "0-395-19395-8", +-- "price": 22.99 +-- } +-- ], +-- "bicycle": { +-- "color": "red", +-- "price": 19.95 +-- } +-- } +-- } diff --git a/chunjun-examples/sql/http/offline_http_single_print.sql b/chunjun-examples/sql/http/offline_http_single_print.sql new file mode 100644 index 0000000000..923572df1a --- /dev/null +++ b/chunjun-examples/sql/http/offline_http_single_print.sql @@ -0,0 +1,51 @@ +-- 解析单条数据 +CREATE TABLE source +( + axis varchar, + `value` integer, + createTime varchar +) WITH ( + 'connector' = 'http-x' + ,'url' = 'http://localhost:8088/api/single' + ,'intervalTime'= '3000' + ,'method'='get' --请求方式:get 、post + ,'decode'='offline-json' -- 数据格式:只支持json模式 + ,'header'='[ {"key":"headerName1","value":"headerValue1"},{"key":"headerName2","value":"headerValue2"}]' -- 请求header + ,'body'='[ {"key":"bodyName1","value":"bodyValue1"} ]' -- 请求体 + ,'params'='[ {"key":"paramsKey1","value":"paramsValue1"} ]' -- 请求参数:用于拼接url + ,'returned-data-type'='single' -- 返回数据类型:single:单条数据;array:数组数据 + ,'json-path'='$' -- json路径:jsonpath用于解析指定层的json数据 + -- 以下4个参数要同时存在: + ,'page-param-name'='' -- 多次请求参数1:分页参数名:例如:pageNum + ,'start-index'='1' -- 多次请求参数2:开始的位置 + ,'end-index'='1' -- 多次请求参数3:结束的位置 + ,'step'='1' -- 多次请求参数4:步长:默认值为1 + ); + +CREATE TABLE sink +( + axis varchar, + `value` integer, + createTime varchar +) WITH ( + 'connector' = 'print' + ); + + +insert into sink +select * +from source ; + +-- $.store获取的是store子层的数据,有book数组,bicycle数据。 +-- 当source字段名为bicycle时选择的是 bicycle下的数据,+I({color=red, price=19.95}) +-- { +-- "store": { +-- "book": [{ +-- } +-- ], +-- "bicycle": { +-- "color": "red", +-- "price": 19.95 +-- } +-- } +-- } diff --git "a/docs/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/http/http-offline-source.md" 
"b/docs/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/http/http-offline-source.md" new file mode 100644 index 0000000000..cdcf5dee6c --- /dev/null +++ "b/docs/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/http/http-offline-source.md" @@ -0,0 +1,126 @@ +# Http Source + +## 一、介绍 + +支持从http数据源以离线读取接口数据,分为单次拉取数据、多次分页拉取数据 + +## 二、支持版本 + +HTTP/1.1 是当前广泛使用的 HTTP 协议版本,它引入了一些改进,包括持久连接、流水线化、范围请求等。 +通常,大多数现代的 HTTP 库和服务器都支持 HTTP/1.1。 + +目前测试的HTTP协议版本也是1.1。 + + +## 三、插件名称 + +| 支持模式 | 连接器标识 | +|------|--------| +| SQL | http-x | + +## 四、参数说明 + +### 1、sql模式 + +- **url** + + - 描述:请求url + - 必选:是 + - 参数类型:string + - 默认值:无 +
+ +- **decode** + - 描述:decode type + - 注意:对于按照离线模式消费数据的值为:offline-json + - 必选:是 + - 参数类型:string + - 默认值:json +
+ +- **method** + - 描述:请求方法(post、get等) + - 必选:是 + - 参数类型:string + - 默认值:post +
+ +- **header** + + - 描述:请求的header + - 必选:否 + - 参数类型:string + - 默认值:[] +
+ +- **params** + + - 描述:请求参数 + - 必选:否 + - 参数类型:String + - 默认值:[] +
+ +- **returned-data-type** + + - 描述:请求返回数据类型。如:single:单条数据;array:数组数据,会转为多条数据。 + - 必选:否 + - 参数类型:String + - 默认值:single +
+ +- **json-path** + + - 描述:按照jsonpath语法,定位和提取json的数据。 + 如:样例数据{ "store":{"bicycle":"111"}},json-path=$.store,则取到的是{"bicycle":"111"} + - 必选:否 + - 参数类型:string + - 默认值:无 +
+ +- **page-param-name** + + - 描述:分页读取时,分页参数名。例如:pageNum + - 必选:否 + - 参数类型:String + - 默认值:无 +
+ +- **start-index** + + - 描述:分页开始页 + - + - 必选:否 + - 参数类型:int + - 默认值:1 +
+ +- **end-index** + + - 描述:分页结束页 + - 必选:否 + - 参数类型:int + - 默认值:1 +
+ + +- **step** + + - 描述:请求步长,比如第一次请求为第1页,步长为2,那么下一次请求是第3页 + - 必选:否 + - 参数类型:int + - 默认值:1 +
+ + + +## 五、数据类型 + +| 是否支持 | 类型名称 | +| :------: |:-------------------------------------------------------------:| +| 支持 | BOOLEAN、INTEGER、BIGINT、DATE、FLOAT、DOUBLE、CHAR、VARCHAR、DECIMAL | + + +## 六、脚本示例 + +见项目内`chunjun-examples`文件夹。