This repository has been archived by the owner on Feb 7, 2020. It is now read-only.

1.0.0.beta2 #56

Open · wants to merge 9 commits into base: master
2 changes: 2 additions & 0 deletions .gitignore
@@ -9,3 +9,5 @@
/target/
/sandbox/
/logs/
/bin/
/*.class
95 changes: 81 additions & 14 deletions README.rst
@@ -137,7 +137,6 @@ indexes)::
}
'


Exports
=======

@@ -515,7 +514,7 @@ The JSON response of an import may look like this::
.. hint::

- ``imports``: List of successful imports
- ``node_id'': The node id where the import happened
- ``node_id``: The node id where the import happened
- ``took``: Operation time of all imports on the node in milliseconds
- ``imported_files``: List of imported files in the import directory of the node's file system
- ``file_name``: File name of the handled file
@@ -532,7 +531,7 @@ Dump
The idea behind dump is to export all relevant data to recreate the
cluster as it was at the time of the dump.

The basic usage of the endpoint is:
The basic usage of the endpoint is::

curl -X POST 'http://localhost:9200/_dump'

@@ -568,7 +567,7 @@ Restore
=======

Dumped data is intended to get restored. This can be done by the _restore
endpoint:
endpoint::

curl -X POST 'http://localhost:9200/_restore'

@@ -624,20 +623,88 @@ a given query directly into an index::
"fields": ["_id", "_source", ["_index", "'newindex'"]]
}'

An example can be found in the `Search Into DocTest
<src/test/python/search_into.rst>`_.
An example can be found in the `Search Into DocTest <src/test/python/search_into.rst>`_.



Script Support
==============

Script support has been added to the ``_import``, ``_searchinto`` and ``_reindex`` endpoints so that you can modify
the documents read from dump files on the fly before indexing.

Here is the full scenario (start with an empty Elasticsearch and add 3 documents)::

curl -XPUT 'localhost:9200/twitter/tweet/1' -d '
{
"text" : {
"message" : "you know for search 1"
},
"likes": 1
}'

curl -XPUT 'localhost:9200/twitter/tweet/2' -d '
{
"text" : {
"message" : "you know for search 2"
},
"likes": 2
}'

curl -XPUT 'localhost:9200/twitter/tweet/3' -d '
{
"text" : {
"message" : "you know for search 3"
},
"likes": 3
}'
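The state the walkthrough expects at this point can be sketched in plain Python (no Elasticsearch involved; the dict layout simply mirrors the documents above):

```python
# The three tweets indexed above, keyed by _id (plain dicts, not ES documents).
tweets = {
    str(i): {
        "text": {"message": "you know for search %d" % i},
        "likes": i,
    }
    for i in (1, 2, 3)
}

# The walkthrough expects 3 documents with likes 1, 2 and 3.
assert len(tweets) == 3
assert sorted(doc["likes"] for doc in tweets.values()) == [1, 2, 3]
```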

Verify that you have 3 documents in the index and that the likes are 1, 2 and 3. Then export the documents to the file system::

curl -X POST 'http://localhost:9200/_export' -d '{
"fields": ["_id", "_source", "_version", "_index", "_type"],
"output_file": "/tmp/es-data/dump-${index}-${shard}.gz",
"compression": "gzip"
}'

You will find a number of ``.gz`` files in the ``/tmp/es-data/`` folder equal to the number of your shards. If you open
them, you will see that the 3 documents are in 3 of the files and the other 2 files are empty. Now stop Elasticsearch
and delete the data folder. Start Elasticsearch again and import the data with a script that modifies the ``likes`` field::
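The ``${index}`` and ``${shard}`` placeholders in ``output_file`` are expanded per shard, which is why one file per shard appears. A rough sketch of that substitution (plain Python; the helper name is ours, not the plugin's — the walkthrough implies 5 shards, the 0.90.x default):

```python
def output_path(template, index, shard):
    """Expand the ${index}/${shard} placeholders the way the
    export output_file template describes."""
    return template.replace("${index}", index).replace("${shard}", str(shard))

template = "/tmp/es-data/dump-${index}-${shard}.gz"

# A 5-shard 'twitter' index yields 5 files, one per shard.
paths = [output_path(template, "twitter", shard) for shard in range(5)]
assert paths[0] == "/tmp/es-data/dump-twitter-0.gz"
assert len(paths) == 5
```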

curl -X POST 'http://localhost:9200/_import' -d '{
"directory": "/tmp/es-data",
"compression": "gzip",
"script" : "ctx._source.likes += 1"
}'

Verify that you have 3 documents in the index and that the likes are now 2, 3 and 4.
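The ``ctx._source.likes += 1`` script runs once per imported document, with ``ctx._source`` bound to the document source. A minimal simulation of its effect (plain Python, not the plugin's actual script engine):

```python
# Documents as read back from the dump files (likes 1, 2, 3).
docs = [{"_id": str(i), "_source": {"likes": i}} for i in (1, 2, 3)]

def run_script(ctx):
    # Equivalent of the import script "ctx._source.likes += 1".
    ctx["_source"]["likes"] += 1

for doc in docs:
    run_script({"_source": doc["_source"]})

# After the scripted import, the likes are 2, 3 and 4.
assert [d["_source"]["likes"] for d in docs] == [2, 3, 4]
```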

Test ``_searchinto``::

curl -XPOST "http://localhost:9200/twitter/_search_into" -d'
{
"fields": ["_id", "_source", ["_index", "'twitter-new'"]],
"script" : "ctx._source.likes += 1"
}'

Verify that you have 3 documents in the ``twitter-new`` index with likes 3, 4 and 5.
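``_search_into`` both rewrites ``_index`` (via the ``["_index", "'twitter-new'"]`` field mapping) and applies the script to each hit. Simulated in plain Python (the function name is ours; the field names follow the request above):

```python
# Documents as they sit in 'twitter' after the scripted import (likes 2, 3, 4).
hits = [{"_id": str(i), "_index": "twitter", "_source": {"likes": i + 1}}
        for i in (1, 2, 3)]

def search_into(hit, target_index):
    # Copy the hit, retarget its index, and apply "ctx._source.likes += 1".
    out = {"_id": hit["_id"], "_index": target_index,
           "_source": dict(hit["_source"])}
    out["_source"]["likes"] += 1
    return out

copied = [search_into(h, "twitter-new") for h in hits]
assert all(c["_index"] == "twitter-new" for c in copied)
assert [c["_source"]["likes"] for c in copied] == [3, 4, 5]
```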

Test ``_reindex``::

curl -XPOST "http://localhost:9200/twitter/_reindex" -d'
{
"script" : "ctx._source.likes += 1"
}'

Verify that you have 3 documents in the ``twitter`` index with likes 3, 4 and 5.

Installation
============

* Clone this repo with git clone
[email protected]:crate/elasticsearch-inout-plugin.git
* Checkout the tag (find out via git tag) you want to build with
(possibly master is not for your elasticsearch version)
* Run: mvn clean package -DskipTests=true – this does not run any unit
* Clone this repo with git clone ``[email protected]:bremeld/fork-elasticsearch-inout-plugin.git``
* Checkout the branch or tag that matches your Elasticsearch version
* Run: ``mvn clean package -DskipTests=true`` – this does not run any unit
tests, as they take some time. If you want to run them, better run
mvn clean package
* Install the plugin: /path/to/elasticsearch/bin/plugin -install
elasticsearch-inout-plugin -url
file:///$PWD/target/elasticsearch-inout-plugin-$version.jar
* Install the plugin: ``/path/to/elasticsearch/bin/plugin --install elasticsearch-inout-plugin --url
file:///$PWD/target/elasticsearch-inout-plugin-$version.jar``
4 changes: 2 additions & 2 deletions pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>crate</groupId>
<artifactId>elasticsearch-inout-plugin</artifactId>
<version>0.5.0</version>
<version>1.0.0.Beta2</version>
<packaging>jar</packaging>
<description>An Elasticsearch plugin which provides the ability to export and import data by query on server side.</description>
<scm>
@@ -14,7 +14,7 @@
<url>https://github.com/crate/elasticsearch-inout-plugin</url>
</scm>
<properties>
<elasticsearch.version>0.90.3</elasticsearch.version>
<elasticsearch.version>1.0.0.Beta2</elasticsearch.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
@@ -149,7 +149,7 @@ protected ShardExportResponse shardOperation(ShardExportRequest request) throws
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
ExportContext context = new ExportContext(0,
new ShardSearchRequest().types(request.types()).filteringAliases(request.filteringAliases()),
shardTarget, indexShard.searcher(), indexService, indexShard, scriptService, cacheRecycler, nodePath);
shardTarget, indexShard.acquireSearcher("inout-plugin"), indexService, indexShard, scriptService, cacheRecycler, nodePath);
ExportContext.setCurrent(context);

try {
@@ -10,6 +10,7 @@
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;

@@ -20,7 +21,7 @@
/**
* Container class for export specific informations.
*/
public class ExportContext extends SearchContext {
public class ExportContext extends DefaultSearchContext {

private static final String VAR_SHARD = "${shard}";
private static final String VAR_INDEX = "${index}";
@@ -3,7 +3,6 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@@ -60,13 +59,11 @@ public BytesReference source() {
return source;
}

@Required
public ExportRequest source(String source) {
return this.source(new BytesArray(source), false);
}


@Required
public ExportRequest source(BytesReference source, boolean unsafe) {
this.source = source;
this.querySourceUnsafe = unsafe;
@@ -2,6 +2,8 @@

import crate.elasticsearch.action.import_.parser.IImportParser;
import crate.elasticsearch.import_.Importer;
import crate.elasticsearch.script.ScriptProvider;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
@@ -11,6 +13,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

@@ -24,19 +27,28 @@
public abstract class AbstractTransportImportAction extends TransportNodesOperationAction<ImportRequest, ImportResponse, NodeImportRequest, NodeImportResponse>{

private IImportParser importParser;

private ScriptProvider scriptProvider;

private Importer importer;

private String nodePath = "";

private final ScriptService scriptService;

@Inject
public AbstractTransportImportAction(Settings settings, ClusterName clusterName,
ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IImportParser importParser, Importer importer, NodeEnvironment nodeEnv) {
TransportService transportService,
ScriptService scriptService,
ScriptProvider scriptProvider,
IImportParser importParser,
Importer importer, NodeEnvironment nodeEnv) {
super(settings, clusterName, threadPool, clusterService, transportService);
this.importParser = importParser;
this.scriptProvider = scriptProvider;
this.importer = importer;

this.scriptService=scriptService;
File[] paths = nodeEnv.nodeDataLocations();
if (paths.length > 0) {
nodePath = paths[0].getAbsolutePath();
@@ -110,9 +122,9 @@ protected NodeImportResponse newNodeResponse() {
protected NodeImportResponse nodeOperation(NodeImportRequest request)
throws ElasticSearchException {
ImportContext context = new ImportContext(nodePath);

BytesReference source = request.source();
importParser.parseSource(context, source);
scriptProvider.prepareContextForScriptExecution(context, scriptService);
Importer.Result result = importer.execute(context, request);
return new NodeImportResponse(clusterService.state().nodes().localNode(), result);
}
@@ -1,22 +1,35 @@
package crate.elasticsearch.action.import_;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class ImportContext {
import org.elasticsearch.script.ExecutableScript;

import crate.elasticsearch.script.IScriptContext;

public class ImportContext implements IScriptContext{

private String nodePath;
private boolean compression;
private String directory;
private Pattern file_pattern;
private boolean mappings = false;
private boolean settings = false;

public ImportContext(String nodePath) {
private String scriptString;
private String scriptLang;
private Map<String, Object> scriptParams;
private Map<String, Object> executionContext;
private ExecutableScript executableScript;

public ImportContext(String nodePath) {
super();
this.nodePath = nodePath;
this.executionContext = new HashMap<String, Object>();
}

public boolean compression() {
public boolean compression() {
return compression;
}

@@ -60,4 +73,59 @@ public boolean settings() {
public void settings(boolean settings) {
this.settings = settings;
}



@Override
public String scriptString() {
return scriptString;
}

@Override
public void scriptString(String scriptString) {
this.scriptString = scriptString;
}

@Override
public String scriptLang() {
return scriptLang;
}

@Override
public void scriptLang(String scriptLang) {
this.scriptLang = scriptLang;
}

@Override
public Map<String, Object> scriptParams() {
return scriptParams;
}

@Override
public void scriptParams(Map<String, Object> scriptParams) {
this.scriptParams = scriptParams;
}


@Override
public void executableScript(ExecutableScript executableScript) {
this.executableScript = executableScript;
}


@Override
public void executionContext(Map<String, Object> executionContext) {
this.executionContext = executionContext;
}

@Override
public Map<String, Object> executionContext() {
return executionContext;
}

@Override
public ExecutableScript executableScript() {
return executableScript;
}

}
@@ -1,7 +1,6 @@
package crate.elasticsearch.action.import_;

import org.elasticsearch.action.support.nodes.NodesOperationRequest;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -32,12 +31,10 @@ public BytesReference source() {
return source;
}

@Required
public ImportRequest source(String source) {
return this.source(new BytesArray(source), false);
}

@Required
public ImportRequest source(BytesReference source, boolean unsafe) {
this.source = source;
return this;