Add ability to customize bulk batch size
bmax committed Apr 26, 2020
1 parent d5f1782 commit caaaf9a
Showing 6 changed files with 115 additions and 65 deletions.
23 changes: 19 additions & 4 deletions lib/logstash/outputs/elasticsearch.rb
@@ -75,12 +75,12 @@
#
# ==== HTTP Compression
#
# This plugin supports request and response compression. Response compression is enabled by default and
# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
# it to send back compressed responses. For versions before 5.0, `http.compression` must be set to `true` in
# Elasticsearch[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http] to take advantage of response compression when using this plugin
#
# For request compression, regardless of the Elasticsearch version, users have to enable the `http_compression`
# setting in their Logstash config file.
#
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
@@ -103,6 +103,8 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base

config_name "elasticsearch"

+DEFAULT_BATCH_SIZE = 20 * 1024 * 1024 # 20MiB

# The Elasticsearch action to perform. Valid actions are:
#
# - index: indexes a document (an event from Logstash).
@@ -242,6 +244,19 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# Custom Headers to send on each request to elasticsearch nodes
config :custom_headers, :validate => :hash, :default => {}

+# Bulk batch size is used to determine at what point to send the bulk requests.
+# The criteria used for the default value are:
+# 1. We need a number that's less than 100MiB because ES
+#    won't accept bulks larger than that.
+# 2. It must be large enough to amortize the connection constant
+#    across multiple requests.
+# 3. It must be small enough that even if multiple threads hit this size
+#    we won't use a lot of heap.
+#
+# We wound up agreeing that a number greater than 10MiB and less than 100MiB
+# made sense. We picked one on the lowish side to not use too much heap.
+config :bulk_batch_size, :validate => :number, :default => DEFAULT_BATCH_SIZE

# @override to handle proxy => '' as if none was set
def config_init(params)
proxy = params['proxy']
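Taken together with the compression notes above, a minimal pipeline configuration exercising both settings might look like this. This is a hedged sketch: the hosts and index values are placeholders, `http_compression` already existed before this commit, and 5242880 (5 MiB) is just the value the integration spec below uses, not a recommendation.

    output {
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
        http_compression => true     # request compression; off by default
        bulk_batch_size => 5242880   # 5 MiB; defaults to DEFAULT_BATCH_SIZE (20 MiB)
      }
    }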
7 changes: 3 additions & 4 deletions lib/logstash/outputs/elasticsearch/common_configs.rb
@@ -19,8 +19,8 @@ def self.included(mod)
# Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here].
mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME

mod.config :document_type,
:validate => :string,
:deprecated => "Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature"

# From Logstash 1.3 onwards, a template is applied to Elasticsearch during
@@ -69,7 +69,7 @@ def self.included(mod)
# The version to use for indexing. Use sprintf syntax like `%{my_version}` to use a field value here.
# See https://www.elastic.co/blog/elasticsearch-versioning-support.
mod.config :version, :validate => :string

# The version_type to use for indexing.
# See https://www.elastic.co/blog/elasticsearch-versioning-support.
# See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types
@@ -145,7 +145,6 @@ def self.included(mod)
# here like `pipeline => "%{INGEST_PIPELINE}"`
mod.config :pipeline, :validate => :string, :default => nil


# -----
# ILM configurations (beta)
# -----
30 changes: 8 additions & 22 deletions lib/logstash/outputs/elasticsearch/http_client.rb
@@ -8,21 +8,6 @@
require 'stringio'

module LogStash; module Outputs; class ElasticSearch;
-# This is a constant instead of a config option because
-# there really isn't a good reason to configure it.
-#
-# The criteria used are:
-# 1. We need a number that's less than 100MiB because ES
-#    won't accept bulks larger than that.
-# 2. It must be large enough to amortize the connection constant
-#    across multiple requests.
-# 3. It must be small enough that even if multiple threads hit this size
-#    we won't use a lot of heap.
-#
-# We wound up agreeing that a number greater than 10 MiB and less than 100MiB
-# made sense. We picked one on the lowish side to not use too much heap.
-TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB

class HttpClient
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
# This is here in case we use DEFAULT_OPTIONS in the future
@@ -52,6 +37,7 @@ class HttpClient
def initialize(options={})
@logger = options[:logger]
@metric = options[:metric]
+@bulk_batch_size = options[:bulk_batch_size]
@bulk_request_metrics = @metric.namespace(:bulk_requests)
@bulk_response_metrics = @bulk_request_metrics.namespace(:responses)

@@ -110,7 +96,7 @@ def bulk(actions)
if http_compression
body_stream.set_encoding "BINARY"
stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
else
stream_writer = body_stream
end
bulk_responses = []
@@ -119,7 +105,7 @@ def bulk(actions)
action.map {|line| LogStash::Json.dump(line)}.join("\n") :
LogStash::Json.dump(action)
as_json << "\n"
-if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
+if (body_stream.size + as_json.bytesize) > @bulk_batch_size
bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
end
stream_writer.write(as_json)
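The size check above is the heart of the change: the buffered payload is flushed whenever appending the next serialized action would push it past `@bulk_batch_size`, which used to be the hard-coded TARGET_BULK_BYTES. A self-contained Ruby sketch of that accounting, with the gzip branch and the action-as-array handling stripped out (method and variable names are illustrative, not the plugin's API):

    require 'stringio'
    require 'json'

    # Split actions into bulk payloads of at most target_bytes each.
    def batched_payloads(actions, target_bytes = 20 * 1024 * 1024)
      payloads = []
      buffer = StringIO.new
      actions.each do |action|
        line = JSON.dump(action) + "\n"
        # Flush first if this action would cross the limit. A single action
        # larger than the limit still ships alone, as one oversized request,
        # exactly as in the real implementation.
        if buffer.size + line.bytesize > target_bytes && buffer.size > 0
          payloads << buffer.string
          buffer = StringIO.new
        end
        buffer.write(line)
      end
      payloads << buffer.string unless buffer.size == 0
      payloads
    end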
@@ -215,7 +201,7 @@ def scheme
else
nil
end

calculated_scheme = calculate_property(uris, :scheme, explicit_scheme, sniffing)

if calculated_scheme && calculated_scheme !~ /https?/
@@ -235,7 +221,7 @@ def port
# Enter things like foo:123, bar and wind up with foo:123, bar:9200
calculate_property(uris, :port, nil, sniffing) || 9200
end

def uris
@options[:hosts]
end
@@ -254,7 +240,7 @@ def http_compression

def build_adapter(options)
timeout = options[:timeout] || 0

adapter_options = {
:socket_timeout => timeout,
:request_timeout => timeout,
@@ -281,7 +267,7 @@ def build_adapter(options)
adapter_class = ::LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter
adapter = adapter_class.new(@logger, adapter_options)
end

def build_pool(options)
adapter = build_adapter(options)

@@ -331,7 +317,7 @@ def host_to_url(h)
h.query
end
prefixed_raw_query = raw_query && !raw_query.empty? ? "?#{raw_query}" : nil

raw_url = "#{raw_scheme}://#{postfixed_userinfo}#{raw_host}:#{raw_port}#{prefixed_raw_path}#{prefixed_raw_query}"

::LogStash::Util::SafeURI.new(raw_url)
11 changes: 6 additions & 5 deletions lib/logstash/outputs/elasticsearch/http_client_builder.rb
@@ -11,13 +11,14 @@ def self.build(logger, hosts, params)
:http_compression => params["http_compression"],
:headers => params["custom_headers"] || {}
}

client_settings[:proxy] = params["proxy"] if params["proxy"]

common_options = {
:client_settings => client_settings,
:metric => params["metric"],
-:resurrect_delay => params["resurrect_delay"]
+:resurrect_delay => params["resurrect_delay"],
+:bulk_batch_size => params["bulk_batch_size"]
}

if params["sniffing"]
@@ -65,7 +66,7 @@ def self.build(logger, hosts, params)
LogStash::ConfigurationError,
"External versioning requires the presence of a version number."
) if external_version_types.include?(params.fetch('version_type', '')) and params.fetch("version", nil) == nil


# Create API setup
raise(
@@ -144,7 +145,7 @@ def self.setup_ssl(logger, params)

def self.setup_basic_auth(logger, params)
user, password = params["user"], params["password"]

return {} unless user && password && password.value

{
63 changes: 48 additions & 15 deletions spec/integration/outputs/index_spec.rb
@@ -1,8 +1,8 @@
require_relative "../../../spec/es_spec_helper"
require "logstash/outputs/elasticsearch"

describe "TARGET_BULK_BYTES", :integration => true do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
describe "BATCH_BULK_SIZE", :integration => true do
let(:batch_bulk_size) { LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE }
let(:event_count) { 1000 }
let(:events) { event_count.times.map { event }.to_a }
let(:config) {
@@ -23,11 +23,11 @@
end

describe "batches that are too large for one" do
-let(:event) { LogStash::Event.new("message" => "a " * (((target_bulk_bytes/2) / event_count)+1)) }
+let(:event) { LogStash::Event.new("message" => "a " * (((batch_bulk_size/2) / event_count)+1)) }

it "should send in two batches" do
expect(subject.client).to have_received(:bulk_send).twice do |payload|
-expect(payload.size).to be <= target_bulk_bytes
+expect(payload.size).to be <= batch_bulk_size
end
end

@@ -38,7 +38,40 @@

it "should send in one batch" do
expect(subject.client).to have_received(:bulk_send).once do |payload|
-expect(payload.size).to be <= target_bulk_bytes
+expect(payload.size).to be <= batch_bulk_size
end
end
end
end

describe "custom bulk size set" do
let(:batch_bulk_size) { 5 * 1024 * 1024 }
let(:config) {
{
"hosts" => get_host_port,
"index" => index,
"bulk_batch_size" => batch_bulk_size
}
}

describe "batches that are too large for one" do
let(:event) { LogStash::Event.new("message" => "a " * (((batch_bulk_size/2) / event_count)+1)) }

it "should send in two batches" do
expect(subject.client).to have_received(:bulk_send).twice do |payload|
expect(payload.size).to be <= batch_bulk_size
end
end

describe "batches that fit in one" do
# Normally you'd want to generate a request that's just 1 byte below the limit, but it's
# impossible to know how many bytes an event will serialize as with bulk proto overhead
let(:event) { LogStash::Event.new("message" => "a") }

it "should send in one batch" do
expect(subject.client).to have_received(:bulk_send).once do |payload|
expect(payload.size).to be <= batch_bulk_size
end
end
end
end
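A quick sanity check on the two-batch expectation (my arithmetic, not part of the commit): with the 5 MiB limit and 1000 events, each message is "a " repeated ((5242880 / 2) / 1000) + 1 = 2622 times, so the message bodies alone total just over one batch's capacity, and per-action serialization overhead only pushes it further past the limit:

    batch   = 5 * 1024 * 1024           # 5_242_880 bytes
    events  = 1000
    repeats = (batch / 2) / events + 1  # => 2622
    total   = repeats * 2 * events      # "a " is 2 bytes => 5_244_000 bytes
    total > batch                       # => true: bulk_send must fire twice

Since the total is also well under two batches' capacity, the payload splits into exactly two bulk_send calls.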
@@ -53,7 +86,7 @@
let(:config) { "not implemented" }
let(:events) { event_count.times.map { event }.to_a }
subject { LogStash::Outputs::ElasticSearch.new(config) }

let(:es_url) { "http://#{get_host_port}" }
let(:index_url) {"#{es_url}/#{index}"}
let(:http_client_options) { {} }
@@ -65,7 +98,7 @@
subject.register
subject.multi_receive([])
end

shared_examples "an indexer" do |secure|
it "ships events" do
subject.multi_receive(events)
@@ -85,13 +118,13 @@
expect(doc["_index"]).to eq(index)
end
end

it "sets the correct content-type header" do
expected_manticore_opts = {:headers => {"Content-Type" => "application/json"}, :body => anything}
if secure
expected_manticore_opts = {
:headers => {"Content-Type" => "application/json"},
:body => anything,
:auth => {
:user => user,
:password => password,
@@ -146,22 +179,22 @@
:auth => {
:user => user,
:password => password
},
:ssl => {
:enabled => true,
:ca_file => cacert
}
}
end
it_behaves_like("an indexer", true)

describe "with a password requiring escaping" do
let(:user) { "f@ncyuser" }
let(:password) { "ab%12#" }

include_examples("an indexer", true)
end

describe "with a user/password requiring escaping in the URL" do
let(:config) do
{
@@ -171,7 +204,7 @@
"index" => index
}
end

include_examples("an indexer", true)
end
end