Skip to content

Commit

Permalink
add Types.OTHER for postgresql geometry column type
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Jul 9, 2024
1 parent ba0a9bc commit 59e3667
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package com.alibaba.datax.common.plugin;

import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxReader;
import com.qlangtech.tis.datax.IDataxWriter;
import com.qlangtech.tis.datax.impl.DataxProcessor;

/**
* Created by jingxing on 14-8-24.
*/
Expand All @@ -13,6 +18,15 @@ public JobPluginCollector getJobPluginCollector() {
return jobPluginCollector;
}

protected final IDataxProcessor loadProcessor() {
IDataxProcessor processor = DataxProcessor.load(null, this.containerContext.getTISDataXName());
return processor;
}

protected final IDataxWriter loadDataXWriter() {
return this.loadProcessor().getWriter(null);
}

/**
* @param jobPluginCollector the jobPluginCollector to set
*/
Expand Down
4 changes: 4 additions & 0 deletions elasticsearchwriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
</properties>

<dependencies>
<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-plugin</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.alibaba.datax.plugin.writer.elasticsearchwriter;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
Expand All @@ -11,13 +12,21 @@
import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.IndicesExists;
import io.searchbox.indices.aliases.*;
import io.searchbox.indices.mapping.GetMapping;
import io.searchbox.indices.mapping.GetMapping.Builder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Created by xiongfeng.bxf on 17/2/8.
Expand All @@ -32,23 +41,118 @@ public ESClient(ESInitialization es) {


public boolean indicesExists(String indexName) throws Exception {
boolean isIndicesExists = false;
// boolean isIndicesExists = false;
JestResult rst = es.jestClient.execute(new IndicesExists.Builder(indexName).build());
return parseResponse(rst, new ParseResponse<Boolean>() {
@Override
public Boolean success(JestResult rst) {
return true;
}

@Override
public Boolean faild() {
return false;
}
});

// if (rst.isSucceeded()) {
// isIndicesExists = true;
// } else {
// switch (rst.getResponseCode()) {
// case 404:
// isIndicesExists = false;
// break;
// case 401:
// // 无权访问
// default:
// log.warn(rst.getErrorMessage());
// break;
// }
// }
// return isIndicesExists;
}

private <T> T parseResponse(JestResult rst, ParseResponse<T> parseResponse) {
if (rst.isSucceeded()) {
isIndicesExists = true;
return parseResponse.success(rst);
} else {
switch (rst.getResponseCode()) {
case 404:
isIndicesExists = false;
break;
return parseResponse.faild();
case 401:
// 无权访问
default:
log.warn(rst.getErrorMessage());
break;
}
}
return isIndicesExists;

return parseResponse.faild();
}

interface ParseResponse<T> {
T success(JestResult rst);

T faild();
}

public static class SchemaCol {
private final String name;
private final String type;

public SchemaCol(String name, String type) {
this.name = name;
this.type = type;
}

@Override
public String toString() {
return "SchemaCol{" +
"name='" + name + '\'' +
", type='" + type + '\'' +
'}';
}
}

/**
* 取得index的Schema mapping
*
* @param indexName
* @return
* @throws Exception
*/
public List<SchemaCol> getMapping(String indexName) throws Exception {
GetMapping.Builder builder = new GetMapping.Builder();
builder.addIndex(indexName);
JestResult rst = es.jestClient.execute(builder.build());

return parseResponse(rst, new ParseResponse<List<SchemaCol>>() {
@Override
public List<SchemaCol> success(JestResult rst) {
JsonObject obj = rst.getJsonObject();
JsonObject mapping = obj.getAsJsonObject(indexName);
JsonObject props = Objects.requireNonNull(mapping.getAsJsonObject("mappings"), "mapping can not be null")
.getAsJsonObject("properties");
JsonObject typeMeta = null;
List<SchemaCol> parseMapping = Lists.newArrayList();
SchemaCol col = null;
for (Map.Entry<String, JsonElement> entry : Objects.requireNonNull(props, "props can not be null").entrySet()) {

typeMeta = entry.getValue().getAsJsonObject();

col = new SchemaCol(entry.getKey(), typeMeta.get("type").getAsString());
parseMapping.add(col);
}
return parseMapping;
}

@Override
public List<SchemaCol> faild() {
return Collections.emptyList();
}
});

// rst.getSourceAsObject()
}

public boolean deleteIndex(String indexName) throws Exception {
Expand All @@ -64,57 +168,48 @@ public boolean deleteIndex(String indexName) throws Exception {
return true;
}

public boolean createIndex(String indexName, String typeName, Object mappings, String settings, boolean dynamic) throws Exception {
public boolean createIndex(String indexName, String typeName
, Pair<String, List<ESColumn>> mappings
, String settings, boolean dynamic) throws Exception {
JestResult rst = null;
if (!indicesExists(indexName)) {
log.info("create index " + indexName);
rst = es.jestClient.execute(new CreateIndex.Builder(indexName).settings(settings).mappings(String.valueOf(mappings)).setParameter("master_timeout", "5m").build());
rst = es.jestClient.execute(new CreateIndex.Builder(indexName)
.settings(settings)
.mappings((mappings.getKey()))
.setParameter("master_timeout", "5m").build());
//index_already_exists_exception
if (!rst.isSucceeded()) {
log.error("createIndex faild:{} ", rst.getErrorMessage());
//if (getStatus(rst) == 400) {
// log.info(String.format("index [%s] already exists", indexName));
// return true;
//}
// else {
// log.error(rst.getErrorMessage());
//
// }
return false;
throw new IllegalStateException("createIndex faild:" + rst.getErrorMessage());
} else {
log.info(String.format("create [%s] index success", indexName));
log.info(mappings.getKey());
return true;
}
} else {
//
ESColumn cfg = null;
// SchemaCol ecfg = null;
List<ESColumn> cfgCols = mappings.getValue();
Set<String> existCols = this.getMapping(indexName).stream().map((col) -> col.name).collect(Collectors.toSet());
;
if (cfgCols.size() != existCols.size()) {
throw new IllegalStateException("created schemaMapping cols size:"
+ existCols.size()
+ " is not equal with config schema mapping size:" + cfgCols.size());
}

for (int idx = 0; idx < cfgCols.size(); idx++) {
cfg = cfgCols.get(idx);
if (!existCols.contains(cfg.getName())) {
throw new IllegalStateException("column index:" + idx + ",config in TIS name:"
+ cfg.getName() + " is not exist in elastic schema cols:" + String.join(",", existCols)
+ ", please make a schema mapping synchronizing");
}
}
}
// int idx = 0;
// while (idx < 5) {
// if (indicesExists(indexName)) {
// break;
// }
// Thread.sleep(2000);
// idx++;
// }
// if (idx >= 5) {
// return false;
// }
//
// if (dynamic) {
// log.info("ignore mappings");
// return true;
// }
// log.info("create mappings for " + indexName + " " + mappings);
// rst = jestClient.execute(new PutMapping.Builder(indexName, typeName, mappings)
// .setParameter("master_timeout", "5m").build());
// if (!rst.isSucceeded()) {
// if (getStatus(rst) == 400) {
// log.info(String.format("index [%s] mappings already exists", indexName));
// } else {
// log.error(rst.getErrorMessage());
// return false;
// }
// } else {
// log.info(String.format("index [%s] put mappings success", indexName));
// }
return true;

return false;
}

public JestResult execute(Action<JestResult> clientRequest) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.qlangtech.tis.datax.IDataxProcessor;
import io.searchbox.client.JestResult;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
Expand Down Expand Up @@ -40,6 +41,9 @@ public void init() {

@Override
public void prepare() {

IInitialElasticSearchIndex initialElasticSearchIndex = (IInitialElasticSearchIndex) this.loadDataXWriter();

/**
* 注意:此方法仅执行一次。
* 最佳实践:如果 Job 中有需要进行数据同步之前的处理,可以在此处完成,如果没有必要则可以直接去掉。
Expand Down Expand Up @@ -68,9 +72,10 @@ public void prepare() {
esClient.deleteIndex(indexName);
}
// 强制创建,内部自动忽略已存在的情况
if (!esClient.createIndex(indexName, typeName, mappings, settings, dynamic)) {
throw new IOException("create index or mapping failed");
}
initialElasticSearchIndex.initialIndex(this.loadProcessor());
// if (!esClient.createIndex(indexName, typeName, mappings, settings, dynamic)) {
// throw new IOException("create index or mapping failed");
// }
} catch (Exception ex) {
throw DataXException.asDataXException(ESWriterErrorCode.ES_MAPPINGS, ex.toString());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.alibaba.datax.plugin.writer.elasticsearchwriter;

import com.qlangtech.tis.datax.IDataxProcessor;

import java.util.List;

/**
* @author: 百岁([email protected]
* @create: 2024-07-06 21:02
**/
public interface IInitialElasticSearchIndex {
public List<ESColumn> initialIndex(IDataxProcessor dataxProcessor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.alibaba.datax.plugin.writer.elasticsearchwriter;

import com.alibaba.datax.plugin.writer.elasticsearchwriter.ESClient.SchemaCol;
import junit.framework.TestCase;
import org.junit.Assert;

import java.util.List;

/**
* @author: 百岁([email protected]
* @create: 2024-07-06 17:27
**/
public class TestESClient extends TestCase {
public void testGetMapping() throws Exception {
String endpoint = "http://192.168.28.201:9200";
ESInitialization config = (ESInitialization.create(endpoint, null, null,
false,
300000,
false,
false));
ESClient esClient = new ESClient(config);
List<SchemaCol> mapping = esClient.getMapping("video");
Assert.assertTrue(mapping.size() > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ protected Record buildRecord(final DataXCol2Index col2Index, RecordSender record
case Types.LONGVARCHAR:
case Types.NVARCHAR:
case Types.LONGNVARCHAR:
case Types.OTHER:
String rawData;
if (StringUtils.isBlank(mandatoryEncoding)) {
rawData = rs.getString(i);
Expand Down
Loading

0 comments on commit 59e3667

Please sign in to comment.