Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/backoff'
Browse files Browse the repository at this point in the history
Fixes: #23
  • Loading branch information
zorbash committed Jan 12, 2017
2 parents 261f508 + 8a0992b commit b20064a
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ To start creating your own, read [below](https://github.com/kittoframework/kitto
* Easy to deploy using the provided Docker images, or Heroku
* Can serve assets in production
* Keeps stats about defined jobs and comes with a dashboard to monitor them
* Can apply exponential back-offs to failing jobs
* [Reloads][code-reloading] code upon change in development

## Installation
Expand Down
1 change: 1 addition & 0 deletions lib/kitto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ defmodule Kitto do
defp children(_env) do
[supervisor(__MODULE__, [], function: :start_server),
supervisor(Kitto.Notifier, []),
worker(Kitto.BackoffServer, [[]]),
worker(Kitto.StatsServer, [[]]),
worker(Kitto.Runner, [[name: :runner]])]
end
Expand Down
9 changes: 9 additions & 0 deletions lib/kitto/backoff.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Kitto.Backoff do
@moduledoc """
Specification for a backoff module to be used with Kitto.
"""

@callback succeed(atom) :: any
@callback fail(atom):: any
@callback backoff!(atom) :: any
end
69 changes: 69 additions & 0 deletions lib/kitto/backoff_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
defmodule Kitto.BackoffServer do
@moduledoc """
Module responsible for keeping and applying a backoff value
for a given atom.
### Configuration
* `:job_min_backoff` - The minimum time in milliseconds to backoff upon failure
* `:job_max_backoff` - The maximum time in milliseconds to backoff upon failure
"""

@behaviour Kitto.Backoff

use GenServer
use Bitwise

alias Kitto.Time

@server __MODULE__
@minval Time.mseconds(:second)
@maxval Time.mseconds({5, :minutes})

@doc false
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: opts[:name] || __MODULE__)
end

@doc false
def init(_), do: {:ok, %{}}

@doc """
Resets the backoff for the given atom to 0
"""
def succeed(name), do: set(name, 0)

@doc """
Increments the backoff value for the provided atom up to the
configured maximum value.
"""
def fail(name) do
case get(name) do
nil -> set(name, min(minval, maxval))
0 -> set(name, min(minval, maxval))
val -> set(name, min(val <<< 1, maxval))
end
end

@doc """
Makes the calling process sleep for the accumulated backoff time
for the given atom
"""
def backoff!(name), do: backoff!(name, name |> get)
defp backoff!(_name, val) when is_nil(val) or val == 0, do: :nop
defp backoff!(_name, val), do: :timer.sleep(val)

def get(name), do: GenServer.call(@server, {:get, name})
def reset, do: GenServer.call(@server, :reset)

### Callbacks
def handle_call(:reset, _from, _state), do: {:reply, nil, %{}}
def handle_call({:get, name}, _from, state), do: {:reply, state[name], state}
def handle_call({:set, name, value}, _from, state) do
{:reply, name, put_in(state[name], value)}
end

defp set(name, value), do: GenServer.call(@server, {:set, name, value})
defp minval, do: Application.get_env(:kitto, :job_min_backoff, @minval)
defp maxval, do: Application.get_env(:kitto, :job_max_backoff, @maxval)
end
19 changes: 15 additions & 4 deletions lib/kitto/stats_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,23 @@ defmodule Kitto.StatsServer do
{:reply, name, Map.merge(state, new_stats)}
end

def handle_cast(:reset, state), do: {:noreply, %{}}
def handle_cast(:reset, _state), do: {:noreply, %{}}
def handle_cast({:measure_call, job, run}, state) do
current_stats = state[job.name]

new_stats = case run do
{:ok, time_took} ->
backoff_module.succeed(job.name)
times_completed = current_stats[:times_completed] + 1
total_running_time = current_stats[:total_running_time] + time_took

%{current_stats |
times_completed: times_completed,
total_running_time: total_running_time
} |> Map.merge(%{avg_time_took: total_running_time / times_completed})
{:error, _} -> %{current_stats | failures: current_stats[:failures] + 1}
{:error, _} ->
backoff_module.fail(job.name)
%{current_stats | failures: current_stats[:failures] + 1}
end

{:noreply, Map.merge(state, %{job.name => new_stats})}
Expand All @@ -76,12 +79,14 @@ defmodule Kitto.StatsServer do
defp update_trigger_count(name),
do: GenServer.call(@server, {:update_trigger_count, name})
defp measure_call(job) do
if backoff_enabled?, do: backoff_module.backoff!(job.name)

