Skip to content
This repository has been archived by the owner on Feb 7, 2020. It is now read-only.

Scripting support - modify documents during reindex, query into, import #52

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@
/target/
/sandbox/
/logs/
/bin/
/*.class
77 changes: 76 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ indexes)::
}
'


Exports
=======

Expand Down Expand Up @@ -628,6 +627,82 @@ An example can be found in the `Search Into DocTest
<src/test/python/search_into.rst>`_.



Script Support
==============

Script support has been added to the _import, _search_into and _reindex endpoints so that you can modify documents
(for example, those read from dump files) on the fly before they are indexed.
Here is the full scenario:
- start ES with no index
- add 3 documents:

curl -XPUT 'localhost:9200/twitter/tweet/1' -d '
{
"text" : {
"message" : "you know for search 1"
},
"likes": 1
}'

curl -XPUT 'localhost:9200/twitter/tweet/2' -d '
{
"text" : {
"message" : "you know for search 2"
},
"likes": 2
}'

curl -XPUT 'localhost:9200/twitter/tweet/3' -d '
{
"text" : {
"message" : "you know for search 3"
},
"likes": 3
}'

- Verify that you have 3 docs in index and that the likes are: 1, 2 and 3.
- Export documents onto FS:

curl -X POST 'http://localhost:9200/_export' -d '{
"fields": ["_id", "_source", "_version", "_index", "_type"],
"output_file": "/tmp/es-data/dump-${index}-${shard}.gz",
"compression": "gzip"
}
'

- You will find a number of gz files in the /tmp/es-data/ folder, equal to the number of your shards. If you open them, you will
see that the 3 docs are in 3 files and the other 2 files are empty.

- Now stop ES and delete the data folder.

- Start ES again and import the data with a script that modifies the likes field.

curl -X POST 'http://localhost:9200/_import' -d '{
"directory": "/tmp/es-data",
"compression": "gzip",
"script" : "ctx._source.likes += 1"
}
'
- Verify that you have 3 docs in index and that the likes are: 2, 3 and 4.

- Test _search_into
curl -XPOST "http://localhost:9200/twitter/_search_into" -d'
{
"fields": ["_id", "_source", ["_index", "'twitter-new'"]],
"script" : "ctx._source.likes += 1"
}'

Verify that you have 3 documents in the twitter-new index with likes: 3, 4 and 5.

- Test _reindex
curl -XPOST "http://localhost:9200/twitter/_reindex" -d'
{
"script" : "ctx._source.likes += 1"
}'

Verify that you have 3 documents in the twitter index with likes: 3, 4 and 5.

Installation
============

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>crate</groupId>
<artifactId>elasticsearch-inout-plugin</artifactId>
<version>0.5.0</version>
<version>0.6.0</version>
<packaging>jar</packaging>
<description>An Elasticsearch plugin which provides the ability to export and import data by query on server side.</description>
<scm>
Expand All @@ -14,7 +14,7 @@
<url>https://github.com/crate/elasticsearch-inout-plugin</url>
</scm>
<properties>
<elasticsearch.version>0.90.3</elasticsearch.version>
<elasticsearch.version>1.0.0.beta1</elasticsearch.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected ShardExportResponse shardOperation(ShardExportRequest request) throws
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
ExportContext context = new ExportContext(0,
new ShardSearchRequest().types(request.types()).filteringAliases(request.filteringAliases()),
shardTarget, indexShard.searcher(), indexService, indexShard, scriptService, cacheRecycler, nodePath);
shardTarget, indexShard.acquireSearcher("inout-plugin"), indexService, indexShard, scriptService, cacheRecycler, nodePath);
ExportContext.setCurrent(context);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;

Expand All @@ -20,7 +21,7 @@
/**
* Container class for export specific informations.
*/
public class ExportContext extends SearchContext {
public class ExportContext extends DefaultSearchContext {

private static final String VAR_SHARD = "${shard}";
private static final String VAR_INDEX = "${index}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -60,13 +59,11 @@ public BytesReference source() {
return source;
}

@Required
/**
 * Sets the request source from a string.
 *
 * The string is wrapped in a {@link BytesArray} and handed to
 * {@link #source(BytesReference, boolean)} with the unsafe flag set to
 * {@code false}.
 *
 * @param source the source as a string
 * @return whatever {@link #source(BytesReference, boolean)} returns
 */
public ExportRequest source(String source) {
    final BytesArray wrapped = new BytesArray(source);
    return source(wrapped, false);
}


@Required
public ExportRequest source(BytesReference source, boolean unsafe) {
this.source = source;
this.querySourceUnsafe = unsafe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import crate.elasticsearch.action.import_.parser.IImportParser;
import crate.elasticsearch.import_.Importer;
import crate.elasticsearch.script.ScriptProvider;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
Expand All @@ -11,6 +13,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -24,19 +27,28 @@
public abstract class AbstractTransportImportAction extends TransportNodesOperationAction<ImportRequest, ImportResponse, NodeImportRequest, NodeImportResponse>{

private IImportParser importParser;

private ScriptProvider scriptProvider;

private Importer importer;

private String nodePath = "";

private final ScriptService scriptService;

@Inject
public AbstractTransportImportAction(Settings settings, ClusterName clusterName,
ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IImportParser importParser, Importer importer, NodeEnvironment nodeEnv) {
TransportService transportService,
ScriptService scriptService,
ScriptProvider scriptProvider,
IImportParser importParser,
Importer importer, NodeEnvironment nodeEnv) {
super(settings, clusterName, threadPool, clusterService, transportService);
this.importParser = importParser;
this.scriptProvider = scriptProvider;
this.importer = importer;

this.scriptService=scriptService;
File[] paths = nodeEnv.nodeDataLocations();
if (paths.length > 0) {
nodePath = paths[0].getAbsolutePath();
Expand Down Expand Up @@ -110,9 +122,9 @@ protected NodeImportResponse newNodeResponse() {
/**
 * Runs the import on this node.
 *
 * Steps, in order: create an {@link ImportContext} for the node path, let
 * the import parser fill it from the request source, let the script
 * provider prepare it for script execution, run the importer, and wrap the
 * importer result together with the local node in the response.
 *
 * @param request the node level import request
 * @return the response carrying the local node and the importer result
 * @throws ElasticSearchException declared by the signature
 */
protected NodeImportResponse nodeOperation(NodeImportRequest request)
        throws ElasticSearchException {
    final ImportContext importContext = new ImportContext(nodePath);
    importParser.parseSource(importContext, request.source());
    // The script must be prepared after parsing and before the importer runs.
    scriptProvider.prepareContextForScriptExecution(importContext, scriptService);
    final Importer.Result importResult = importer.execute(importContext, request);
    return new NodeImportResponse(clusterService.state().nodes().localNode(), importResult);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
package crate.elasticsearch.action.import_;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class ImportContext {
import org.elasticsearch.script.ExecutableScript;

import crate.elasticsearch.script.IScriptContext;

public class ImportContext implements IScriptContext{

private String nodePath;
private boolean compression;
private String directory;
private Pattern file_pattern;
private boolean mappings = false;
private boolean settings = false;

public ImportContext(String nodePath) {
private String scriptString;
private String scriptLang;
private Map<String, Object> scriptParams;
private Map<String, Object> executionContext;
private ExecutableScript executableScript;

/**
 * Creates an import context for the given node path.
 *
 * The execution context starts out as an empty map.
 *
 * @param nodePath the node path stored in this context
 */
public ImportContext(String nodePath) {
    // The superclass constructor is invoked implicitly.
    this.executionContext = new HashMap<String, Object>();
    this.nodePath = nodePath;
}

public boolean compression() {
/**
 * Returns the compression flag held by this context.
 * NOTE(review): presumably true when the import files are gzip compressed; confirm against the import parser.
 *
 * @return the current value of the compression flag
 */
public boolean compression() {
return compression;
}

Expand Down Expand Up @@ -60,4 +73,59 @@ public boolean settings() {
/**
 * Sets the settings flag held by this context.
 * NOTE(review): presumably controls whether index settings are imported; confirm against the importer.
 *
 * @param settings the new value of the settings flag
 */
public void settings(boolean settings) {
this.settings = settings;
}



/**
 * Returns the script string stored in this context.
 *
 * @return the script string, or null if none has been set
 */
@Override
public String scriptString() {
return scriptString;
}

/**
 * Stores the script string in this context.
 *
 * @param scriptString the script string to store
 */
@Override
public void scriptString(String scriptString) {
this.scriptString = scriptString;
}

/**
 * Returns the script language stored in this context.
 *
 * @return the script language, or null if none has been set
 */
@Override
public String scriptLang() {
return scriptLang;
}

/**
 * Stores the script language in this context.
 *
 * @param scriptLang the script language to store
 */
@Override
public void scriptLang(String scriptLang) {
this.scriptLang = scriptLang;
}

/**
 * Returns the script parameter map stored in this context.
 * The stored map itself is returned, not a copy.
 *
 * @return the script parameters, or null if none have been set
 */
@Override
public Map<String, Object> scriptParams() {
return scriptParams;
}

/**
 * Stores the script parameter map in this context.
 * The given map is kept by reference, not copied.
 *
 * @param scriptParams the script parameters to store
 */
@Override
public void scriptParams(Map<String, Object> scriptParams) {
this.scriptParams = scriptParams;
}


/**
 * Stores the executable script in this context.
 * NOTE(review): presumably set while the context is prepared for script execution; confirm against ScriptProvider.
 *
 * @param executableScript the executable script to store
 */
@Override
public void executableScript(ExecutableScript executableScript) {
this.executableScript = executableScript;
}


/**
 * Replaces the execution context map held by this context.
 * The given map is kept by reference, not copied.
 *
 * @param executionContext the new execution context map
 */
@Override
public void executionContext(Map<String, Object> executionContext) {
this.executionContext = executionContext;
}

/**
 * Returns the execution context map held by this context.
 * The stored map itself is returned, not a copy.
 *
 * @return the execution context map
 */
@Override
public Map<String, Object> executionContext() {
return executionContext;
}

/**
 * Returns the executable script stored in this context.
 *
 * @return the executable script, or null if none has been set
 */
@Override
public ExecutableScript executableScript() {
return executableScript;
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package crate.elasticsearch.action.import_;

import org.elasticsearch.action.support.nodes.NodesOperationRequest;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -32,12 +31,10 @@ public BytesReference source() {
return source;
}

@Required
/**
 * Sets the request source from a string.
 *
 * The string is wrapped in a {@link BytesArray} and handed to
 * {@link #source(BytesReference, boolean)} with the unsafe flag set to
 * {@code false}.
 *
 * @param source the source as a string
 * @return whatever {@link #source(BytesReference, boolean)} returns
 */
public ImportRequest source(String source) {
    final BytesArray wrapped = new BytesArray(source);
    return source(wrapped, false);
}

@Required
public ImportRequest source(BytesReference source, boolean unsafe) {
this.source = source;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params)
if (counts.invalid > 0) {
builder.field(Fields.INVALIDATED, counts.invalid);
}
if (counts.deletes > 0) {
builder.field(Fields.DELETES, counts.deletes);
}
builder.endObject();
}
builder.endArray();
Expand All @@ -58,6 +61,7 @@ public void readFrom(StreamInput in) throws IOException {
counts.successes = in.readInt();
counts.failures = in.readInt();
counts.invalid = in.readInt();
counts.deletes = in.readInt();
result.importCounts.add(counts);
}
}
Expand All @@ -72,6 +76,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(counts.successes);
out.writeInt(counts.failures);
out.writeInt(counts.invalid);
out.writeInt(counts.deletes);
}
}

Expand All @@ -89,5 +94,6 @@ static final class Fields {
static final XContentBuilderString SUCCESSES = new XContentBuilderString("successes");
static final XContentBuilderString FAILURES = new XContentBuilderString("failures");
static final XContentBuilderString INVALIDATED = new XContentBuilderString("invalidated");
static final XContentBuilderString DELETES = new XContentBuilderString("deletes");
}
}
Loading