diff --git a/IHP/Job/Queue.hs b/IHP/Job/Queue.hs index 88c7fae40..de8ec4500 100644 --- a/IHP/Job/Queue.hs +++ b/IHP/Job/Queue.hs @@ -41,10 +41,10 @@ fetchNextJob :: forall job. , Show (PrimaryKey (GetTableName job)) , PG.FromField (PrimaryKey (GetTableName job)) , Table job - ) => UUID -> IO (Maybe job) -fetchNextJob workerId = do - let query = "UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds')) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id" - let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry) + ) => BackoffStrategy -> UUID -> IO (Maybe job) +fetchNextJob backoffStrategy workerId = do + let query = PG.Query ("UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id") + let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds) result :: [PG.Only (Id job)] <- sqlQuery query params case result of @@ -68,12 +68,12 @@ fetchNextJob workerId = do -- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@ -- You will see that @"Something changed in the projects table"@ is printed onto the screen. 
-- -watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ()) -watchForJob pgListener tableName pollInterval onNewJob = do +watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ()) +watchForJob pgListener tableName pollInterval backoffStrategy onNewJob = do let tableNameBS = cs tableName sqlExec (createNotificationTrigger tableNameBS) () - poller <- pollForJob tableName pollInterval onNewJob + poller <- pollForJob tableName pollInterval backoffStrategy onNewJob subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable)) pure (subscription, poller) @@ -86,10 +86,10 @@ watchForJob pgListener tableName pollInterval onNewJob = do -- -- This function returns a Async. Call 'cancel' on the async to stop polling the database. -- -pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ()) -pollForJob tableName pollInterval onNewJob = do - let query = "SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds')) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1" - let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry) +pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ()) +pollForJob tableName pollInterval backoffStrategy onNewJob = do + let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? 
AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1") + let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds) Async.asyncBound do forever do count :: Int <- sqlQueryScalar query params @@ -255,3 +255,10 @@ instance InputValue JobStatus where instance IHP.Controller.Param.ParamReader JobStatus where readParameter = IHP.Controller.Param.enumParamReader + +-- | SQL condition that holds once a job in retry state has waited out its backoff; the single @?@ is bound to 'delayInSeconds' by the caller. +-- Both strategies compare @updated_at@ against a moment in the past (@NOW()@ minus the delay): adding the delay instead would match every row that was updated before now, so retries would never wait. +-- The exponential variant scales the delay by @POW(2, attempts_count)@. +retryQuery :: BackoffStrategy -> ByteString +retryQuery LinearBackoff {} = "updated_at < NOW() - (interval '1 second' * ?)" +retryQuery ExponentialBackoff {} = "updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count)" \ No newline at end of file diff --git a/IHP/Job/Runner.hs b/IHP/Job/Runner.hs index 1c3b26f97..eab19de53 100644 --- a/IHP/Job/Runner.hs +++ b/IHP/Job/Runner.hs @@ -169,7 +169,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do case receivedAction of JobAvailable -> do - maybeJob <- Queue.fetchNextJob @job workerId + maybeJob <- Queue.fetchNextJob @job (backoffStrategy @job) workerId case maybeJob of Just job -> do Log.info ("Starting job: " <> tshow job) @@ -191,7 +191,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do loop - (subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) action + (subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (backoffStrategy @job) action pure JobWorkerProcess { runners, subscription, poller, action } diff --git a/IHP/Job/Types.hs b/IHP/Job/Types.hs index 33d18fba7..135ff4173 100644 --- a/IHP/Job/Types.hs +++ b/IHP/Job/Types.hs @@ -7,6 +7,7 @@ module IHP.Job.Types , Worker (..) , JobWorkerProcess (..) , JobWorkerProcessMessage (..) +, BackoffStrategy (..)
) where @@ -15,6 +16,15 @@ import IHP.FrameworkConfig import qualified IHP.PGListener as PGListener import qualified Control.Concurrent as Concurrent +-- | Backoff configuration for a job type, exposed through the 'backoffStrategy' method of 'Job'. +-- Both constructors carry a strict 'delayInSeconds'; how that number becomes a waiting time is up to the code that consumes the strategy. +data BackoffStrategy + = LinearBackoff + { delayInSeconds :: !Int } + | ExponentialBackoff + { delayInSeconds :: !Int } + deriving (Eq, Show) + class Job job where perform :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => job -> IO () @@ -38,6 +48,9 @@ class Job job where maxConcurrency :: Int maxConcurrency = 16 + backoffStrategy :: BackoffStrategy + backoffStrategy = LinearBackoff { delayInSeconds = 30 } + class Worker application where workers :: application -> [JobWorker]