From 9183b314e3de53227839044439b2b2cf2e4bf5cd Mon Sep 17 00:00:00 2001 From: Marc Scholten Date: Fri, 26 Apr 2024 14:34:33 -0700 Subject: [PATCH 1/2] Added exponential backoff to IHP job runner --- IHP/Job/Queue.hs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/IHP/Job/Queue.hs b/IHP/Job/Queue.hs index 88c7fae40..c77c8ae4b 100644 --- a/IHP/Job/Queue.hs +++ b/IHP/Job/Queue.hs @@ -43,8 +43,9 @@ fetchNextJob :: forall job. , Table job ) => UUID -> IO (Maybe job) fetchNextJob workerId = do - let query = "UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds')) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id" - let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry) + let baseDelay :: Int = 30 -- base delay in seconds + let query = "UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count))) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id" + let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, baseDelay) result :: [PG.Only (Id job)] <- sqlQuery query params case result of @@ -88,7 +89,8 @@ watchForJob pgListener tableName pollInterval onNewJob = do -- pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ()) pollForJob tableName pollInterval onNewJob = do - let query = "SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? 
AND updated_at < NOW() + interval '30 seconds')) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1" + let baseDelay :: Int = 30 -- base delay in seconds + let query = "SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count))) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1" let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry) Async.asyncBound do forever do From bea5b8adb70a391bc621953bafb666a386806999 Mon Sep 17 00:00:00 2001 From: Marc Scholten Date: Fri, 26 Apr 2024 15:12:50 -0700 Subject: [PATCH 2/2] Support exponential and linear backoffs for better BC --- IHP/Job/Queue.hs | 28 +++++++++++++++------------- IHP/Job/Runner.hs | 4 ++-- IHP/Job/Types.hs | 9 +++++++++ 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/IHP/Job/Queue.hs b/IHP/Job/Queue.hs index c77c8ae4b..de8ec4500 100644 --- a/IHP/Job/Queue.hs +++ b/IHP/Job/Queue.hs @@ -41,11 +41,10 @@ fetchNextJob :: forall job. , Show (PrimaryKey (GetTableName job)) , PG.FromField (PrimaryKey (GetTableName job)) , Table job - ) => UUID -> IO (Maybe job) -fetchNextJob workerId = do - let baseDelay :: Int = 30 -- base delay in seconds - let query = "UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count))) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id" - let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, baseDelay) + ) => BackoffStrategy -> UUID -> IO (Maybe job) +fetchNextJob backoffStrategy workerId = do + let query = PG.Query ("UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? 
AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id") + let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds) result :: [PG.Only (Id job)] <- sqlQuery query params case result of @@ -69,12 +68,12 @@ fetchNextJob workerId = do -- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@ -- You will see that @"Something changed in the projects table"@ is printed onto the screen. -- -watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ()) -watchForJob pgListener tableName pollInterval onNewJob = do +watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ()) +watchForJob pgListener tableName pollInterval backoffStrategy onNewJob = do let tableNameBS = cs tableName sqlExec (createNotificationTrigger tableNameBS) () - poller <- pollForJob tableName pollInterval onNewJob + poller <- pollForJob tableName pollInterval backoffStrategy onNewJob subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable)) pure (subscription, poller) @@ -87,11 +86,10 @@ watchForJob pgListener tableName pollInterval onNewJob = do -- -- This function returns a Async. Call 'cancel' on the async to stop polling the database. 
-- -pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ()) -pollForJob tableName pollInterval onNewJob = do - let baseDelay :: Int = 30 -- base delay in seconds - let query = "SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count))) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1" - let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry) +pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ()) +pollForJob tableName pollInterval backoffStrategy onNewJob = do + let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1") + let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds) Async.asyncBound do forever do count :: Int <- sqlQueryScalar query params @@ -257,3 +255,7 @@ instance InputValue JobStatus where instance IHP.Controller.Param.ParamReader JobStatus where readParameter = IHP.Controller.Param.enumParamReader + +retryQuery :: BackoffStrategy -> ByteString +retryQuery LinearBackoff {} = "updated_at < NOW() - (interval '1 second' * ?)" -- subtract the delay: eligible only once delayInSeconds have passed since updated_at +retryQuery ExponentialBackoff {} = "updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count)" \ No newline at end of file diff --git a/IHP/Job/Runner.hs b/IHP/Job/Runner.hs index 1c3b26f97..eab19de53 100644 --- a/IHP/Job/Runner.hs +++ b/IHP/Job/Runner.hs @@ -169,7 +169,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. 
} = do case receivedAction of JobAvailable -> do - maybeJob <- Queue.fetchNextJob @job workerId + maybeJob <- Queue.fetchNextJob @job (backoffStrategy @job) workerId case maybeJob of Just job -> do Log.info ("Starting job: " <> tshow job) @@ -191,7 +191,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do loop - (subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) action + (subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (backoffStrategy @job) action pure JobWorkerProcess { runners, subscription, poller, action } diff --git a/IHP/Job/Types.hs b/IHP/Job/Types.hs index 33d18fba7..135ff4173 100644 --- a/IHP/Job/Types.hs +++ b/IHP/Job/Types.hs @@ -7,6 +7,7 @@ module IHP.Job.Types , Worker (..) , JobWorkerProcess (..) , JobWorkerProcessMessage (..) +, BackoffStrategy (..) ) where @@ -15,6 +16,11 @@ import IHP.FrameworkConfig import qualified IHP.PGListener as PGListener import qualified Control.Concurrent as Concurrent +data BackoffStrategy + = LinearBackoff { delayInSeconds :: !Int } + | ExponentialBackoff { delayInSeconds :: !Int } + deriving (Eq, Show) + class Job job where perform :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => job -> IO () @@ -38,6 +44,9 @@ class Job job where maxConcurrency :: Int maxConcurrency = 16 + backoffStrategy :: BackoffStrategy + backoffStrategy = LinearBackoff { delayInSeconds = 30 } + class Worker application where workers :: application -> [JobWorker]