Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job: Restart jobs that should have timed out, usually when the worker… #1966

Merged
merged 1 commit into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions IHP/Job/Queue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ fetchNextJob :: forall job.
, Show (PrimaryKey (GetTableName job))
, PG.FromField (PrimaryKey (GetTableName job))
, Table job
) => BackoffStrategy -> UUID -> IO (Maybe job)
fetchNextJob backoffStrategy workerId = do
let query = PG.Query ("UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id")
let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds)
) => Maybe Int -> BackoffStrategy -> UUID -> IO (Maybe job)
fetchNextJob timeoutInMicroseconds backoffStrategy workerId = do
let query = PG.Query ("UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE (((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW()) " <> timeoutCondition timeoutInMicroseconds <> " ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id")
let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds, timeoutInMicroseconds)

result :: [PG.Only (Id job)] <- sqlQuery query params
case result of
Expand All @@ -68,12 +68,12 @@ fetchNextJob backoffStrategy workerId = do
-- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@
-- You will see that @"Something changed in the projects table"@ is printed onto the screen.
--
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
watchForJob pgListener tableName pollInterval backoffStrategy onNewJob = do
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
watchForJob pgListener tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do
let tableNameBS = cs tableName
sqlExec (createNotificationTrigger tableNameBS) ()

poller <- pollForJob tableName pollInterval backoffStrategy onNewJob
poller <- pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob
subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable))

pure (subscription, poller)
Expand All @@ -86,10 +86,10 @@ watchForJob pgListener tableName pollInterval backoffStrategy onNewJob = do
--
-- This function returns a Async. Call 'cancel' on the async to stop polling the database.
--
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
pollForJob tableName pollInterval backoffStrategy onNewJob = do
let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1")
let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds)
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do
let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE (((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW()) " <> timeoutCondition timeoutInMicroseconds <> " LIMIT 1")
let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds, timeoutInMicroseconds)
Async.asyncBound do
forever do
count :: Int <- sqlQueryScalar query params
Expand Down Expand Up @@ -258,4 +258,8 @@ instance IHP.Controller.Param.ParamReader JobStatus where

retryQuery :: BackoffStrategy -> ByteString
retryQuery LinearBackoff {} = "updated_at < NOW() + (interval '1 second' * ?)"
retryQuery ExponentialBackoff {} = "updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count)"
retryQuery ExponentialBackoff {} = "updated_at < NOW() - interval '1 second' * ? * POW(2, attempts_count)"

timeoutCondition :: Maybe Int -> ByteString
timeoutCondition (Just timeoutInMicroseconds) = "OR (status = 'job_status_running' AND locked_by IS NOT NULL AND locked_at + ((? + 1000000) || 'microseconds')::interval < NOW())" -- Add 1000000 here to avoid race condition with the Haskell based timeout mechanism
timeoutCondition Nothing = "AND (? IS NULL)"
4 changes: 2 additions & 2 deletions IHP/Job/Runner.hs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do

case receivedAction of
JobAvailable -> do
maybeJob <- Queue.fetchNextJob @job (backoffStrategy @job) workerId
maybeJob <- Queue.fetchNextJob @job (timeoutInMicroseconds @job) (backoffStrategy @job) workerId
case maybeJob of
Just job -> do
Log.info ("Starting job: " <> tshow job)
Expand All @@ -191,7 +191,7 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do

loop

(subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (backoffStrategy @job) action
(subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (timeoutInMicroseconds @job) (backoffStrategy @job) action


pure JobWorkerProcess { runners, subscription, poller, action }
2 changes: 1 addition & 1 deletion IHP/Job/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Job job where
maxAttempts :: (?job :: job) => Int
maxAttempts = 10

timeoutInMicroseconds :: (?job :: job) => Maybe Int
timeoutInMicroseconds :: Maybe Int
timeoutInMicroseconds = Nothing

-- | While jobs are typically fetch using pg_notiy, we have to poll the queue table
Expand Down