diff --git a/.gitignore b/.gitignore index 4b3ca19..90543e0 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ /target/ /sandbox/ /logs/ +/bin/ +/*.class diff --git a/README.rst b/README.rst index c058566..6276a4c 100644 --- a/README.rst +++ b/README.rst @@ -137,7 +137,6 @@ indexes):: } ' - Exports ======= @@ -515,7 +514,7 @@ The JSON response of an import may look like this:: .. hint:: - ``imports``: List of successful imports - - ``node_id'': The node id where the import happened + - ``node_id``: The node id where the import happened - ``took``: Operation time of all imports on the node in milliseconds - ``imported_files``: List of imported files in the import directory of the node's file system - ``file_name``: File name of the handled file @@ -532,7 +531,7 @@ Dump The idea behind dump is to export all relevant data to recreate the cluster as it was at the time of the dump. -The basic usage of the endpoint is: +The basic usage of the endpoint is:: curl -X POST 'http://localhost:9200/_dump' @@ -568,7 +567,7 @@ Restore ======= Dumped data is intended to get restored. This can be done by the _restore -endpoint: +endpoint:: curl -X POST 'http://localhost:9200/_restore' @@ -624,20 +623,88 @@ a given query directly into an index:: "fields": ["_id", "_source", ["_index", "'newindex'"]] }' -An example can be found in the `Search Into DocTest -`_. +An example can be found in the `Search Into DocTest `_. + + + +Script Support +============== + +Script support has been added to ``_import``, ``_searchinto`` and ``_reindex`` endpoints so that you can modify the documents read from dump files +on fly before indexing. + +Here is the full scenario (start ES empty, add 3 documents):: + + curl -XPUT 'localhost:9200/twitter/tweet/1' -d ' + { + "text" : { + "message" : "you know for search 1" + }, + "likes": 1 + }' + + curl -XPUT 'localhost:9200/twitter/tweet/2' -d ' + { + "text" : { + "message" : "you know for search 2" + }, + "likes": 2 + }' + + curl -XPUT 'localhost:9200/twitter/tweet/3' -d ' + { + "text" : { + "message" : "you know for search 3" + }, + "likes": 3 + }' + +Verify that you have 3 docs in index and that the likes are: 1, 2 and 3. Export documents onto file system:: + + curl -X POST 'http://localhost:9200/_export' -d '{ + "fields": ["_id", "_source", "_version", "_index", "_type"], + "output_file": "/tmp/es-data/dump-${index}-${shard}.gz", + "compression": "gzip" + }' + +You will find number of gz files, equal to number of your shards in ``/tmp/es-data/`` folder. If you open them, you will +see that 3 docs are in 3 files, other 2 files are empty. Now stop ES and delete data folder. Start ES again import the +data with the script that modifies like field:: + + curl -X POST 'http://localhost:9200/_import' -d '{ + "directory": "/tmp/es-data", + "compression": "gzip", + "script" : "ctx._source.likes += 1" + }' + +Verify that you have 3 docs in index and that the likes are: 2, 3 and 4. + +Test ``_searchinto``:: + + curl -XPOST "http://localhost:9200/twitter/_search_into" -d' + { + "fields": ["_id", "_source", ["_index", "'twitter-new'"]], + "script" : "ctx._source.likes += 1" + }' + +verify that you have 3 documents in twitter_new index with likes: 3,4 and 5 + +Test ``_reindex``:: + + curl -XPOST "http://localhost:9200/twitter/_reindex" -d' + { + "script" : "ctx._source.likes += 1" + }' +verify that you have 3 documents in twitter index with likes: 3,4 and 5 Installation ============ -* Clone this repo with git clone - git@github.com:crate/elasticsearch-inout-plugin.git -* Checkout the tag (find out via git tag) you want to build with - (possibly master is not for your elasticsearch version) -* Run: mvn clean package -DskipTests=true – this does not run any unit +* Clone this repo with git clone ``git@github.com:bremeld/fork-elasticsearch-inout-plugin.git`` +* Checkout the branch or tag that matches your ElasticSearch version +* Run: ``mvn clean package -DskipTests=true`` – this does not run any unit tests, as they take some time. If you want to run them, better run mvn clean package -* Install the plugin: /path/to/elasticsearch/bin/plugin -install - elasticsearch-inout-plugin -url - file:///$PWD/target/elasticsearch-inout-plugin-$version.jar +* Install the plugin: ``/path/to/elasticsearch/bin/plugin --install elasticsearch-inout-plugin --url + file:///$PWD/target/elasticsearch-inout-plugin-$version.jar`` diff --git a/pom.xml b/pom.xml index 2ce8918..635046a 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 crate elasticsearch-inout-plugin - 0.5.0 + 1.0.0.Beta2 jar An Elasticsearch plugin which provides the ability to export and import data by query on server side. @@ -14,7 +14,7 @@ https://github.com/crate/elasticsearch-inout-plugin - 0.90.3 + 1.0.0.Beta2 UTF-8 diff --git a/src/main/java/crate/elasticsearch/action/export/AbstractTransportExportAction.java b/src/main/java/crate/elasticsearch/action/export/AbstractTransportExportAction.java index c6e0eda..3fdd7a6 100644 --- a/src/main/java/crate/elasticsearch/action/export/AbstractTransportExportAction.java +++ b/src/main/java/crate/elasticsearch/action/export/AbstractTransportExportAction.java @@ -149,7 +149,7 @@ protected ShardExportResponse shardOperation(ShardExportRequest request) throws SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId()); ExportContext context = new ExportContext(0, new ShardSearchRequest().types(request.types()).filteringAliases(request.filteringAliases()), - shardTarget, indexShard.searcher(), indexService, indexShard, scriptService, cacheRecycler, nodePath); + shardTarget, indexShard.acquireSearcher("inout-plugin"), indexService, indexShard, scriptService, cacheRecycler, nodePath); ExportContext.setCurrent(context); try { diff --git a/src/main/java/crate/elasticsearch/action/export/ExportContext.java b/src/main/java/crate/elasticsearch/action/export/ExportContext.java index cfd55f3..625a561 100644 --- a/src/main/java/crate/elasticsearch/action/export/ExportContext.java +++ b/src/main/java/crate/elasticsearch/action/export/ExportContext.java @@ -10,6 +10,7 @@ import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.internal.DefaultSearchContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -20,7 +21,7 @@ /** * Container class for export specific informations. */ -public class ExportContext extends SearchContext { +public class ExportContext extends DefaultSearchContext { private static final String VAR_SHARD = "${shard}"; private static final String VAR_INDEX = "${index}"; diff --git a/src/main/java/crate/elasticsearch/action/export/ExportRequest.java b/src/main/java/crate/elasticsearch/action/export/ExportRequest.java index 38578e0..641dd26 100644 --- a/src/main/java/crate/elasticsearch/action/export/ExportRequest.java +++ b/src/main/java/crate/elasticsearch/action/export/ExportRequest.java @@ -3,7 +3,6 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Required; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -60,13 +59,11 @@ public BytesReference source() { return source; } - @Required public ExportRequest source(String source) { return this.source(new BytesArray(source), false); } - @Required public ExportRequest source(BytesReference source, boolean unsafe) { this.source = source; this.querySourceUnsafe = unsafe; diff --git a/src/main/java/crate/elasticsearch/action/import_/AbstractTransportImportAction.java b/src/main/java/crate/elasticsearch/action/import_/AbstractTransportImportAction.java index a663f41..d627d6f 100644 --- a/src/main/java/crate/elasticsearch/action/import_/AbstractTransportImportAction.java +++ b/src/main/java/crate/elasticsearch/action/import_/AbstractTransportImportAction.java @@ -2,6 +2,8 @@ import crate.elasticsearch.action.import_.parser.IImportParser; import crate.elasticsearch.import_.Importer; +import crate.elasticsearch.script.ScriptProvider; + import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.TransportNodesOperationAction; @@ -11,6 +13,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -24,19 +27,28 @@ public abstract class AbstractTransportImportAction extends TransportNodesOperationAction{ private IImportParser importParser; + + private ScriptProvider scriptProvider; private Importer importer; private String nodePath = ""; + + private final ScriptService scriptService; @Inject public AbstractTransportImportAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, - TransportService transportService, IImportParser importParser, Importer importer, NodeEnvironment nodeEnv) { + TransportService transportService, + ScriptService scriptService, + ScriptProvider scriptProvider, + IImportParser importParser, + Importer importer, NodeEnvironment nodeEnv) { super(settings, clusterName, threadPool, clusterService, transportService); this.importParser = importParser; + this.scriptProvider = scriptProvider; this.importer = importer; - + this.scriptService=scriptService; File[] paths = nodeEnv.nodeDataLocations(); if (paths.length > 0) { nodePath = paths[0].getAbsolutePath(); @@ -110,9 +122,9 @@ protected NodeImportResponse newNodeResponse() { protected NodeImportResponse nodeOperation(NodeImportRequest request) throws ElasticSearchException { ImportContext context = new ImportContext(nodePath); - BytesReference source = request.source(); importParser.parseSource(context, source); + scriptProvider.prepareContextForScriptExecution(context, scriptService); Importer.Result result = importer.execute(context, request); return new NodeImportResponse(clusterService.state().nodes().localNode(), result); } diff --git a/src/main/java/crate/elasticsearch/action/import_/ImportContext.java b/src/main/java/crate/elasticsearch/action/import_/ImportContext.java index 4e30ebe..cd5d20a 100644 --- a/src/main/java/crate/elasticsearch/action/import_/ImportContext.java +++ b/src/main/java/crate/elasticsearch/action/import_/ImportContext.java @@ -1,9 +1,15 @@ package crate.elasticsearch.action.import_; import java.io.File; +import java.util.HashMap; +import java.util.Map; import java.util.regex.Pattern; -public class ImportContext { +import org.elasticsearch.script.ExecutableScript; + +import crate.elasticsearch.script.IScriptContext; + +public class ImportContext implements IScriptContext{ private String nodePath; private boolean compression; @@ -11,12 +17,19 @@ public class ImportContext { private Pattern file_pattern; private boolean mappings = false; private boolean settings = false; - - public ImportContext(String nodePath) { + private String scriptString; + private String scriptLang; + private Map scriptParams; + private Map executionContext; + private ExecutableScript executableScript; + + public ImportContext(String nodePath) { + super(); this.nodePath = nodePath; + this.executionContext = new HashMap(); } - public boolean compression() { + public boolean compression() { return compression; } @@ -60,4 +73,59 @@ public boolean settings() { public void settings(boolean settings) { this.settings = settings; } + + + + @Override + public String scriptString() { + return scriptString; + } + + @Override + public void scriptString(String scriptString) { + this.scriptString = scriptString; + } + + @Override + public String scriptLang() { + return scriptLang; + } + + @Override + public void scriptLang(String scriptLang) { + this.scriptLang = scriptLang; + } + + @Override + public Map scriptParams() { + return scriptParams; + } + + @Override + public void scriptParams(Map scriptParams) { + this.scriptParams = scriptParams; + } + + + @Override + public void executableScript(ExecutableScript executableScript) { + this.executableScript = executableScript; + } + + + @Override + public void executionContext(Map executionContext) { + this.executionContext = executionContext; + } + + @Override + public Map executionContext() { + return executionContext; + } + + @Override + public ExecutableScript executableScript() { + return executableScript; + } + } diff --git a/src/main/java/crate/elasticsearch/action/import_/ImportRequest.java b/src/main/java/crate/elasticsearch/action/import_/ImportRequest.java index a9863b7..ae690e7 100644 --- a/src/main/java/crate/elasticsearch/action/import_/ImportRequest.java +++ b/src/main/java/crate/elasticsearch/action/import_/ImportRequest.java @@ -1,7 +1,6 @@ package crate.elasticsearch.action.import_; import org.elasticsearch.action.support.nodes.NodesOperationRequest; -import org.elasticsearch.common.Required; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -32,12 +31,10 @@ public BytesReference source() { return source; } - @Required public ImportRequest source(String source) { return this.source(new BytesArray(source), false); } - @Required public ImportRequest source(BytesReference source, boolean unsafe) { this.source = source; return this; diff --git a/src/main/java/crate/elasticsearch/action/import_/NodeImportResponse.java b/src/main/java/crate/elasticsearch/action/import_/NodeImportResponse.java index 6d0ad41..2614620 100644 --- a/src/main/java/crate/elasticsearch/action/import_/NodeImportResponse.java +++ b/src/main/java/crate/elasticsearch/action/import_/NodeImportResponse.java @@ -39,6 +39,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) if (counts.invalid > 0) { builder.field(Fields.INVALIDATED, counts.invalid); } + if (counts.deletes > 0) { + builder.field(Fields.DELETES, counts.deletes); + } builder.endObject(); } builder.endArray(); @@ -58,6 +61,7 @@ public void readFrom(StreamInput in) throws IOException { counts.successes = in.readInt(); counts.failures = in.readInt(); counts.invalid = in.readInt(); + counts.deletes = in.readInt(); result.importCounts.add(counts); } } @@ -72,6 +76,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(counts.successes); out.writeInt(counts.failures); out.writeInt(counts.invalid); + out.writeInt(counts.deletes); } } @@ -89,5 +94,6 @@ static final class Fields { static final XContentBuilderString SUCCESSES = new XContentBuilderString("successes"); static final XContentBuilderString FAILURES = new XContentBuilderString("failures"); static final XContentBuilderString INVALIDATED = new XContentBuilderString("invalidated"); + static final XContentBuilderString DELETES = new XContentBuilderString("deletes"); } } diff --git a/src/main/java/crate/elasticsearch/action/import_/TransportImportAction.java b/src/main/java/crate/elasticsearch/action/import_/TransportImportAction.java index e18f25e..686dae7 100644 --- a/src/main/java/crate/elasticsearch/action/import_/TransportImportAction.java +++ b/src/main/java/crate/elasticsearch/action/import_/TransportImportAction.java @@ -2,11 +2,14 @@ import crate.elasticsearch.action.import_.parser.ImportParser; import crate.elasticsearch.import_.Importer; +import crate.elasticsearch.script.ScriptProvider; + import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -18,8 +21,8 @@ public class TransportImportAction extends AbstractTransportImportAction { @Inject public TransportImportAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, - TransportService transportService, ImportParser importParser, Importer importer, NodeEnvironment nodeEnv) { - super(settings, clusterName, threadPool, clusterService, transportService, importParser, importer, nodeEnv); + TransportService transportService, ScriptService scriptService, ScriptProvider scriptProvider, ImportParser importParser, Importer importer, NodeEnvironment nodeEnv) { + super(settings, clusterName, threadPool, clusterService, transportService, scriptService, scriptProvider, importParser, importer, nodeEnv); } @Override diff --git a/src/main/java/crate/elasticsearch/action/import_/parser/ImportParser.java b/src/main/java/crate/elasticsearch/action/import_/parser/ImportParser.java index 6dddb93..eeaa53d 100644 --- a/src/main/java/crate/elasticsearch/action/import_/parser/ImportParser.java +++ b/src/main/java/crate/elasticsearch/action/import_/parser/ImportParser.java @@ -5,18 +5,25 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchParseException; import crate.elasticsearch.action.import_.ImportContext; +import crate.elasticsearch.script.ScriptParseElement; +import crate.elasticsearch.script.ScriptParser; -public class ImportParser implements IImportParser { +public class ImportParser implements IImportParser{ private final ImmutableMap elementParsers; + + private final ImmutableMap scriptElementParsers; + - public ImportParser() { + @Inject + public ImportParser(ScriptParser scriptParser) { Map elementParsers = new HashMap(); elementParsers.put("directory", new DirectoryParseElement()); elementParsers.put("compression", new ImportCompressionParseElement()); @@ -24,6 +31,7 @@ public ImportParser() { elementParsers.put("mappings", new ImportMappingsParseElement()); elementParsers.put("settings", new ImportSettingsParseElement()); this.elementParsers = ImmutableMap.copyOf(elementParsers); + this.scriptElementParsers = ImmutableMap.copyOf(scriptParser.scriptElementParsers()); } /** @@ -45,9 +53,17 @@ public void parseSource(ImportContext context, BytesReference source) throws Imp parser.nextToken(); ImportParseElement element = elementParsers.get(fieldName); if (element == null) { - throw new ImportParseException(context, "No parser for element [" + fieldName + "]"); + // try with script element + ScriptParseElement scriptElement = scriptElementParsers.get(fieldName); + if(scriptElement==null){ + throw new ImportParseException(context, "No parser for element [" + fieldName + "]"); + } else { + scriptElement.parse(parser, context); + } + } + else { + element.parse(parser, context); } - element.parse(parser, context); } else if (token == null) { break; } diff --git a/src/main/java/crate/elasticsearch/action/reindex/ReindexParser.java b/src/main/java/crate/elasticsearch/action/reindex/ReindexParser.java index b3011da..dc9901d 100644 --- a/src/main/java/crate/elasticsearch/action/reindex/ReindexParser.java +++ b/src/main/java/crate/elasticsearch/action/reindex/ReindexParser.java @@ -15,6 +15,7 @@ import crate.elasticsearch.action.searchinto.SearchIntoContext; import crate.elasticsearch.action.searchinto.parser.AbstractSearchIntoParser; import crate.elasticsearch.action.searchinto.parser.ISearchIntoParser; +import crate.elasticsearch.script.ScriptParser; /** * Parser for pay load given to _reindex action. @@ -22,9 +23,9 @@ public class ReindexParser extends AbstractSearchIntoParser implements ISearchIntoParser { private final ImmutableMap elementParsers; - @Inject - public ReindexParser(QueryPhase queryPhase, FetchPhase fetchPhase) { + public ReindexParser(QueryPhase queryPhase, FetchPhase fetchPhase, ScriptParser scriptParser) { + super(scriptParser); Map elementParsers = new HashMap(); elementParsers.putAll(queryPhase.parseElements()); @@ -39,7 +40,7 @@ protected ImmutableMap getElementParsers() { @Override public void parseSource(SearchIntoContext context, BytesReference source) - throws SearchParseException { + throws SearchParseException { context.fieldNames().add("_id"); context.fieldNames().add("_source"); context.outputNames().put("_id", "_id"); diff --git a/src/main/java/crate/elasticsearch/action/reindex/TransportReindexAction.java b/src/main/java/crate/elasticsearch/action/reindex/TransportReindexAction.java index 0fd8752..ab8cf46 100644 --- a/src/main/java/crate/elasticsearch/action/reindex/TransportReindexAction.java +++ b/src/main/java/crate/elasticsearch/action/reindex/TransportReindexAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.transport.TransportService; import crate.elasticsearch.action.searchinto.AbstractTransportSearchIntoAction; +import crate.elasticsearch.script.ScriptProvider; import crate.elasticsearch.searchinto.Writer; public class TransportReindexAction extends AbstractTransportSearchIntoAction { @@ -18,9 +19,9 @@ public class TransportReindexAction extends AbstractTransportSearchIntoAction { public TransportReindexAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, CacheRecycler cacheRecycler, IndicesService indicesService, ScriptService scriptService, - ReindexParser parser, Writer writer) { + ScriptProvider scriptProvider, ReindexParser parser, Writer writer) { super(settings, threadPool, clusterService, transportService, cacheRecycler, indicesService, - scriptService, parser, writer); + scriptService, scriptProvider, parser, writer); } @Override diff --git a/src/main/java/crate/elasticsearch/action/restore/TransportRestoreAction.java b/src/main/java/crate/elasticsearch/action/restore/TransportRestoreAction.java index d19da54..7000d93 100644 --- a/src/main/java/crate/elasticsearch/action/restore/TransportRestoreAction.java +++ b/src/main/java/crate/elasticsearch/action/restore/TransportRestoreAction.java @@ -3,11 +3,14 @@ import crate.elasticsearch.action.import_.AbstractTransportImportAction; import crate.elasticsearch.action.restore.parser.RestoreParser; import crate.elasticsearch.import_.Importer; +import crate.elasticsearch.script.ScriptProvider; + import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -19,8 +22,8 @@ public class TransportRestoreAction extends AbstractTransportImportAction { @Inject public TransportRestoreAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, - TransportService transportService, RestoreParser restoreParser, Importer importer, NodeEnvironment nodeEnv) { - super(settings, clusterName, threadPool, clusterService, transportService, restoreParser, importer, nodeEnv); + TransportService transportService, ScriptService scriptService, ScriptProvider scriptProvider, RestoreParser restoreParser, Importer importer, NodeEnvironment nodeEnv) { + super(settings, clusterName, threadPool, clusterService, transportService, scriptService, scriptProvider, restoreParser, importer, nodeEnv); } @Override diff --git a/src/main/java/crate/elasticsearch/action/searchinto/AbstractTransportSearchIntoAction.java b/src/main/java/crate/elasticsearch/action/searchinto/AbstractTransportSearchIntoAction.java index 07320ab..5bff496 100644 --- a/src/main/java/crate/elasticsearch/action/searchinto/AbstractTransportSearchIntoAction.java +++ b/src/main/java/crate/elasticsearch/action/searchinto/AbstractTransportSearchIntoAction.java @@ -33,9 +33,8 @@ import org.elasticsearch.search.query.QueryPhaseExecutionException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; - import crate.elasticsearch.action.searchinto.parser.ISearchIntoParser; -import crate.elasticsearch.action.searchinto.parser.SearchIntoParser; +import crate.elasticsearch.script.ScriptProvider; import crate.elasticsearch.searchinto.Writer; import crate.elasticsearch.searchinto.WriterResult; @@ -51,6 +50,8 @@ public abstract class AbstractTransportSearchIntoAction extends private final IndicesService indicesService; private final ScriptService scriptService; + + private ScriptProvider scriptProvider; private final ISearchIntoParser parser; @@ -63,11 +64,13 @@ public AbstractTransportSearchIntoAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, CacheRecycler cacheRecycler, IndicesService indicesService, ScriptService scriptService, + ScriptProvider scriptProvider, ISearchIntoParser parser, Writer writer) { super(settings, threadPool, clusterService, transportService); this.indicesService = indicesService; this.cacheRecycler = cacheRecycler; this.scriptService = scriptService; + this.scriptProvider = scriptProvider; this.parser = parser; this.writer = writer; } @@ -168,13 +171,14 @@ protected ShardSearchIntoResponse shardOperation(ShardSearchIntoRequest request.shardId()); SearchIntoContext context = new SearchIntoContext(0, new ShardSearchRequest().types(request.types()).filteringAliases(request.filteringAliases()), - shardTarget, indexShard.searcher(), indexService, indexShard, scriptService, cacheRecycler + shardTarget, indexShard.acquireSearcher("inout-plugin"), indexService, indexShard, scriptService, cacheRecycler ); SearchIntoContext.setCurrent(context); - try { BytesReference source = request.source(); parser.parseSource(context, source); + scriptProvider.prepareContextForScriptExecution(context, scriptService); + context.preProcess(); try { if (context.explain()) { diff --git a/src/main/java/crate/elasticsearch/action/searchinto/SearchIntoContext.java b/src/main/java/crate/elasticsearch/action/searchinto/SearchIntoContext.java index 29faa30..6287c87 100644 --- a/src/main/java/crate/elasticsearch/action/searchinto/SearchIntoContext.java +++ b/src/main/java/crate/elasticsearch/action/searchinto/SearchIntoContext.java @@ -7,11 +7,14 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.DefaultSearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; +import crate.elasticsearch.script.IScriptContext; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -19,13 +22,19 @@ /** * Container class for inout specific informations. */ -public class SearchIntoContext extends SearchContext { +public class SearchIntoContext extends DefaultSearchContext implements IScriptContext { // currently we only support index targets private String targetType = "index"; private List targetNodes; + private String scriptString; + private String scriptLang; + private Map scriptParams; + private Map executionContext; + private ExecutableScript executableScript; + public Map outputNames() { return outputNames; @@ -40,6 +49,7 @@ public SearchIntoContext(long id, ShardSearchRequest request, ScriptService scriptService, CacheRecycler cacheRecycler) { super(id, request, shardTarget, engineSearcher, indexService, indexShard, scriptService, cacheRecycler); + this.executionContext = new HashMap(); } public String targetType() { @@ -58,4 +68,58 @@ public void emptyTargetNodes() { this.targetNodes = ImmutableList.of(); } + @Override + public String scriptString() { + return scriptString; + } + + @Override + public void scriptString(String scriptString) { + this.scriptString = scriptString; + } + + @Override + public String scriptLang() { + return scriptLang; + } + + @Override + public void scriptLang(String scriptLang) { + this.scriptLang = scriptLang; + } + + @Override + public Map scriptParams() { + return scriptParams; + } + + @Override + public void scriptParams(Map scriptParams) { + this.scriptParams = scriptParams; + } + + + @Override + public void executableScript(ExecutableScript executableScript) { + this.executableScript = executableScript; + } + + + @Override + public void executionContext(Map executionContext) { + this.executionContext = executionContext; + } + + @Override + public Map executionContext() { + return executionContext; + } + + @Override + public ExecutableScript executableScript() { + return executableScript; + } + + + } diff --git a/src/main/java/crate/elasticsearch/action/searchinto/SearchIntoRequest.java b/src/main/java/crate/elasticsearch/action/searchinto/SearchIntoRequest.java index b2985e3..0944865 100644 --- a/src/main/java/crate/elasticsearch/action/searchinto/SearchIntoRequest.java +++ b/src/main/java/crate/elasticsearch/action/searchinto/SearchIntoRequest.java @@ -3,7 +3,6 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Required; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -63,13 +62,11 @@ public BytesReference source() { return source; } - @Required public SearchIntoRequest source(String source) { return this.source(new BytesArray(source), false); } - @Required public SearchIntoRequest source(BytesReference source, boolean unsafe) { this.source = source; this.querySourceUnsafe = unsafe; diff --git a/src/main/java/crate/elasticsearch/action/searchinto/TransportSearchIntoAction.java b/src/main/java/crate/elasticsearch/action/searchinto/TransportSearchIntoAction.java index 8ab7e43..6ecd18e 100644 --- a/src/main/java/crate/elasticsearch/action/searchinto/TransportSearchIntoAction.java +++ b/src/main/java/crate/elasticsearch/action/searchinto/TransportSearchIntoAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.transport.TransportService; import crate.elasticsearch.action.searchinto.parser.SearchIntoParser; +import crate.elasticsearch.script.ScriptProvider; import crate.elasticsearch.searchinto.Writer; @@ -23,9 +24,10 @@ public TransportSearchIntoAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, CacheRecycler cacheRecycler, IndicesService indicesService, ScriptService scriptService, + ScriptProvider scriptProvider, SearchIntoParser parser, Writer writer) { super(settings, threadPool, clusterService, transportService, cacheRecycler, indicesService, - scriptService, parser, writer); + scriptService, scriptProvider, parser, writer); } @Override diff --git a/src/main/java/crate/elasticsearch/action/searchinto/parser/AbstractSearchIntoParser.java b/src/main/java/crate/elasticsearch/action/searchinto/parser/AbstractSearchIntoParser.java index 81e120b..8de9a67 100644 --- a/src/main/java/crate/elasticsearch/action/searchinto/parser/AbstractSearchIntoParser.java +++ b/src/main/java/crate/elasticsearch/action/searchinto/parser/AbstractSearchIntoParser.java @@ -2,15 +2,19 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchParseException; - import crate.elasticsearch.action.searchinto.SearchIntoContext; +import crate.elasticsearch.script.ScriptParseElement; +import crate.elasticsearch.script.ScriptParser; + +public abstract class AbstractSearchIntoParser implements ISearchIntoParser { + -public abstract class AbstractSearchIntoParser implements ISearchIntoParser { /** * Main method of this class to parse given payload of _search_into action @@ -20,8 +24,16 @@ public abstract class AbstractSearchIntoParser implements ISearchIntoParser { * @throws org.elasticsearch.search.SearchParseException * */ - public void parseSource(SearchIntoContext context, + private final ImmutableMap scriptElementParsers; + + @Inject + public AbstractSearchIntoParser(ScriptParser scriptParser) { + this.scriptElementParsers = ImmutableMap.copyOf(scriptParser.scriptElementParsers()); + } + + public void parseSource(SearchIntoContext context, BytesReference source) throws SearchParseException { + XContentParser parser = null; try { if (source != null) { @@ -35,11 +47,15 @@ public void parseSource(SearchIntoContext context, SearchParseElement element = getElementParsers().get( fieldName); if (element == null) { - throw new SearchParseException(context, - "No parser for element [" + fieldName + - "]"); + ScriptParseElement scriptElement = scriptElementParsers.get(fieldName); + if(scriptElement==null){ + throw new SearchParseException(context, "No parser for element [" + fieldName + "]"); + } else { + scriptElement.parse(parser, context); + } + } else { + element.parse(parser, context); } - element.parse(parser, context); } else if (token == null) { break; } diff --git a/src/main/java/crate/elasticsearch/action/searchinto/parser/SearchIntoParser.java b/src/main/java/crate/elasticsearch/action/searchinto/parser/SearchIntoParser.java index 2745c72..6389c30 100644 --- a/src/main/java/crate/elasticsearch/action/searchinto/parser/SearchIntoParser.java +++ b/src/main/java/crate/elasticsearch/action/searchinto/parser/SearchIntoParser.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.Map; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.mapper.FieldMapper; @@ -13,6 +14,7 @@ import org.elasticsearch.search.query.QueryPhase; import crate.elasticsearch.action.searchinto.SearchIntoContext; +import crate.elasticsearch.script.ScriptParser; /** * Parser for payload given to _search_into action. @@ -20,9 +22,11 @@ public class SearchIntoParser extends AbstractSearchIntoParser implements ISearchIntoParser { private final ImmutableMap elementParsers; + @Inject - public SearchIntoParser(QueryPhase queryPhase, FetchPhase fetchPhase) { + public SearchIntoParser(QueryPhase queryPhase, FetchPhase fetchPhase, ScriptParser scriptParser) { + super(scriptParser); Map elementParsers = new HashMap(); elementParsers.putAll(queryPhase.parseElements()); @@ -57,5 +61,10 @@ protected ImmutableMap getElementParsers() { return elementParsers; } + @Override + public void parseSource(SearchIntoContext context, BytesReference source) + throws SearchParseException { + super.parseSource(context, source); + } } \ No newline at end of file diff --git a/src/main/java/crate/elasticsearch/client/action/import_/ImportRequestBuilder.java b/src/main/java/crate/elasticsearch/client/action/import_/ImportRequestBuilder.java index ffc2514..711d5bf 100644 --- a/src/main/java/crate/elasticsearch/client/action/import_/ImportRequestBuilder.java +++ b/src/main/java/crate/elasticsearch/client/action/import_/ImportRequestBuilder.java @@ -1,14 +1,13 @@ package crate.elasticsearch.client.action.import_; +import crate.elasticsearch.action.import_.ImportAction; +import crate.elasticsearch.action.import_.ImportRequest; +import crate.elasticsearch.action.import_.ImportResponse; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.client.internal.InternalClient; -import crate.elasticsearch.action.import_.ImportAction; -import crate.elasticsearch.action.import_.ImportRequest; -import crate.elasticsearch.action.import_.ImportResponse; - public class ImportRequestBuilder extends ActionRequestBuilder { public ImportRequestBuilder(Client client) { diff --git a/src/main/java/crate/elasticsearch/export/ExportCollector.java b/src/main/java/crate/elasticsearch/export/ExportCollector.java index d7cd963..bf474c8 100644 --- a/src/main/java/crate/elasticsearch/export/ExportCollector.java +++ b/src/main/java/crate/elasticsearch/export/ExportCollector.java @@ -15,6 +15,7 @@ import org.apache.lucene.search.Collector; import org.apache.lucene.search.Scorer; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.text.StringAndBytesText; import org.elasticsearch.common.text.Text; @@ -22,15 +23,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor; -import org.elasticsearch.index.fieldvisitor.FieldsVisitor; -import org.elasticsearch.index.fieldvisitor.JustUidFieldsVisitor; -import org.elasticsearch.index.fieldvisitor.UidAndSourceFieldsVisitor; +import org.elasticsearch.index.fieldvisitor.*; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.FieldMappers; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.search.fetch.source.FetchSourceContext; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHitField; @@ -61,17 +60,21 @@ public ExportCollector(ExportContext context, if (!context.hasFieldNames()) { if (context.hasPartialFields()) { - // partial fields need the source, so fetch it, but don't return it + // partial fields need the source, so fetch it fieldsVisitor = new UidAndSourceFieldsVisitor(); - } else if (context.hasScriptFields()) { - // we ask for script fields, and no field names, don't load the source - fieldsVisitor = new JustUidFieldsVisitor(); } else { - sourceRequested = true; - fieldsVisitor = new UidAndSourceFieldsVisitor(); + // no fields specified, default to return source if no explicit indication + if (!context.hasScriptFields() && !context.hasFetchSourceContext()) { + context.fetchSourceContext(new FetchSourceContext(true)); + } + fieldsVisitor = context.sourceRequested() ? new UidAndSourceFieldsVisitor() : new JustUidFieldsVisitor(); } } else if (context.fieldNames().isEmpty()) { - fieldsVisitor = new JustUidFieldsVisitor(); + if (context.sourceRequested()) { + fieldsVisitor = new UidAndSourceFieldsVisitor(); + } else { + fieldsVisitor = new JustUidFieldsVisitor(); + } } else { boolean loadAllStored = false; Set fieldNames = null; @@ -81,7 +84,11 @@ public ExportCollector(ExportContext context, continue; } if (fieldName.equals(SourceFieldMapper.NAME)) { - sourceRequested = true; + if (context.hasFetchSourceContext()) { + context.fetchSourceContext().fetchSource(true); + } else { + context.fetchSourceContext(new FetchSourceContext(true)); + } continue; } FieldMappers x = context.smartNameFieldMappers @@ -99,15 +106,11 @@ public ExportCollector(ExportContext context, } } if (loadAllStored) { - if (sourceRequested || extractFieldNames != null) { - fieldsVisitor = new CustomFieldsVisitor(true, true); // load everything, including _source - } else { - fieldsVisitor = new CustomFieldsVisitor(true, false); - } + fieldsVisitor = new AllFieldsVisitor(); // load everything, including _source } else if (fieldNames != null) { - boolean loadSource = extractFieldNames != null || sourceRequested; + boolean loadSource = extractFieldNames != null || context.sourceRequested(); fieldsVisitor = new CustomFieldsVisitor(fieldNames, loadSource); - } else if (extractFieldNames != null || sourceRequested) { + } else if (extractFieldNames != null || context.sourceRequested()) { fieldsVisitor = new UidAndSourceFieldsVisitor(); } else { fieldsVisitor = new JustUidFieldsVisitor(); @@ -158,8 +161,8 @@ public void collect(int doc) throws IOException { InternalSearchHit searchHit = new InternalSearchHit(doc, fieldsVisitor.uid().id(), typeText, - sourceRequested ? fieldsVisitor.source() : null, - searchFields); + searchFields).sourceRef(fieldsVisitor.source()); + for (FetchSubPhase fetchSubPhase : fetchSubPhases) { FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext(); @@ -171,12 +174,9 @@ public void collect(int doc) throws IOException { searchHit.shardTarget(context.shardTarget()); exportFields.hit(searchHit); - BytesStreamOutput os = new BytesStreamOutput(); - XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(XContentType.JSON), os); + XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(XContentType.JSON), out); exportFields.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.flush(); - BytesReference bytes = os.bytes(); - out.write(bytes.array(), bytes.arrayOffset(), bytes.length()); out.write('\n'); out.flush(); numExported++; diff --git a/src/main/java/crate/elasticsearch/export/Exporter.java b/src/main/java/crate/elasticsearch/export/Exporter.java index b5626bd..92f9cf7 100644 --- a/src/main/java/crate/elasticsearch/export/Exporter.java +++ b/src/main/java/crate/elasticsearch/export/Exporter.java @@ -9,6 +9,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.hppc.ObjectContainer; +import org.elasticsearch.common.hppc.cursors.ObjectCursor; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.logging.ESLogger; @@ -159,10 +161,10 @@ private void writeSettingsOrMappings(ExportContext context) { builder.startObject(indexMetaData.index(), XContentBuilder.FieldCaseConversion.NONE); Set types = new HashSet(Arrays.asList(context.types())); boolean noTypes = types.isEmpty(); - for (MappingMetaData mappingMetaData : indexMetaData.mappings().values()) { - if (noTypes || types.contains(mappingMetaData.type())) { - builder.field(mappingMetaData.type()); - builder.map(mappingMetaData.sourceAsMap()); + for (ObjectCursor mappingMetaData : indexMetaData.mappings().values()) { + if (noTypes || types.contains(mappingMetaData.value.type())) { + builder.field(mappingMetaData.value.type()); + builder.map(mappingMetaData.value.sourceAsMap()); } } builder.endObject(); diff --git a/src/main/java/crate/elasticsearch/import_/ImportBulkListener.java b/src/main/java/crate/elasticsearch/import_/ImportBulkListener.java index 1444b31..900928e 100644 --- a/src/main/java/crate/elasticsearch/import_/ImportBulkListener.java +++ b/src/main/java/crate/elasticsearch/import_/ImportBulkListener.java @@ -33,6 +33,10 @@ public void addFailure() { counts.failures++; } + public void addDelete() { + counts.deletes++; + } + public ImportCounts importCounts() { return counts; } diff --git a/src/main/java/crate/elasticsearch/import_/Importer.java b/src/main/java/crate/elasticsearch/import_/Importer.java index 7bcf930..b8d1c2f 100644 --- a/src/main/java/crate/elasticsearch/import_/Importer.java +++ b/src/main/java/crate/elasticsearch/import_/Importer.java @@ -20,7 +20,10 @@ import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; +import crate.elasticsearch.action.import_.ImportContext; +import crate.elasticsearch.action.import_.NodeImportRequest; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -32,7 +35,9 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.hppc.cursors.ObjectCursor; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.settings.ImmutableSettings; @@ -43,6 +48,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.common.xcontent.XContentType; @@ -57,9 +63,6 @@ import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexMissingException; -import crate.elasticsearch.action.import_.ImportContext; -import crate.elasticsearch.action.import_.NodeImportRequest; - public class Importer { private Client client; @@ -130,7 +133,7 @@ public boolean accept(File dir, String name) { for (File file : files) { String fileName = file.getName(); if (!fileName.endsWith(".mapping") && !fileName.endsWith(".settings")) { - ImportCounts counts = handleFile(file, index, type, bulkSize, context.compression()); + ImportCounts counts = handleFile(file, index, type, bulkSize, context); if (counts != null) { result.importCounts.add(counts); } @@ -141,7 +144,7 @@ public boolean accept(File dir, String name) { return result; } - private ImportCounts handleFile(File file, String index, String type, int bulkSize, boolean compression) { + private ImportCounts handleFile(File file, String index, String type, int bulkSize, ImportContext context) { if (file.isFile() && file.canRead()) { ImportBulkListener bulkListener = new ImportBulkListener(file.getAbsolutePath()); BulkProcessor bulkProcessor = BulkProcessor.builder(client, bulkListener) @@ -152,7 +155,7 @@ private ImportCounts handleFile(File file, String index, String type, int bulkSi .build(); try { BufferedReader r; - if (compression) { + if (context.compression()) { GZIPInputStream is = new GZIPInputStream(new FileInputStream(file)); r = new BufferedReader(new InputStreamReader(is)); } else { @@ -162,27 +165,32 @@ private ImportCounts handleFile(File file, String index, String type, int bulkSi while ((line = r.readLine()) != null) { IndexRequest indexRequest; try { - indexRequest = parseObject(line); + indexRequest = parseObject(line, context); + if (indexRequest == null) { + bulkListener.addDelete(); + continue; + } + } catch (ExpiredObjectException e) { + bulkListener.addInvalid(); + continue; } catch (ObjectImportException e) { bulkListener.addFailure(); continue; } - if (indexRequest != null) { - indexRequest.opType(OpType.INDEX); - if (index != null) { - indexRequest.index(index); - } - if (type != null) { - indexRequest.type(type); - } - if (indexRequest.type() != null && indexRequest.index() != null) { - bulkProcessor.add(indexRequest); - } else { - bulkListener.addFailure(); - } + + indexRequest.opType(OpType.INDEX); + if (index != null) { + indexRequest.index(index); + } + if (type != null) { + indexRequest.type(type); + } + if (indexRequest.type() != null && indexRequest.index() != null) { + bulkProcessor.add(indexRequest); } else { - bulkListener.addInvalid(); + bulkListener.addFailure(); } + } } catch (FileNotFoundException e) { // Ignore not existing files, actually they should exist, as they are filtered before. @@ -200,7 +208,7 @@ private ImportCounts handleFile(File file, String index, String type, int bulkSi return null; } - private IndexRequest parseObject(String line) throws ObjectImportException { + private IndexRequest parseObject(String line, ImportContext importContext) throws ObjectImportException, ExpiredObjectException { XContentParser parser = null; try { IndexRequest indexRequest = new IndexRequest(); @@ -247,10 +255,64 @@ private IndexRequest parseObject(String line) throws ObjectImportException { indexRequest.ttl(ttl); } else { // object is invalid, do not import - return null; + throw new ExpiredObjectException(); } } - indexRequest.source(sourceBuilder); + + if(importContext.scriptString()!=null){ + Tuple> sourceAndContent = XContentHelper.convertToMap(sourceBuilder.bytes(), true); + importContext.executionContext().clear(); + importContext.executionContext().put("_index", indexRequest.index()); + importContext.executionContext().put("_type", indexRequest.type()); + importContext.executionContext().put("_id", indexRequest.id()); + importContext.executionContext().put("_version", indexRequest.version()); + importContext.executionContext().put("_source", sourceAndContent.v2()); + importContext.executionContext().put("_routing", indexRequest.routing()); + importContext.executionContext().put("_parent", indexRequest.parent()); + importContext.executionContext().put("_timestamp", indexRequest.timestamp()); + importContext.executionContext().put("_ttl", indexRequest.ttl()); + + try { + importContext.executableScript().setNextVar("ctx", importContext.executionContext()); + importContext.executableScript().run(); + // we need to unwrap the ctx... + importContext.executionContext().putAll((Map) importContext.executableScript().unwrap(importContext.executionContext())); + indexRequest.source(sourceAndContent.v2()); + + String operation = (String) importContext.executionContext().get("op"); + if (!(operation == null || "index".equals(operation))) { + return null; + } + + Object fetchedTimestamp = importContext.executionContext().get("_timestamp"); + if (fetchedTimestamp != null) { + if (fetchedTimestamp instanceof String) { + indexRequest.timestamp(String.valueOf(TimeValue.parseTimeValue((String) fetchedTimestamp, null).millis())); + } else { + indexRequest.timestamp(fetchedTimestamp.toString()); + } + } + Object fetchedTTL = importContext.executionContext().get("_ttl"); + if (fetchedTTL != null) { + Long newTtl = -1L; + if (fetchedTTL instanceof Number) { + newTtl = ((Number) fetchedTTL).longValue(); + + } else { + newTtl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); + } + if (newTtl > 0) { + indexRequest.ttl(newTtl); + } + } + + } catch (Exception e) { + throw new ElasticSearchIllegalArgumentException("failed to execute script", e); + } + } else { + indexRequest.source(sourceBuilder); + } + return indexRequest; } catch (ElasticSearchParseException e) { throw new ObjectImportException(e); @@ -355,22 +417,24 @@ private void loadMappings(File file, Map> createdMappings, S private Set getMissingIndexes(Set indexes) { try { - ImmutableMap foundIndices = getIndexMetaData(indexes); - indexes.removeAll(foundIndices.keySet()); + ImmutableOpenMap foundIndices = getIndexMetaData(indexes); + for (ObjectCursor oneIndex : foundIndices.keys()) { + indexes.remove(oneIndex.value); + } } catch (IndexMissingException e) { // all indexes are missing } return indexes; } - private ImmutableMap getIndexMetaData(Set indexes) { + private ImmutableOpenMap getIndexMetaData(Set indexes) { ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest() .filterRoutingTable(true) .filterNodes(true) .filteredIndices(indexes.toArray(new String[indexes.size()])); clusterStateRequest.listenerThreaded(false); ClusterStateResponse response = client.admin().cluster().state(clusterStateRequest).actionGet(); - return ImmutableMap.copyOf(response.getState().metaData().indices()); + return response.getState().metaData().indices(); } @@ -423,6 +487,15 @@ public ObjectImportException(Throwable cause) { } } + class ExpiredObjectException extends ElasticSearchException { + + private static final long serialVersionUID = 2445764408399929056L; + + public ExpiredObjectException() { + super("Object TTL expired, could not be imported."); + } + } + public static class Result { public List importCounts = new ArrayList(); public long took; @@ -433,6 +506,7 @@ public static class ImportCounts { public int successes = 0; public int failures = 0; public int invalid = 0; + public int deletes = 0; } } diff --git a/src/main/java/crate/elasticsearch/module/export/ExportModule.java b/src/main/java/crate/elasticsearch/module/export/ExportModule.java index 2b1e024..8e717e3 100644 --- a/src/main/java/crate/elasticsearch/module/export/ExportModule.java +++ b/src/main/java/crate/elasticsearch/module/export/ExportModule.java @@ -1,18 +1,14 @@ package crate.elasticsearch.module.export; - -import crate.elasticsearch.action.export.AbstractTransportExportAction; +import crate.elasticsearch.action.export.ExportAction; import crate.elasticsearch.action.export.TransportExportAction; -import crate.elasticsearch.action.export.parser.IExportParser; +import crate.elasticsearch.action.export.parser.ExportParser; +import crate.elasticsearch.export.Exporter; import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; -import crate.elasticsearch.action.export.ExportAction; -import crate.elasticsearch.action.export.parser.ExportParser; -import crate.elasticsearch.export.Exporter; - public class ExportModule extends AbstractModule { @Override diff --git a/src/main/java/crate/elasticsearch/rest/action/admin/export/RestExportAction.java b/src/main/java/crate/elasticsearch/rest/action/admin/export/RestExportAction.java index 0e16b14..b74108f 100644 --- a/src/main/java/crate/elasticsearch/rest/action/admin/export/RestExportAction.java +++ b/src/main/java/crate/elasticsearch/rest/action/admin/export/RestExportAction.java @@ -3,7 +3,6 @@ import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; import static org.elasticsearch.rest.RestStatus.OK; -import static org.elasticsearch.rest.action.support.RestActions.splitTypes; import java.io.IOException; @@ -12,6 +11,7 @@ import org.elasticsearch.action.support.IgnoreIndices; import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -52,7 +52,7 @@ protected Action action() { } public void handleRequest(final RestRequest request, final RestChannel channel) { - ExportRequest exportRequest = new ExportRequest(RestActions.splitIndices(request.param("index"))); + ExportRequest exportRequest = new ExportRequest(Strings.splitStringByCommaToArray(request.param("index"))); if (request.hasParam("ignore_indices")) { exportRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices"))); @@ -79,7 +79,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel) } } exportRequest.routing(request.param("routing")); - exportRequest.types(splitTypes(request.param("type"))); + exportRequest.types(Strings.splitStringByCommaToArray(request.param("type"))); exportRequest.preference(request.param("preference", "_primary")); } catch (Exception e) { try { diff --git a/src/main/java/crate/elasticsearch/rest/action/admin/searchinto/RestSearchIntoAction.java b/src/main/java/crate/elasticsearch/rest/action/admin/searchinto/RestSearchIntoAction.java index 85af589..91e3b46 100644 --- a/src/main/java/crate/elasticsearch/rest/action/admin/searchinto/RestSearchIntoAction.java +++ b/src/main/java/crate/elasticsearch/rest/action/admin/searchinto/RestSearchIntoAction.java @@ -3,7 +3,6 @@ import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; import static org.elasticsearch.rest.RestStatus.OK; -import static org.elasticsearch.rest.action.support.RestActions.splitTypes; import java.io.IOException; @@ -12,6 +11,7 @@ import org.elasticsearch.action.support.IgnoreIndices; import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -55,7 +55,7 @@ protected Action scriptParams); + + void executableScript(ExecutableScript executableScript); + + void executionContext(Map executionContext); + + public String scriptString(); + + public String scriptLang(); + + public Map scriptParams(); + + ExecutableScript executableScript(); + + Map executionContext(); + +} diff --git a/src/main/java/crate/elasticsearch/script/ScriptLangParseElement.java b/src/main/java/crate/elasticsearch/script/ScriptLangParseElement.java new file mode 100644 index 0000000..f455658 --- /dev/null +++ b/src/main/java/crate/elasticsearch/script/ScriptLangParseElement.java @@ -0,0 +1,18 @@ +package crate.elasticsearch.script; + + +import org.elasticsearch.common.xcontent.XContentParser; + +public class ScriptLangParseElement implements ScriptParseElement { + + @Override + public void parse(XContentParser parser, IScriptContext context) + throws Exception { + XContentParser.Token token = parser.currentToken(); + if (token.isValue()) { + context.scriptLang(parser.text()); + } + + } + +} diff --git a/src/main/java/crate/elasticsearch/script/ScriptParamsParseElement.java b/src/main/java/crate/elasticsearch/script/ScriptParamsParseElement.java new file mode 100644 index 0000000..5508028 --- /dev/null +++ b/src/main/java/crate/elasticsearch/script/ScriptParamsParseElement.java @@ -0,0 +1,20 @@ +package crate.elasticsearch.script; + +import org.elasticsearch.common.xcontent.XContentParser; + + +public class ScriptParamsParseElement implements ScriptParseElement { + + @Override + public void parse(XContentParser parser, IScriptContext context) + throws Exception { + XContentParser.Token token = parser.currentToken(); + if (token.isValue()) { + context.scriptParams(parser.map()); + } + + } + + + +} diff --git a/src/main/java/crate/elasticsearch/script/ScriptParseElement.java b/src/main/java/crate/elasticsearch/script/ScriptParseElement.java new file mode 100644 index 0000000..f343aee --- /dev/null +++ b/src/main/java/crate/elasticsearch/script/ScriptParseElement.java @@ -0,0 +1,8 @@ +package crate.elasticsearch.script; + +import org.elasticsearch.common.xcontent.XContentParser; + +public interface ScriptParseElement { + + void parse(XContentParser parser, IScriptContext context) throws Exception; +} diff --git a/src/main/java/crate/elasticsearch/script/ScriptParser.java b/src/main/java/crate/elasticsearch/script/ScriptParser.java new file mode 100644 index 0000000..c186d72 --- /dev/null +++ b/src/main/java/crate/elasticsearch/script/ScriptParser.java @@ -0,0 +1,26 @@ +package crate.elasticsearch.script; + +import java.util.HashMap; +import java.util.Map; + +import org.elasticsearch.common.collect.ImmutableMap; + + +public class ScriptParser { + + private ImmutableMap scriptElementParsers; + + public ScriptParser() { + Map scriptElementParsers = new HashMap(); + scriptElementParsers.put("script", new ScriptStringParseElement()); + scriptElementParsers.put("lang", new ScriptLangParseElement()); + scriptElementParsers.put("params", new ScriptParamsParseElement()); + this.scriptElementParsers = ImmutableMap.copyOf(scriptElementParsers); + } + + public ImmutableMap scriptElementParsers() { + return scriptElementParsers; + } + + +} diff --git a/src/main/java/crate/elasticsearch/script/ScriptProvider.java b/src/main/java/crate/elasticsearch/script/ScriptProvider.java new file mode 100644 index 0000000..7148d61 --- /dev/null +++ b/src/main/java/crate/elasticsearch/script/ScriptProvider.java @@ -0,0 +1,20 @@ +package crate.elasticsearch.script; + +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.ScriptService; + +import java.util.Map; + +public class ScriptProvider { + + public IScriptContext prepareContextForScriptExecution(IScriptContext context, ScriptService scriptService) { + String scriptString = context.scriptString(); + String scriptLang = context.scriptLang(); + Map scriptParams = context.scriptParams(); + if (context.scriptString() != null) { + ExecutableScript executableScript = scriptService.executable(scriptLang, scriptString, scriptParams); + context.executableScript(executableScript); + } + return context; + } +} diff --git a/src/main/java/crate/elasticsearch/script/ScriptStringParseElement.java b/src/main/java/crate/elasticsearch/script/ScriptStringParseElement.java new file mode 100644 index 0000000..25b75e1 --- /dev/null +++ b/src/main/java/crate/elasticsearch/script/ScriptStringParseElement.java @@ -0,0 +1,24 @@ +package crate.elasticsearch.script; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.xcontent.XContentParser; + +public class ScriptStringParseElement implements ScriptParseElement { + + protected final ESLogger logger = ESLoggerFactory.getLogger(this.getClass().getName()); + + + @Override + public void parse(XContentParser parser, IScriptContext context) + throws Exception { + XContentParser.Token token = parser.currentToken(); + if (token.isValue()) { + String content = parser.text(); + logger.info("Added script into the context: " + content); + context.scriptString(content); + } + + } + +} diff --git a/src/main/java/crate/elasticsearch/searchinto/BulkWriterCollector.java b/src/main/java/crate/elasticsearch/searchinto/BulkWriterCollector.java index f884cfd..28186e1 100644 --- a/src/main/java/crate/elasticsearch/searchinto/BulkWriterCollector.java +++ b/src/main/java/crate/elasticsearch/searchinto/BulkWriterCollector.java @@ -2,6 +2,7 @@ import crate.elasticsearch.action.searchinto.SearchIntoContext; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; @@ -12,7 +13,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.assistedinject.Assisted; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; @@ -27,14 +28,14 @@ import java.io.IOException; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; public class BulkWriterCollector extends WriterCollector { - private static final ESLogger logger = Loggers.getLogger( - BulkWriterCollector.class); + protected final ESLogger logger = ESLoggerFactory.getLogger(this.getClass().getName()); private Client client; private Client transportClient; @@ -168,7 +169,6 @@ private void closeClient() { @Override public void close() throws WriterException { - logger.debug("close()"); try { bulkProcessor.close(); } catch (ElasticSearchException e) { @@ -204,8 +204,64 @@ public WriterResult getResult() { @Override public void collectHit(SearchHit hit) throws IOException { mappedFields.hit(hit); - IndexRequest req = mappedFields.newIndexRequest(); - bulkProcessor.add(req); + IndexRequest indexRequest = mappedFields.newIndexRequest(); + + // here we hook scripts + if(context.scriptString()!=null){ + Map map = indexRequest.sourceAsMap(); + context.executionContext().clear(); + context.executionContext().put("_index", indexRequest.index()); + context.executionContext().put("_type", indexRequest.type()); + context.executionContext().put("_id", indexRequest.id()); + context.executionContext().put("_version", indexRequest.version()); + context.executionContext().put("_source", map); + context.executionContext().put("_routing", indexRequest.routing()); + context.executionContext().put("_parent", indexRequest.parent()); + context.executionContext().put("_timestamp", indexRequest.timestamp()); + context.executionContext().put("_ttl", indexRequest.ttl()); + + try { + context.executableScript().setNextVar("ctx", context.executionContext()); + context.executableScript().run(); + // we need to unwrap the ctx... + context.executionContext().putAll((Map) context.executableScript().unwrap(context.executionContext())); + indexRequest.source(map); + + String operation = (String) context.executionContext().get("op"); + if (!(operation == null || "index".equals(operation))) { + // delete or unknown request - do not index this item + indexRequest = null; + } + Object fetchedTimestamp = context.executionContext().get("_timestamp"); + if (fetchedTimestamp != null) { + if (fetchedTimestamp instanceof String) { + indexRequest.timestamp(String.valueOf(TimeValue.parseTimeValue((String) fetchedTimestamp, null).millis())); + } else { + indexRequest.timestamp(fetchedTimestamp.toString()); + } + } + Object fetchedTTL = context.executionContext().get("_ttl"); + if (fetchedTTL != null) { + Long newTtl = -1L; + if (fetchedTTL instanceof Number) { + newTtl = ((Number) fetchedTTL).longValue(); + + } else { + newTtl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); + } + if (newTtl > 0) { + indexRequest.ttl(newTtl); + } + } + } catch (Exception e) { + throw new ElasticSearchIllegalArgumentException("failed to execute script", e); + } + } + + // end of hook + if(indexRequest!=null) { + bulkProcessor.add(indexRequest); + } } } diff --git a/src/main/java/crate/elasticsearch/searchinto/WriterCollector.java b/src/main/java/crate/elasticsearch/searchinto/WriterCollector.java index 61b209c..6c5e5ff 100644 --- a/src/main/java/crate/elasticsearch/searchinto/WriterCollector.java +++ b/src/main/java/crate/elasticsearch/searchinto/WriterCollector.java @@ -7,18 +7,19 @@ import org.apache.lucene.search.Collector; import org.apache.lucene.search.Scorer; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.text.StringAndBytesText; import org.elasticsearch.common.text.Text; -import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor; -import org.elasticsearch.index.fieldvisitor.FieldsVisitor; -import org.elasticsearch.index.fieldvisitor.JustUidFieldsVisitor; -import org.elasticsearch.index.fieldvisitor.UidAndSourceFieldsVisitor; +import org.elasticsearch.index.fieldvisitor.*; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.FieldMappers; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.search.fetch.source.FetchSourceContext; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHitField; @@ -29,6 +30,8 @@ public abstract class WriterCollector extends Collector { + protected final ESLogger logger = ESLoggerFactory.getLogger(this.getClass().getName()); + private List extractFieldNames; protected FieldsVisitor fieldsVisitor; protected SearchIntoContext context; @@ -79,7 +82,7 @@ public void collect(int doc) throws IOException { InternalSearchHit searchHit = new InternalSearchHit(doc, fieldsVisitor.uid().id(), typeText, - sourceRequested ? fieldsVisitor.source() : null, searchFields); + searchFields).sourceRef(fieldsVisitor.source()); // it looks like it is safe to reuse the HitContext, // the cache is only used by the highlighter which we do not use. @@ -106,19 +109,21 @@ public WriterCollector(SearchIntoContext context, this.mappedFields = new MappedFields(context); if (!context.hasFieldNames()) { if (context.hasPartialFields()) { - // partial fields need the source, so fetch it, - // but don't return it + // partial fields need the source, so fetch it fieldsVisitor = new UidAndSourceFieldsVisitor(); - } else if (context.hasScriptFields()) { - // we ask for script fields, and no field names, - // don't load the source - fieldsVisitor = new JustUidFieldsVisitor(); } else { - sourceRequested = true; - fieldsVisitor = new UidAndSourceFieldsVisitor(); + // no fields specified, default to return source if no explicit indication + if (!context.hasScriptFields() && !context.hasFetchSourceContext()) { + context.fetchSourceContext(new FetchSourceContext(true)); + } + fieldsVisitor = context.sourceRequested() ? new UidAndSourceFieldsVisitor() : new JustUidFieldsVisitor(); } } else if (context.fieldNames().isEmpty()) { - fieldsVisitor = new JustUidFieldsVisitor(); + if (context.sourceRequested()) { + fieldsVisitor = new UidAndSourceFieldsVisitor(); + } else { + fieldsVisitor = new JustUidFieldsVisitor(); + } } else { boolean loadAllStored = false; Set fieldNames = null; @@ -128,7 +133,11 @@ public WriterCollector(SearchIntoContext context, continue; } if (fieldName.equals(SourceFieldMapper.NAME)) { - sourceRequested = true; + if (context.hasFetchSourceContext()) { + context.fetchSourceContext().fetchSource(true); + } else { + context.fetchSourceContext(new FetchSourceContext(true)); + } continue; } FieldMappers x = context.smartNameFieldMappers(fieldName); @@ -145,21 +154,11 @@ public WriterCollector(SearchIntoContext context, } } if (loadAllStored) { - if (sourceRequested || extractFieldNames != null) { - fieldsVisitor = new CustomFieldsVisitor(true, - true); // load - // everything, - // including - // _source - } else { - fieldsVisitor = new CustomFieldsVisitor(true, false); - } + fieldsVisitor = new AllFieldsVisitor(); // load everything, including _source } else if (fieldNames != null) { - boolean loadSource = extractFieldNames != null || - sourceRequested; - fieldsVisitor = new CustomFieldsVisitor(fieldNames, - loadSource); - } else if (extractFieldNames != null || sourceRequested) { + boolean loadSource = extractFieldNames != null || context.sourceRequested(); + fieldsVisitor = new CustomFieldsVisitor(fieldNames, loadSource); + } else if (extractFieldNames != null || context.sourceRequested()) { fieldsVisitor = new UidAndSourceFieldsVisitor(); } else { fieldsVisitor = new JustUidFieldsVisitor(); diff --git a/src/test/java/crate/elasticsearch/module/export/test/RestExportActionTest.java b/src/test/java/crate/elasticsearch/module/export/test/RestExportActionTest.java index 3a59151..5cac202 100644 --- a/src/test/java/crate/elasticsearch/module/export/test/RestExportActionTest.java +++ b/src/test/java/crate/elasticsearch/module/export/test/RestExportActionTest.java @@ -231,17 +231,40 @@ public void testOutputFileAndOutputCommand() { */ @Test public void testForceOverwrite() { - String filename = "/tmp/filename.export"; - ExportResponse response = executeExportRequest("{\"output_file\": \"" + filename + - "\", \"fields\": [\"name\"], \"force_overwrite\": \"true\"}"); + String filename = "/tmp/filename-" + System.currentTimeMillis() + "-${index}-${shard}.export"; + int lineCount= 0; + { + ExportResponse response = executeExportRequest("{\"output_file\": \"" + filename + + "\", \"fields\": [\"name\"], \"force_overwrite\": \"false\"}"); + + List> infos = getExports(response); + assertEquals(2, infos.size()); + String out1 = infos.get(0).get("output_file").toString(); + String out2 = infos.get(1).get("output_file").toString(); + List lines = readLines(out1); + lines.addAll(readLines(out2)); + assertTrue(lines.size() > 0); + lineCount = lines.size(); + } + { + ExportResponse response = executeExportRequest("{\"output_file\": \"" + filename + + "\", \"fields\": [\"name\"], \"force_overwrite\": \"false\"}"); - List> infos = getExports(response); - assertEquals(2, infos.size()); - assertEquals("/tmp/filename.export", infos.get(0).get("output_file").toString()); - assertEquals("/tmp/filename.export", infos.get(1).get("output_file").toString()); - List lines = readLines(filename); - assertEquals(2, lines.size()); - assertEquals("{\"name\":\"bike\"}", lines.get(0)); + List> infos = getExports(response); + assertEquals(0, infos.size()); + } + { + ExportResponse response = executeExportRequest("{\"output_file\": \"" + filename + + "\", \"fields\": [\"name\"], \"force_overwrite\": \"true\"}"); + + List> infos = getExports(response); + assertEquals(2, infos.size()); + String out1 = infos.get(0).get("output_file").toString(); + String out2 = infos.get(1).get("output_file").toString(); + List lines = readLines(out1); + lines.addAll(readLines(out2)); + assertEquals(lineCount, lines.size()); + } } /** diff --git a/src/test/java/crate/elasticsearch/module/import_/test/RestImportActionTest.java b/src/test/java/crate/elasticsearch/module/import_/test/RestImportActionTest.java index 326c56a..c48889a 100644 --- a/src/test/java/crate/elasticsearch/module/import_/test/RestImportActionTest.java +++ b/src/test/java/crate/elasticsearch/module/import_/test/RestImportActionTest.java @@ -19,8 +19,12 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.junit.Test; + import crate.elasticsearch.action.export.ExportAction; import crate.elasticsearch.action.export.ExportRequest; import crate.elasticsearch.action.export.ExportResponse; @@ -31,6 +35,9 @@ public class RestImportActionTest extends AbstractRestActionTest { + //private final static Logger logger = Logger.getLogger(RestImportActionTest.class); + protected final ESLogger logger = ESLoggerFactory.getLogger(this.getClass().getName()); + /** * An import directory must be specified in the post data of the request, otherwise * an 'No directory defined' exception is delivered in the output. @@ -83,6 +90,99 @@ public void testImportWithoutIndexOrType() { assertFalse(existsWithField("205", "name", "205")); } + /** + * Test import using a script tag to modify field + */ + @Test + public void testImportWithScriptElementModifyingField() { + String path = getClass().getResource("/importdata/import_2").getPath(); + + ImportResponse response = executeImportRequest("test","d","{\"directory\": \"" + path + "\", \"script\": \"ctx._source.name += ' scripted'; \"}"); + List> imports = getImports(response); + Map nodeInfo = imports.get(0); + assertTrue(nodeInfo.get("imported_files").toString().matches( + "\\[\\{file_name=(.*)/importdata/import_2/import_2.json, successes=4, failures=0\\}\\]")); + assertTrue(existsWithField("202", "name", "202 scripted")); + assertTrue(existsWithField("203", "name", "203 scripted")); + assertTrue(existsWithField("204", "name", "204 scripted")); + assertTrue(existsWithField("205", "name", "205 scripted")); + } + + /** + * Test import using a script tag to add field + */ + @Test + public void testImportWithScriptElementAddingField() { + String path = getClass().getResource("/importdata/import_2").getPath(); + ImportResponse response = executeImportRequest("test","d", "{\"directory\": \"" + path + "\", \"script\": \"ctx._source.name2 = ctx._source.name + ' scripted'; \"}"); + List> imports = getImports(response); + Map nodeInfo = imports.get(0); + assertTrue(nodeInfo.get("imported_files").toString().matches( + "\\[\\{file_name=(.*)/importdata/import_2/import_2.json, successes=4, failures=0\\}\\]")); + assertTrue(existsWithField("202", "name", "202")); + assertTrue(existsWithField("203", "name", "203")); + assertTrue(existsWithField("204", "name", "204")); + assertTrue(existsWithField("205", "name", "205")); + assertTrue(existsWithField("202", "name2", "202 scripted")); + assertTrue(existsWithField("203", "name2", "203 scripted")); + assertTrue(existsWithField("204", "name2", "204 scripted")); + assertTrue(existsWithField("205", "name2", "205 scripted")); + } + + /** + * Test import using a script tag to modify timestamp and ttl + */ + @Test + public void testImportWithScriptElementModifyingTimestampAndTtl() { + esSetup.execute(deleteAll(), createIndex("test").withSettings( + fromClassPath("essetup/settings/test_a.json")).withMapping("d", + "{\"d\": {\"_timestamp\": {\"enabled\": true, \"store\": \"yes\"}}}")); + + long ts = System.currentTimeMillis(); + long ttl = 60*60*1000; + long tenSecs = 10*1000; + + String path = getClass().getResource("/importdata/import_4").getPath(); + ImportResponse response = executeImportRequest("{\"directory\": \"" + path + "\", \"script\": \"ctx._timestamp = "+ts+"L; ctx._ttl = '60m'; \"}"); + List> imports = getImports(response); + assertEquals(1, imports.size()); + Map nodeInfo = imports.get(0); + assertNotNull(nodeInfo.get("node_id")); + assertTrue(Long.valueOf(nodeInfo.get("took").toString()) > 0); + assertTrue(nodeInfo.get("imported_files").toString().matches( + "\\[\\{file_name=(.*)/importdata/import_4/import_4.json, successes=2, failures=0, invalidated=1}]")); + + GetRequestBuilder rb = new GetRequestBuilder(esSetup.client(), "test"); + + GetResponse res = rb.setType("d").setId("402").setFields("_ttl", "_timestamp").execute().actionGet(); + assertEquals(ts, res.getField("_timestamp").getValue()); + assertTrue(ttl > ((Number)res.getField("_ttl").getValue()).longValue()); + assertTrue(ttl - ((Number)res.getField("_ttl").getValue()).longValue() < tenSecs); + + res = rb.setType("d").setId("403").setFields("_ttl", "_timestamp").execute().actionGet(); + assertEquals(ts, res.getField("_timestamp").getValue()); + assertTrue(ttl > ((Number)res.getField("_ttl").getValue()).longValue()); + assertTrue(ttl - ((Number)res.getField("_ttl").getValue()).longValue() < tenSecs); + } + + + /** + * Test import using a script tag to delete a record + */ + @Test + public void testImportWithScriptElementDeletingRecord() { + String path = getClass().getResource("/importdata/import_2").getPath(); + ImportResponse response = executeImportRequest("test","d","{\"directory\": \"" + path + "\", \"script\": \"if (ctx._id == '204') ctx.op = 'delete'; \"}"); + List> imports = getImports(response); + Map nodeInfo = imports.get(0); + assertTrue(nodeInfo.get("imported_files").toString().matches( + "\\[\\{file_name=(.*)/importdata/import_2/import_2.json, successes=3, failures=0, deletes=1\\}\\]")); + assertTrue(existsWithField("202", "name", "202")); + assertTrue(existsWithField("203", "name", "203")); + assertFalse(exists("204")); + assertTrue(existsWithField("205", "name", "205")); + } + /** * If the index and/or type are given in the URI, all objects are imported * into the given index/type. @@ -90,11 +190,8 @@ public void testImportWithoutIndexOrType() { @Test public void testImportIntoIndexAndType() { String path = getClass().getResource("/importdata/import_2").getPath(); - ImportRequest request = new ImportRequest(); - request.index("another_index"); - request.type("e"); - request.source("{\"directory\": \"" + path + "\"}"); - ImportResponse response = esSetup.client().execute(ImportAction.INSTANCE, request).actionGet(); + + ImportResponse response = executeImportRequest("another_index", "e", "{\"directory\": \"" + path + "\"}"); List> imports = getImports(response); Map nodeInfo = imports.get(0); @@ -326,8 +423,8 @@ public void testMappings() { executeImportRequest("{\"directory\": \"" + path + "\", \"mappings\": true}"); ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest().filteredIndices("index1"); - ImmutableMap mappings = ImmutableMap.copyOf( - esSetup.client().admin().cluster().state(clusterStateRequest).actionGet().getState().metaData().index("index1").getMappings()); + ImmutableOpenMap mappings = + esSetup.client().admin().cluster().state(clusterStateRequest).actionGet().getState().metaData().index("index1").getMappings(); assertEquals("{\"1\":{\"_timestamp\":{\"enabled\":true,\"store\":true},\"_ttl\":{\"enabled\":true,\"default\":86400000},\"properties\":{\"name\":{\"type\":\"string\",\"store\":true}}}}", mappings.get("1").source().toString()); } @@ -371,6 +468,16 @@ private boolean existsWithField(String id, String field, String value, String in return res.isExists() && res.getSourceAsMap().get(field).equals(value); } + private boolean exists(String id) { + return exists(id, "test", "d"); + } + + private boolean exists(String id, String index, String type) { + GetRequestBuilder rb = new GetRequestBuilder(esSetup.client(), index); + GetResponse res = rb.setType(type).setId(id).execute().actionGet(); + return res.isExists(); + } + private static List> getImports(ImportResponse resp) { return get(resp, "imports"); } @@ -396,4 +503,13 @@ private ImportResponse executeImportRequest(String source) { return esSetup.client().execute(ImportAction.INSTANCE, request).actionGet(); } + private ImportResponse executeImportRequest(String index, String type, String source) { + ImportRequest request = new ImportRequest(); + request.index(index); + request.type(type); + request.source(source); + return esSetup.client().execute(ImportAction.INSTANCE, request).actionGet(); + } + + } diff --git a/src/test/java/crate/elasticsearch/module/searchinto/test/RestSearchIntoActionTest.java b/src/test/java/crate/elasticsearch/module/searchinto/test/RestSearchIntoActionTest.java index 1d0ffb3..016a9b8 100644 --- a/src/test/java/crate/elasticsearch/module/searchinto/test/RestSearchIntoActionTest.java +++ b/src/test/java/crate/elasticsearch/module/searchinto/test/RestSearchIntoActionTest.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.get.GetResponse; import org.junit.Test; +import crate.elasticsearch.action.import_.ImportResponse; import crate.elasticsearch.action.searchinto.SearchIntoAction; import crate.elasticsearch.action.searchinto.SearchIntoRequest; import crate.elasticsearch.action.searchinto.SearchIntoResponse; @@ -18,7 +19,58 @@ public class RestSearchIntoActionTest extends AbstractRestActionTest { - @Test + @Test + public void testSearchIntoWithScriptElementModifyingField() { + esSetup.execute(createIndex("test").withMapping("a", + "{\"a\":{\"_source\": {\"enabled\": true}}}")); + esSetup.execute(index("test", "a", "1").withSource("{\"name\": \"John\"}")); + SearchIntoRequest request = new SearchIntoRequest("test"); + request.source("{\"fields\": [\"_id\", \"_source\", [\"_index\", \"'newindex'\"]]" + ", \"script\": \"ctx._source.name += ' scripted'\"}"); + SearchIntoResponse res = esSetup.client().execute(SearchIntoAction.INSTANCE, request).actionGet(); + assertEquals(1, res.getSuccessfulShards()); + List> writes = getWrites(res); + assertEquals(1, writes.size()); + GetRequestBuilder rb = new GetRequestBuilder(esSetup.client(), "newindex"); + GetResponse gr = rb.setType("a").setId("1").setFields("name").execute().actionGet(); + assertEquals("John scripted", gr.getField("name").getValue()); + } + + @Test + public void testSearchIntoWithScriptElementAddingField() { + esSetup.execute(createIndex("test").withMapping("a", + "{\"a\":{\"_source\": {\"enabled\": true}}}")); + esSetup.execute(index("test", "a", "1").withSource("{\"name\": \"John\"}")); + SearchIntoRequest request = new SearchIntoRequest("test"); + request.source("{\"fields\": [\"_id\", \"_source\", [\"_index\", \"'newindex'\"]]" + ", \"script\": \"ctx._source.name1 = ctx._source.name + ' scripted'\"}"); + SearchIntoResponse res = esSetup.client().execute(SearchIntoAction.INSTANCE, request).actionGet(); + assertEquals(1, res.getSuccessfulShards()); + List> writes = getWrites(res); + assertEquals(1, writes.size()); + GetRequestBuilder rb = new GetRequestBuilder(esSetup.client(), "newindex"); + GetResponse gr = rb.setType("a").setId("1").setFields("name1").execute().actionGet(); + assertEquals("John scripted", gr.getField("name1").getValue()); + } + + @Test + public void testSearchIntoWithScriptElementDeletingRecord() { + esSetup.execute(createIndex("test").withMapping("a", + "{\"a\":{\"_source\": {\"enabled\": true}}}")); + esSetup.execute(createIndex("newindex").withMapping("a", + "{\"a\":{\"_source\": {\"enabled\": true}}}")); + esSetup.execute(index("test", "a", "1").withSource("{\"name\": \"John\"}")); + SearchIntoRequest request = new SearchIntoRequest("test"); + request.source("{\"fields\": [\"_id\", \"_source\", [\"_index\", \"'newindex'\"]]" + ", \"script\": \"if (ctx._id == '1') ctx.op = 'delete'; \"}"); + SearchIntoResponse res = esSetup.client().execute(SearchIntoAction.INSTANCE, request).actionGet(); + assertEquals(1, res.getSuccessfulShards()); + List> writes = getWrites(res); + assertEquals(1, writes.size()); + GetRequestBuilder rb = new GetRequestBuilder(esSetup.client(), "newindex"); + GetResponse gr = rb.setType("a").setId("1").setFields("name").execute().actionGet(); + assertFalse(gr.isExists()); + } + + + @Test public void testSearchIntoWithoutSource() { esSetup.execute(createIndex("test").withMapping("a", "{\"a\":{\"_source\": {\"enabled\": false}}}")); @@ -29,6 +81,7 @@ public void testSearchIntoWithoutSource() { assertEquals(1, res.getFailedShards()); assertTrue(res.getShardFailures()[0].reason().contains("Parse Failure [The _source field of index test and type a is not stored.]")); } + @Test public void testNestedObjectsRewriting() { @@ -36,7 +89,6 @@ public void testNestedObjectsRewriting() { SearchIntoRequest request = new SearchIntoRequest("test"); request.source("{\"fields\": [\"_id\", [\"x.city\", \"_source.city\"], [\"x.surname\", \"_source.name.surname\"], [\"x.name\", \"_source.name.name\"], [\"_index\", \"'newindex'\"]]}"); SearchIntoResponse res = esSetup.client().execute(SearchIntoAction.INSTANCE, request).actionGet(); - GetRequestBuilder rb = new GetRequestBuilder(esSetup.client(), "newindex"); GetResponse getRes = rb.setType("a").setId("1").execute().actionGet(); assertTrue(getRes.isExists()); @@ -88,5 +140,11 @@ private static List> get(SearchIntoResponse resp, String key } return (List>) res.get(key); } + + private static List> getWrites(SearchIntoResponse resp) { + return get(resp, "writes"); + } + + } diff --git a/src/test/python/reindex.rst b/src/test/python/reindex.rst index 88159dd..2f44bdb 100644 --- a/src/test/python/reindex.rst +++ b/src/test/python/reindex.rst @@ -54,7 +54,7 @@ be closed first and then reopened:: >>> post("/test/_close", {}) {"ok":true,"acknowledged":true} >>> put("/test/_settings", {"analysis": {"analyzer": {"myan": {"type": "stop", "stopwords": ["nice"]}}}}) - {"ok":true} + {"ok":true,"acknowledged":true} >>> post("/test/_open", {}) {"ok":true,"acknowledged":true} >>> refresh() diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties new file mode 100644 index 0000000..f0c03f5 --- /dev/null +++ b/src/test/resources/log4j.properties @@ -0,0 +1,15 @@ +log4j.rootLogger=error, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%t %d{ABSOLUTE} %5p %c{1}:%L - %m%n + +log4j.category.crate.elasticsearch=info + +log4j.category.org.elasticsearch.node=WARN +log4j.category.org.elasticsearch.plugins=WARN +log4j.category.org.elasticsearch.transport=WARN +log4j.category.org.elasticsearch.cluster=WARN +log4j.category.org.elasticsearch.http=WARN +log4j.category.org.elasticsearch.discovery=WARN +log4j.category.org.elasticsearch.gateway=WARN \ No newline at end of file