diff --git a/IHP/Job/Queue.hs b/IHP/Job/Queue.hs index de8ec4500..2bb334a49 100644 --- a/IHP/Job/Queue.hs +++ b/IHP/Job/Queue.hs @@ -41,10 +41,10 @@ fetchNextJob :: forall job. , Show (PrimaryKey (GetTableName job)) , PG.FromField (PrimaryKey (GetTableName job)) , Table job - ) => BackoffStrategy -> UUID -> IO (Maybe job) -fetchNextJob backoffStrategy workerId = do - let query = PG.Query ("UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id") - let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds) + ) => Maybe Int -> BackoffStrategy -> UUID -> IO (Maybe job) +fetchNextJob timeoutInMicroseconds backoffStrategy workerId = do + let query = PG.Query ("UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE (((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW()) " <> timeoutCondition timeoutInMicroseconds <> " ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id") + let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds, timeoutInMicroseconds) result :: [PG.Only (Id job)] <- sqlQuery query params case result of @@ -68,12 +68,12 @@ fetchNextJob backoffStrategy workerId = do -- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@ -- You will see that @"Something changed in the projects table"@ is printed onto the screen. -- -watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ()) -watchForJob pgListener tableName pollInterval backoffStrategy onNewJob = do +watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ()) +watchForJob pgListener tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do let tableNameBS = cs tableName sqlExec (createNotificationTrigger tableNameBS) () - poller <- pollForJob tableName pollInterval backoffStrategy onNewJob + poller <- pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable)) pure (subscription, poller) @@ -86,10 +86,10 @@ watchForJob pgListener tableName pollInterval backoffStrategy onNewJob = do -- -- This function returns a Async. Call 'cancel' on the async to stop polling the database. -- -pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ()) -pollForJob tableName pollInterval backoffStrategy onNewJob = do - let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1") - let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds) +pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ()) +pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do + let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE (((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW()) " <> timeoutCondition timeoutInMicroseconds <> " LIMIT 1") + let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds, timeoutInMicroseconds) Async.asyncBound do forever do count :: Int <- sqlQueryScalar query params @@ -258,4 +258,8 @@ instance IHP.Controller.Param.ParamReader JobStatus where retryQuery :: BackoffStrategy -> ByteString retryQuery LinearBackoff {} = "updated_at < NOW() + (interval '1 second' * ?)" -retryQuery ExponentialBackoff {} = "updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count)" \ No newline at end of file +retryQuery ExponentialBackoff {} = "updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count)" + +timeoutCondition :: Maybe Int -> ByteString +timeoutCondition (Just timeoutInMicroseconds) = "OR (status = 'job_status_running' AND locked_by IS NOT NULL AND locked_at + ((? + 1000000) || 'microseconds')::interval < NOW())" -- Add 1000000 here to avoid race condition with the Haskell based timeout mechanism +timeoutCondition Nothing = "AND (? IS NULL)" \ No newline at end of file diff --git a/IHP/Job/Runner.hs b/IHP/Job/Runner.hs index eab19de53..3e8f48ceb 100644 --- a/IHP/Job/Runner.hs +++ b/IHP/Job/Runner.hs @@ -169,7 +169,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do case receivedAction of JobAvailable -> do - maybeJob <- Queue.fetchNextJob @job (backoffStrategy @job) workerId + maybeJob <- Queue.fetchNextJob @job (timeoutInMicroseconds @job) (backoffStrategy @job) workerId case maybeJob of Just job -> do Log.info ("Starting job: " <> tshow job) @@ -191,7 +191,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do loop - (subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (backoffStrategy @job) action + (subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (timeoutInMicroseconds @job) (backoffStrategy @job) action pure JobWorkerProcess { runners, subscription, poller, action } diff --git a/IHP/Job/Types.hs b/IHP/Job/Types.hs index 135ff4173..736a3804c 100644 --- a/IHP/Job/Types.hs +++ b/IHP/Job/Types.hs @@ -27,7 +27,7 @@ class Job job where maxAttempts :: (?job :: job) => Int maxAttempts = 10 - timeoutInMicroseconds :: (?job :: job) => Maybe Int + timeoutInMicroseconds :: Maybe Int timeoutInMicroseconds = Nothing -- | While jobs are typically fetch using pg_notiy, we have to poll the queue table