forked from taskcluster/taskcluster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
0064.yml
124 lines (122 loc) · 5.17 KB
/
0064.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
version: 64
description: Add `queue_worker_seen` and `queue_task_queue_seen` functions
methods:
create_task_queue: {deprecated: true}
update_task_queue: {deprecated: true}
task_queue_seen:
description: |-
Recognize that a task queue has been seen, creating it if necessary, updating
its properties if not null, and in any case bumping its last seen time time.
The expiration time is not allowed to move backward.
This function always writes to the DB, so calls should be suitably rate-limited at the
client side.
mode: write
serviceName: queue
args: task_queue_id_in text, expires_in timestamptz, description_in text, stability_in text
returns: void
body: |-
begin
insert
into task_queues (task_queue_id, expires, last_date_active, description, stability)
values (
task_queue_id_in,
expires_in,
now(),
coalesce(description_in, ''),
coalesce(stability_in, 'experimental')
)
on conflict (task_queue_id) do update
set
expires = greatest(coalesce(expires_in, task_queues.expires), task_queues.expires),
last_date_active = now(),
description = coalesce(description_in, task_queues.description),
stability = coalesce(stability_in, task_queues.stability)
where task_queues.task_queue_id = task_queue_id_in;
end
update_queue_worker_tqid: {deprecated: true}
create_queue_worker_tqid: {deprecated: true}
queue_worker_seen:
description: |-
Recognize that a worker has been seen by the queue, creating it if necessary. This is called
when workers claim or re-claim work. The expiration time is not allowed to move backward.
This function always writes to the DB, so calls should be suitably rate-limited at the
client side.
mode: write
serviceName: queue
args: task_queue_id_in text, worker_group_in text, worker_id_in text, expires_in timestamptz
returns: void
body: |-
begin
insert
into queue_workers (task_queue_id, worker_group, worker_id, quarantine_until, expires, first_claim, recent_tasks)
values (
task_queue_id_in,
worker_group_in,
worker_id_in,
now() - interval '10 years',
expires_in,
now(),
jsonb_build_array()
)
on conflict (task_queue_id, worker_group, worker_id) do update
set
expires = greatest(coalesce(expires_in, queue_workers.expires), queue_workers.expires)
where
queue_workers.task_queue_id = task_queue_id_in and
queue_workers.worker_group = worker_group_in and
queue_workers.worker_id = worker_id_in;
end
quarantine_queue_worker:
description: |-
Update the quarantine_until date for a worker. The Queue service interprets a date in the past
as "not quarantined". This function also "bumps" the expiration of the worker so that un-quarantined
workers do not immediately expire. Returns the worker row just as get_queue_worker would, or no rows if
no such worker exists.
mode: write
serviceName: queue
args: task_queue_id_in text, worker_group_in text, worker_id_in text, quarantine_until_in timestamptz
returns: table(task_queue_id text, worker_group text, worker_id text, quarantine_until timestamptz, expires timestamptz, first_claim timestamptz, recent_tasks jsonb)
body: |-
begin
return query update queue_workers
set
quarantine_until = quarantine_until_in,
expires = greatest(queue_workers.expires, now() + interval '1 day')
where
queue_workers.task_queue_id = task_queue_id_in and
queue_workers.worker_group = worker_group_in and
queue_workers.worker_id = worker_id_in
returning
queue_workers.task_queue_id,
queue_workers.worker_group,
queue_workers.worker_id,
queue_workers.quarantine_until,
queue_workers.expires,
queue_workers.first_claim,
queue_workers.recent_tasks;
end
queue_worker_task_seen:
description: |-
Update the worker record to indicate that this task run was seen there. The
task run should be a JSON object with keys `taskId` and `runId`. This will
add the task to `recent_tasks`, keeping the most recent 20 tasks. This
will do nothing, but not fail, if the worker does not exist, as it is
unusual for a nonexistent worker to claim work.
mode: write
serviceName: queue
args: task_queue_id_in text, worker_group_in text, worker_id_in text, task_run_in jsonb
returns: void
body: |-
begin
update queue_workers
set
-- append without increasing size over 20
recent_tasks = case
when jsonb_array_length(recent_tasks) > 19 then (recent_tasks - 0)
else recent_tasks
end || jsonb_build_array(task_run_in)
where
queue_workers.task_queue_id = task_queue_id_in and
queue_workers.worker_group = worker_group_in and
queue_workers.worker_id = worker_id_in;
end