Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make requests return exception only on 2xx codes #1203

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses)
def bulk_send(body_stream, batch_actions)
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}

response = @pool.post(@bulk_path, params, body_stream.string)

@bulk_response_metrics.increment(response.code.to_s)

case response.code
when 200 # OK
LogStash::Json.load(response.body)
when 413 # Payload Too Large
begin
response = @pool.post(@bulk_path, params, body_stream.string)
@bulk_response_metrics.increment(response.code.to_s)
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
@bulk_response_metrics.increment(e.response_code.to_s)
raise e unless e.response_code == 413
# special handling for 413, treat it as a document level issue
logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
else
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
response.code, url, body_stream.to_s, response.body
)
emulate_batch_error_response(batch_actions, 413, 'payload_too_large')
rescue => e # it may be a network issue instead, re-raise
raise e
end

LogStash::Json.load(response.body)
end

def emulate_batch_error_response(actions, http_code, reason)
Expand Down Expand Up @@ -411,6 +409,9 @@ def host_to_url(h)
def exists?(path, use_get=false)
response = use_get ? @pool.get(path) : @pool.head(path)
response.code >= 200 && response.code <= 299
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return true if e.code == 404
raise e
end

def template_exists?(template_endpoint, name)
Expand All @@ -420,7 +421,10 @@ def template_exists?(template_endpoint, name)
def template_put(template_endpoint, name, template)
path = "#{template_endpoint}/#{name}"
logger.info("Installing Elasticsearch template", name: name)
@pool.put(path, nil, LogStash::Json.dump(template))
response = @pool.put(path, nil, LogStash::Json.dump(template))
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return response if e.code == 404
raise e
end

# ILM methods
Expand All @@ -432,17 +436,15 @@ def rollover_alias_exists?(name)

# Create a new rollover alias
def rollover_alias_put(alias_name, alias_definition)
begin
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
logger.info("Created rollover alias", name: alias_name)
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
if e.response_code == 400
logger.info("Rollover alias already exists, skipping", name: alias_name)
return
end
raise e
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
logger.info("Created rollover alias", name: alias_name)
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
if e.response_code == 400
logger.info("Rollover alias already exists, skipping", name: alias_name)
return
end
raise e
end

def get_xpack_info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
end

# 404s are excluded because they are valid codes in the case of
# template installation. We might need a better story around this later
# but for our current purposes this is correct
code = resp.code
if code < 200 || code > 299 && code != 404
if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
end

Expand Down
24 changes: 10 additions & 14 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,11 @@ def get_license(url)
def health_check_request(url)
logger.debug("Running health check to see if an Elasticsearch connection is working",
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
begin
response = perform_request_to_url(url, :head, @healthcheck_path)
return response, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
return nil, e
end
response = perform_request_to_url(url, :head, @healthcheck_path)
return response, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
return nil, e
end

def healthcheck!(register_phase = true)
Expand Down Expand Up @@ -312,13 +310,11 @@ def healthcheck!(register_phase = true)
end

def get_root_path(url, params={})
begin
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
return resp, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
return nil, e
end
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
return resp, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
return nil, e
end

def test_serverless_connection(url, root_response)
Expand Down
Loading