From a05292824bf492abfd3f4bcd1e79a9e9c415145d Mon Sep 17 00:00:00 2001 From: pemontto Date: Tue, 21 Sep 2021 14:41:39 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Add=20use=5Fmetadata=20option?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/index.asciidoc | 23 ++++ lib/logstash/outputs/elasticsearch.rb | 23 +++- spec/integration/outputs/index_spec.rb | 2 +- .../integration/outputs/index_version_spec.rb | 27 ++++ .../outputs/ingest_pipeline_spec.rb | 119 +++++++++++++++++- 5 files changed, 188 insertions(+), 6 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 031f2224..0240196e 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -367,6 +367,7 @@ This plugin supports the following configuration options plus the | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>, one of `["internal", "external", "external_gt", "external_gte", "force"]`|No @@ -1095,6 +1096,28 @@ Create a new document with this parameter as json string if `document_id` doesn' Username to authenticate to a secure Elasticsearch cluster +[id="plugins-{type}s-{plugin}-use_metadata"] +===== `use_metadata` + + * Value type is <> + * Default value is `false` + +Use and prefer output parameters defined in the document metadata. The <> (`@metadata._index`), <> (`@metadata._id`), and <> (`@metadata.pipeline`) can be set by their respective `@metadata` fields. + +E.g. 
to index a document to index `myindex` with id `myid` with the ingest pipeline `mypipeline`: + +[source,json] +----- +{ + "message": "foo", + "@metadata": { + "_index": "myindex", + "_id": "myid", + "pipeline": "mypipeline" + } +} +----- + [id="plugins-{type}s-{plugin}-validate_after_inactivity"] ===== `validate_after_inactivity` diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 25dd866a..eceb3b58 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -75,12 +75,12 @@ # # ==== HTTP Compression # -# This plugin supports request and response compression. Response compression is enabled by default and -# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for -# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in +# This plugin supports request and response compression. Response compression is enabled by default and +# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for +# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in # Elasticsearch[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http] to take advantage of response compression when using this plugin # -# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` +# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` # setting in their Logstash config file. # class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base @@ -251,6 +251,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # ILM policy to use, if undefined the default policy will be used. 
config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY + # Take the _index, _id, and pipeline output parameters from the event's @metadata fields when set. + config :use_metadata, :validate => :boolean, :default => false + attr_reader :client attr_reader :default_index attr_reader :default_ilm_rollover_alias @@ -419,6 +422,7 @@ def common_event_params(event) } if @pipeline + logger.debug("Pipeline params BEFORE", params: params) value = event.sprintf(@pipeline) # convention: empty string equates to not using a pipeline # this is useful when using a field reference in the pipeline setting, e.g. @@ -426,6 +430,17 @@ def common_event_params(event) # pipeline => "%{[@metadata][pipeline]}" # } params[:pipeline] = value unless value.empty? + logger.debug("Pipeline params AFTER", params: params) + end + + if @use_metadata + logger.debug("@metadata params BEFORE", params: params) + params[:_id] = event.get("[@metadata][_id]") || params[:_id] + event_index = event.get("[@metadata][_index]") + params[:_index] = event.sprintf(event_index) if event_index && !event_index.empty? + event_pipeline = event.get("[@metadata][pipeline]") + params[:pipeline] = event.sprintf(event_pipeline) if event_pipeline && !event_pipeline.empty? + logger.debug("@metadata params AFTER", params: params) end params diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb index 16dac139..f805dfd1 100644 --- a/spec/integration/outputs/index_spec.rb +++ b/spec/integration/outputs/index_spec.rb @@ -46,7 +46,7 @@ end describe "indexing" do - let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) } + let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type, "@metadata" => { "_id" => "test-id", "_index" => "test-index", "pipeline" => "test-pipeline" }) } let(:index) { 10.times.collect { rand(10).to_s }.join("") } let(:type) { ESHelper.es_version_satisfies?("< 7") ? 
"doc" : "_doc" } let(:event_count) { 1 + rand(2) } diff --git a/spec/integration/outputs/index_version_spec.rb b/spec/integration/outputs/index_version_spec.rb index 0b1ecda9..62ea8fa4 100644 --- a/spec/integration/outputs/index_version_spec.rb +++ b/spec/integration/outputs/index_version_spec.rb @@ -94,5 +94,32 @@ expect(r2["_source"]["message"]).to eq('foo') end end + + describe "use metadata" do + let(:settings) do + { + "index" => "logstash-index", + "hosts" => get_host_port(), + "use_metadata" => true, + } + end + + it "should use @metadata._id for document_id" do + id = "new_doc_id_1" + subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id }, "message" => "foo")]) + r = es.get(:index => "logstash-index", :type => doc_type, :id => id, :refresh => true) + expect(r["_id"]).to eq(id) + expect(r["_source"]["message"]).to eq("foo") + end + it "should use @metadata._index for index" do + id = "new_doc_id_2" + new_index = "logstash-index-new" + subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id, "_index" => new_index }, "message" => "foo")]) + r = es.get(:index => new_index, :type => doc_type, :id => id, :refresh => true) + expect(r["_id"]).to eq(id) + expect(r["_index"]).to eq(new_index) + expect(r["_source"]["message"]).to eq("foo") + end + end end end diff --git a/spec/integration/outputs/ingest_pipeline_spec.rb b/spec/integration/outputs/ingest_pipeline_spec.rb index c76fdaef..0073478d 100644 --- a/spec/integration/outputs/ingest_pipeline_spec.rb +++ b/spec/integration/outputs/ingest_pipeline_spec.rb @@ -60,7 +60,7 @@ end it "indexes using the proper pipeline" do - results = @es.search(:index => 'logstash-*', :q => "message:\"netcat\"") + results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"") expect(results).to have_hits(1) expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200") expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182") @@ -72,3 +72,120 @@ 
expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil) end end + +describe "Ingest pipeline from metadata", :integration => true do + subject! do + require "logstash/outputs/elasticsearch" + settings = { + "hosts" => "#{get_host_port()}", + "pipeline" => "apache-logs", + "data_stream" => "false", + "use_metadata" => true, + } + next LogStash::Outputs::ElasticSearch.new(settings) + end + + let(:http_client) { Manticore::Client.new } + let(:ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/apache-logs" } + let(:apache_logs_pipeline) { + ' + { + "description" : "Pipeline to parse Apache logs", + "processors" : [ + { + "grok": { + "field": "message", + "patterns": ["%{COMBINEDAPACHELOG}"] + } + } + ] + }' + } + + let(:add_field_ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/add-field" } + let(:add_field_logs_pipeline) { + ' + { + "description": "Add field foo with value bar", + "processors": [ + { + "set": { + "field": "foo", + "value": "bar" + } + } + ] + }' + } + + before :each do + # Delete all templates first. + require "elasticsearch" + + # Clean ES of data before we start. + @es = get_client + @es.indices.delete_template(:name => "*") + + # This can fail if there are no indexes, ignore failure. 
+ @es.indices.delete(:index => "*") rescue nil + + # delete existing ingest pipeline + http_client.delete(ingest_url).call + + # register pipelines + http_client.put(ingest_url, :body => apache_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call + http_client.put(add_field_ingest_url, :body => add_field_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call + + #TODO: Use esclient + #@es.ingest.put_pipeline :id => 'apache_pipeline', :body => pipeline_defintion + + subject.register + subject.multi_receive([ + LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'), + LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id1", "_index" => "index1", "pipeline" => "add-field" }), + LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id2", "_index" => "index2", "pipeline" => "" }), + ]) + @es.indices.refresh + + #Wait or fail until everything's indexed. 
+ Stud::try(10.times) do + r = @es.search(index: "logstash-*") + expect(r).to have_hits(1) + r = @es.search(index: "index1") + expect(r).to have_hits(1) + r = @es.search(index: "index2") + expect(r).to have_hits(1) + sleep(0.1) + end + end + + it "indexes using the correct pipeline when @metadata.pipeline not defined" do + results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"") + expect(results).to have_hits(1) + expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200") + expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182") + expect(results["hits"]["hits"][0]["_source"]["verb"]).to eq("GET") + expect(results["hits"]["hits"][0]["_source"]["request"]).to eq("/scripts/netcat-webserver") + expect(results["hits"]["hits"][0]["_source"]["auth"]).to eq("-") + expect(results["hits"]["hits"][0]["_source"]["ident"]).to eq("-") + expect(results["hits"]["hits"][0]["_source"]["clientip"]).to eq("183.60.215.50") + expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil) + end + + it "indexes using the @metadata._index, @metadata._id, and @metadata.pipeline when defined" do + results = @es.search(:index => "index1", :q => "message:\"netcat\"") + expect(results).to have_hits(1) + expect(results["hits"]["hits"][0]["_id"]).to eq("id1") + expect(results["hits"]["hits"][0]["_index"]).to eq("index1") + expect(results["hits"]["hits"][0]["_source"]["foo"]).to eq("bar") + end + + it "indexes ignore empty @metadata.pipeline values" do + results = @es.search(:index => "index2", :q => "message:\"netcat\"") + expect(results).to have_hits(1) + expect(results["hits"]["hits"][0]["_id"]).to eq("id2") + expect(results["hits"]["hits"][0]["_index"]).to eq("index2") + expect(results["hits"]["hits"][0]["_source"]).not_to include("foo") + end + +end