Skip to content

Commit

Permalink
Merge pull request #1956 from digitallyinduced/worker-expontentiall-b…
Browse files Browse the repository at this point in the history
…ackoff

Worker: Exponential Backoff
  • Loading branch information
mpscholten authored Apr 26, 2024
2 parents 7f33abd + bea5b8a commit 74aa93d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
26 changes: 15 additions & 11 deletions IHP/Job/Queue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ fetchNextJob :: forall job.
, Show (PrimaryKey (GetTableName job))
, PG.FromField (PrimaryKey (GetTableName job))
, Table job
) => UUID -> IO (Maybe job)
fetchNextJob workerId = do
let query = "UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds')) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id"
let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry)
) => BackoffStrategy -> UUID -> IO (Maybe job)
fetchNextJob backoffStrategy workerId = do
let query = PG.Query ("UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id")
let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds)

result :: [PG.Only (Id job)] <- sqlQuery query params
case result of
Expand All @@ -68,12 +68,12 @@ fetchNextJob workerId = do
-- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@
-- You will see that @"Something changed in the projects table"@ is printed onto the screen.
--
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
watchForJob pgListener tableName pollInterval onNewJob = do
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
watchForJob pgListener tableName pollInterval backoffStrategy onNewJob = do
let tableNameBS = cs tableName
sqlExec (createNotificationTrigger tableNameBS) ()

poller <- pollForJob tableName pollInterval onNewJob
poller <- pollForJob tableName pollInterval backoffStrategy onNewJob
subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable))

pure (subscription, poller)
Expand All @@ -86,10 +86,10 @@ watchForJob pgListener tableName pollInterval onNewJob = do
--
-- This function returns an 'Async'. Call 'cancel' on that async to stop polling the database.
--
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
pollForJob tableName pollInterval onNewJob = do
let query = "SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds')) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1"
let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry)
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
pollForJob tableName pollInterval backoffStrategy onNewJob = do
let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1")
let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds)
Async.asyncBound do
forever do
count :: Int <- sqlQueryScalar query params
Expand Down Expand Up @@ -255,3 +255,7 @@ instance InputValue JobStatus where

-- | 'IHP.Controller.Param.ParamReader' instance for 'JobStatus'.
-- 'readParameter' simply delegates to 'IHP.Controller.Param.enumParamReader'.
instance IHP.Controller.Param.ParamReader JobStatus where
readParameter = IHP.Controller.Param.enumParamReader

-- | SQL condition that decides whether a job in the retry state is due again.
--
-- The returned fragment is spliced into the @WHERE@ clause of the queue
-- queries and contains exactly one @?@ placeholder, which callers bind to
-- the strategy's @delayInSeconds@.
--
-- * 'LinearBackoff': the job is due once @updated_at@ is at least
--   @delayInSeconds@ seconds in the past.
-- * 'ExponentialBackoff': the job is due once @updated_at@ is at least
--   @delayInSeconds * 2 ^ attempts_count@ seconds in the past.
--
-- Both branches subtract the delay from @NOW()@. Adding it instead would make
-- the comparison true for every row whose @updated_at@ lies in the past, so
-- no delay would ever be applied.
retryQuery :: BackoffStrategy -> ByteString
retryQuery LinearBackoff {} = "updated_at < NOW() - (interval '1 second' * ?)"
retryQuery ExponentialBackoff {} = "updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count)"
4 changes: 2 additions & 2 deletions IHP/Job/Runner.hs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do

case receivedAction of
JobAvailable -> do
maybeJob <- Queue.fetchNextJob @job workerId
maybeJob <- Queue.fetchNextJob @job (backoffStrategy @job) workerId
case maybeJob of
Just job -> do
Log.info ("Starting job: " <> tshow job)
Expand All @@ -191,7 +191,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do

loop

(subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) action
(subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (backoffStrategy @job) action


pure JobWorkerProcess { runners, subscription, poller, action }
9 changes: 9 additions & 0 deletions IHP/Job/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module IHP.Job.Types
, Worker (..)
, JobWorkerProcess (..)
, JobWorkerProcessMessage (..)
, BackoffStrategy (..)
)
where

Expand All @@ -15,6 +16,11 @@ import IHP.FrameworkConfig
import qualified IHP.PGListener as PGListener
import qualified Control.Concurrent as Concurrent

-- | Selects how the delay before a job is retried is computed.
--
-- Both constructors carry the same strict @delayInSeconds@ field, so the
-- field can be read from any 'BackoffStrategy' value without matching on
-- the constructor first.
data BackoffStrategy
    -- | The retry condition uses a fixed interval of @delayInSeconds@ seconds.
    = LinearBackoff
        { delayInSeconds :: !Int
        }
    -- | The retry condition uses @delayInSeconds@ seconds multiplied by
    -- @2 ^ attempts_count@.
    | ExponentialBackoff
        { delayInSeconds :: !Int
        }
    deriving (Eq, Show)

class Job job where
perform :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => job -> IO ()

Expand All @@ -38,6 +44,9 @@ class Job job where
maxConcurrency :: Int
maxConcurrency = 16

backoffStrategy :: BackoffStrategy
backoffStrategy = LinearBackoff { delayInSeconds = 30 }

class Worker application where
workers :: application -> [JobWorker]

Expand Down

0 comments on commit 74aa93d

Please sign in to comment.