run = timed_call(job.job)

GenServer.cast(@server, {:measure_call, job, run})

if elem(run, 0) == :error do
raise Kitto.Job.Error, %{exception: elem(run, 1), job: job}
else
GenServer.cast(@server, {:measure_call, job, run})
end
end

Expand All @@ -92,4 +97,10 @@ defmodule Kitto.StatsServer do
e -> {:error, e}
end
end

defp backoff_enabled?, do: Application.get_env :kitto, :job_backoff_enabled?, true

defp backoff_module do
Application.get_env :kitto, :backoff_module, Kitto.BackoffServer
end
end
79 changes: 79 additions & 0 deletions test/backoff_server_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule Kitto.BackoffServerTest do
use ExUnit.Case, async: true

alias Kitto.BackoffServer, as: Subject

@min 1

setup do
Subject.reset

on_exit fn ->
Application.delete_env :kitto, :job_min_backoff
Application.delete_env :kitto, :job_max_backoff
end
end

test "#succeed resets to 0 the backoff for a job" do
Subject.succeed :italian_job

assert Subject.get(:italian_job) == 0
end

test "#reset resets the state of the server to an empty map" do
Subject.fail :failjob
Subject.fail :otherjob
Subject.succeed :successjob

Subject.reset

assert is_nil(Subject.get(:failjob))
assert is_nil(Subject.get(:otherjob))
assert is_nil(Subject.get(:successjob))
end

test "#fail increases the backoff value exponentially (power of 2)" do
Subject.fail :failjob

val = Subject.get :failjob

Subject.fail :failjob
assert Subject.get(:failjob) == val * 2

Subject.fail :failjob
assert Subject.get(:failjob) == val * 4
end

test "#backoff! puts the current process to sleep for backoff time" do
maxval = 100
Application.put_env :kitto, :job_mix_backoff, 64
Application.put_env :kitto, :job_max_backoff, maxval
Subject.fail :failjob

{time, _} = :timer.tc fn -> Subject.backoff! :failjob end

assert_in_delta time / 1000, maxval, 5
end

describe "when :job_min_backoff is configured" do
setup [:set_job_min_backoff]

test "#fail initializes the backoff to the min value" do
Subject.fail :failjob

assert Subject.get(:failjob) == @min
end
end

describe "when :job_min_backoff is not configured" do
test "#fail initializes the backoff to the default min value" do
Subject.fail :failjob

assert Subject.get(:failjob) == Kitto.Time.mseconds(:second)
end
end

defp set_job_min_backoff(_context) do
Application.put_env :kitto, :job_min_backoff, @min
end
end
50 changes: 50 additions & 0 deletions test/stats_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,24 @@ defmodule Kitto.StatsServerTest do

alias Kitto.StatsServer

defmodule BackoffMock do
@behaviour Kitto.Backoff

def succeed(_), do: send self, {:ok, :mocked}
def fail(_), do: send self, {:ok, :mocked}
def backoff!(_), do: send self, {:ok, :mocked}
end

setup do
Application.put_env :kitto, :backoff_module, Kitto.StatsServerTest.BackoffMock
definition = %{file: "jobs/dummy.exs", line: 1}
job = %{name: :dummy_job, options: %{}, definition: definition, job: fn -> :ok end}

on_exit fn ->
Application.delete_env :kitto, :backoff_module
Application.delete_env :kitto, :job_backoff_enabled?
end

%{
successful_job: job,
failing_job: %{job | job: fn -> raise RuntimeError end},
Expand Down Expand Up @@ -119,4 +133,40 @@ defmodule Kitto.StatsServerTest do
~r/Stacktrace: .*? anonymous fn/,
fn -> StatsServer.measure(job) end
end

describe "when :job_backoff_enabled? is set to false" do
setup [:disable_job_backoff]

test "#measure does not apply backoffs", context do
StatsServer.measure(context.successful_job)

refute_received {:ok, :mocked}
end
end

describe "when :job_backoff_enabled? is set to true" do
setup [:enable_job_backoff]

test "#measure applies backoffs", context do
StatsServer.measure(context.successful_job)

assert_received {:ok, :mocked}
end
end

describe "when :job_backoff_enabled? is not set" do
test "#measure applies backoffs", context do
StatsServer.measure(context.successful_job)

assert_received {:ok, :mocked}
end
end

defp disable_job_backoff(_context) do
Application.put_env :kitto, :job_backoff_enabled?, false
end

defp enable_job_backoff(_context) do
Application.put_env :kitto, :job_backoff_enabled?, true
end
end

0 comments on commit b20064a

Please sign in to comment.