forked from microsoft/onefuzz
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjobs.py
83 lines (67 loc) · 2.34 KB
/
jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import logging
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
from onefuzztypes.enums import JobState, TaskState
from onefuzztypes.models import Job as BASE_JOB
from .orm import MappingIntStrAny, ORMMixin, QueryFilter
from .tasks.main import Task
class Job(BASE_JOB, ORMMixin):
    """Job model (``BASE_JOB``) combined with the ``ORMMixin`` helpers.

    Adds query helpers for finding jobs by state or expiry, the field
    mappings returned for saving / events / telemetry, and the state
    transition handlers ``init``, ``stopping`` and ``on_start``.
    """

    @classmethod
    def key_fields(cls) -> Tuple[str, Optional[str]]:
        """Return the key field names: ``job_id`` and no second key."""
        return ("job_id", None)

    @classmethod
    def search_states(cls, *, states: Optional[List[JobState]] = None) -> List["Job"]:
        """Search for jobs, optionally restricted to ``states``.

        When ``states`` is ``None`` or empty, no state filter is applied.
        """
        state_filter: QueryFilter = {"state": states} if states else {}
        return cls.search(query=state_filter)

    @classmethod
    def search_expired(cls) -> List["Job"]:
        """Search for jobs in a ``JobState.available()`` state that have expired.

        Expiry is expressed as a raw filter comparing ``end_time`` against
        the current naive UTC timestamp in ISO format.
        """
        now = datetime.utcnow().isoformat()
        time_filter = "end_time lt datetime'%s'" % now
        return cls.search(
            query={"state": JobState.available()},
            raw_unchecked_filter=time_filter,
        )

    def save_exclude(self) -> Optional[MappingIntStrAny]:
        """Return the field mapping naming ``task_info`` for save exclusion."""
        return {"task_info": ...}

    def event_include(self) -> Optional[MappingIntStrAny]:
        """Return the field mapping for events: job id, state and error."""
        return {"job_id": ..., "state": ..., "error": ...}

    def telemetry_include(self) -> Optional[MappingIntStrAny]:
        """Return the field mapping for telemetry."""
        # NOTE(review): "machine_id" and "scaleset_id" look like node fields
        # rather than job fields -- confirm against the BASE_JOB model.
        return {"machine_id": ..., "state": ..., "scaleset_id": ...}

    def init(self) -> None:
        """Log the initialization, move the job to ``enabled`` and save it."""
        logging.info("init job: %s", self.job_id)
        self.state = JobState.enabled
        self.save()

    def stopping(self) -> None:
        """Drive the job, and any task of it not yet stopped, toward stopped.

        Every task of this job whose state is not ``stopped`` is set to
        ``stopping`` and saved. If there is no such task, the job itself is
        marked ``stopped``; otherwise it stays ``stopping``. The job is
        saved in either case.
        """
        self.state = JobState.stopping
        logging.info("stopping job: %s", self.job_id)

        job_tasks = Task.search(query={"job_id": [self.job_id]})
        remaining = [item for item in job_tasks if item.state != TaskState.stopped]

        if not remaining:
            # Nothing left to wait for: the job itself is done.
            self.state = JobState.stopped

        for item in remaining:
            item.state = TaskState.stopping
            item.save()

        self.save()

    def queue_stop(self) -> None:
        """Queue a call to ``stopping`` through ``self.queue``."""
        self.queue(method=self.stopping)

    def on_start(self) -> None:
        """Set ``end_time`` from the configured duration if it is not yet set."""
        # try to keep this effectively idempotent: an existing end_time is
        # left untouched and nothing is saved.
        if self.end_time is not None:
            return

        lifetime = timedelta(hours=self.config.duration)
        self.end_time = datetime.utcnow() + lifetime
        self.save()