Skip to content

Commit

Permalink
delete pipeline in registry (#12414)
Browse files Browse the repository at this point in the history
deletes the pipeline in the pipelines_registry if it is terminated and is removed in the source

Fixed: #12414
  • Loading branch information
kaisecheng authored Nov 6, 2020
1 parent 6bb2bd6 commit 244a9f4
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 3 deletions.
4 changes: 3 additions & 1 deletion logstash-core/lib/logstash/pipeline_action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
require "logstash/pipeline_action/create"
require "logstash/pipeline_action/stop"
require "logstash/pipeline_action/reload"
require "logstash/pipeline_action/delete"

module LogStash module PipelineAction
ORDERING = {
LogStash::PipelineAction::Create => 100,
LogStash::PipelineAction::Reload => 200,
LogStash::PipelineAction::Stop => 300
LogStash::PipelineAction::Stop => 300,
LogStash::PipelineAction::Delete => 400
}
end end
38 changes: 38 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/delete.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

require "logstash/pipeline_action/base"

module LogStash module PipelineAction
class Delete < Base
attr_reader :pipeline_id

def initialize(pipeline_id)
@pipeline_id = pipeline_id
end

def execute(agent, pipelines_registry)
success = pipelines_registry.delete_pipeline(@pipeline_id)

LogStash::ConvergeResult::ActionResult.create(self, success)
end

def to_s
"PipelineAction::Delete<#{pipeline_id}>"
end
end
end end
27 changes: 26 additions & 1 deletion logstash-core/lib/logstash/pipelines_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def put(pipeline_id, state)
def remove(pipeline_id)
@lock.synchronize do
@states.delete(pipeline_id)
@locks.delete(pipeline_id)
end
end

Expand Down Expand Up @@ -209,6 +208,32 @@ def reload_pipeline(pipeline_id, &reload_block)
lock.unlock
end

# Delete the pipeline that is terminated
# @param pipeline_id [String, Symbol] the pipeline id
# @return [Boolean] pipeline delete success
def delete_pipeline(pipeline_id)
lock = @states.get_lock(pipeline_id)
lock.lock

state = @states.get(pipeline_id)

if state.nil?
logger.error("Attempted to delete a pipeline that does not exists", :pipeline_id => pipeline_id)
return false
end

if state.terminated?
@states.remove(pipeline_id)
logger.info("Removed pipeline from registry successfully", :pipeline_id => pipeline_id)
return true
else
logger.info("Attempted to delete a pipeline that is not terminated", :pipeline_id => pipeline_id)
return false
end
ensure
lock.unlock
end

# @param pipeline_id [String, Symbol] the pipeline id
# @return [Pipeline] the pipeline object or nil if none for pipeline_id
def get_pipeline(pipeline_id)
Expand Down
7 changes: 6 additions & 1 deletion logstash-core/lib/logstash/state_resolver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,19 @@ def resolve(pipelines_registry, pipeline_configs)
end
end

configured_pipelines = pipeline_configs.map { |config| config.pipeline_id.to_sym }
configured_pipelines = pipeline_configs.each_with_object(Set.new) { |config, set| set.add(config.pipeline_id.to_sym) }

# If one of the running pipeline is not in the pipeline_configs, we assume that we need to
# stop it.
pipelines_registry.running_pipelines.keys
.select { |pipeline_id| !configured_pipelines.include?(pipeline_id) }
.each { |pipeline_id| actions << LogStash::PipelineAction::Stop.new(pipeline_id) }

# If one of the terminated pipeline is not in the pipeline_configs, delete it in registry.
pipelines_registry.non_running_pipelines.keys
.select { |pipeline_id| !configured_pipelines.include?(pipeline_id) }
.each { |pipeline_id| actions << LogStash::PipelineAction::Delete.new(pipeline_id)}

actions.sort # See logstash/pipeline_action.rb
end
end
Expand Down
42 changes: 42 additions & 0 deletions logstash-core/spec/logstash/pipelines_registry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,48 @@
end
end

context "deleting a pipeline" do
context "when pipeline is in registry" do
before :each do
subject.create_pipeline(pipeline_id, pipeline) { true }
end

it "should not delete pipeline if pipeline is not terminated" do
expect(pipeline).to receive(:finished_execution?).and_return(false)
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
expect(logger).to receive(:info)
expect(subject.delete_pipeline(pipeline_id)).to be_falsey
expect(subject.get_pipeline(pipeline_id)).not_to be_nil
end

it "should delete pipeline if pipeline is terminated" do
expect(pipeline).to receive(:finished_execution?).and_return(true)
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
expect(logger).to receive(:info)
expect(subject.delete_pipeline(pipeline_id)).to be_truthy
expect(subject.get_pipeline(pipeline_id)).to be_nil
end

it "should recreate pipeline if pipeline is delete and create again" do
expect(pipeline).to receive(:finished_execution?).and_return(true)
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
expect(logger).to receive(:info)
expect(subject.delete_pipeline(pipeline_id)).to be_truthy
expect(subject.get_pipeline(pipeline_id)).to be_nil
subject.create_pipeline(pipeline_id, pipeline) { true }
expect(subject.get_pipeline(pipeline_id)).not_to be_nil
end
end

context "when pipeline is not in registry" do
it "should log error" do
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
expect(logger).to receive(:error)
expect(subject.delete_pipeline(pipeline_id)).to be_falsey
end
end
end

context "pipelines collections" do
context "with a non terminated pipelines" do
before :each do
Expand Down
40 changes: 40 additions & 0 deletions logstash-core/spec/logstash/state_resolver_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,45 @@
end
end
end

context "when a pipeline stops" do
let(:main_pipeline) { mock_pipeline(:main) }
let(:main_pipeline_config) { main_pipeline.pipeline_config }
let(:pipelines) do
r = LogStash::PipelinesRegistry.new
r.create_pipeline(:main, main_pipeline) { true }
r
end

before do
expect(main_pipeline).to receive(:finished_execution?).at_least(:once).and_return(true)
end

context "when pipeline config contains a new one and the existing" do
let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] }

it "creates the new one and keep the other one stop" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:create, :hello_world])
expect(pipelines.non_running_pipelines.size).to eq(1)
end
end

context "when pipeline config contains an updated pipeline" do
let(:pipeline_configs) { [mock_pipeline_config(:main, "input { generator {}}")] }

it "should reload the stopped pipeline" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:reload, :main])
end
end

context "when pipeline config contains no pipeline" do
let(:pipeline_configs) { [] }

it "should delete the stopped one" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:delete, :main])
end
end
end

end
end

0 comments on commit 244a9f4

Please sign in to comment.