Skip to content

Commit

Permalink
Updated purge() and updateMetrics()
Browse files Browse the repository at this point in the history
Updated purge() and updateMetrics() to be more deterministic, to avoid exception.
  • Loading branch information
Jesper Callesen committed Sep 26, 2024
1 parent cc20648 commit 3806b53
Showing 1 changed file with 29 additions and 28 deletions.
57 changes: 29 additions & 28 deletions src/LuaScripts.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@ public static function updateMetrics()
redis.call('sadd', KEYS[2], KEYS[1])
local hash = redis.call('hmget', KEYS[1], 'throughput', 'runtime')
local throughput = hash[1] + 1
local throughput = tonumber(hash[1]) or 0
throughput = throughput + 1
local runtime = 0
if hash[2] then
runtime = ((hash[1] * tonumber(hash[2])) + tonumber(ARGV[1])) / throughput
runtime = ((tonumber(hash[1]) * tonumber(hash[2])) + tonumber(ARGV[1])) / throughput
else
runtime = tonumber(ARGV[1])
end
redis.call('hmset', KEYS[1], 'throughput', throughput, 'runtime', runtime)
return {throughput, runtime}
LUA;
}

Expand All @@ -50,30 +54,27 @@ public static function purge()
return <<<'LUA'
local count = 0
local cursor = 0
local batch_size = tonumber(100)
local start = tonumber(0)
repeat
-- Iterate over the recent jobs sorted set
local scanner = redis.call('zscan', KEYS[1], cursor)
cursor = scanner[1]
for i = 1, #scanner[2], 2 do
local jobid = scanner[2][i]
local hashkey = ARGV[1] .. jobid
local job = redis.call('hmget', hashkey, 'status', 'queue')
-- Delete the pending/reserved jobs, that match the queue
-- name, from the sorted sets as well as the job hash
if((job[1] == 'reserved' or job[1] == 'pending') and job[2] == ARGV[2]) then
redis.call('zrem', KEYS[1], jobid)
redis.call('zrem', KEYS[2], jobid)
redis.call('del', hashkey)
count = count + 1
end
-- Use ZRANGE to get a range of elements from the sorted set in a deterministic way
local jobs = redis.call('zrange', KEYS[1], start, start + batch_size - 1)
for i = 1, #jobs do
local jobid = jobs[i]
local hashkey = ARGV[1] .. jobid
local job = redis.call('hmget', hashkey, 'status', 'queue')
-- Delete the pending/reserved jobs that match the queue name
if ((job[1] == 'reserved' or job[1] == 'pending') and job[2] == ARGV[2]) then
redis.call('zrem', KEYS[1], jobid)
redis.call('zrem', KEYS[2], jobid)
redis.call('del', hashkey)
count = count + 1
end
until cursor == '0'
return count
end
return {count, start + batch_size}
LUA;
}
}
}

0 comments on commit 3806b53

Please sign in to comment.