-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_dev_check_five.py
73 lines (57 loc) · 1.89 KB
/
test_dev_check_five.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import time
import threading
from config import config
from message_monitor import MessageMonitor
from job import Job
from util import Util
# Module-level helpers shared by every function in this file. They are
# created at import time, so importing this module already runs the calls below.
mm = MessageMonitor()
# NOTE(review): presumably discards state left over from an earlier run so the
# wait_for_state() checks start clean -- confirm against MessageMonitor.
mm.clear_all_state()
job = Job()
util = Util()
def dev_check_five(results):
    """Run the "dev check five" subs-and-loops job and record the outcome.

    Submits ``dev_check_five_subs_loops.sbp``, starts it from the queue,
    waits for the monitor to report "running", lets it run for a while,
    pauses and quits it, and finally waits for the monitor to report "idle".

    Args:
        results: Mutable mapping shared with the caller. ``results["code"]``
            is set to ``True`` only when both state checks succeed; on a
            failed check the mapping is left untouched.

    Returns:
        None. The outcome is reported through ``results``.
    """
    print("Testing dev_check_five subs and loops")

    # Submit the job file (filename, name, description), then start it.
    job.submit(
        "dev_check_five_subs_loops.sbp",
        "testing dev check five",
        "testing dev check five subs and loops",
    )
    job.run_next_job_in_queue()

    print("waiting for running")
    # NOTE(review): the 10 is presumably a timeout in seconds -- confirm
    # against MessageMonitor.wait_for_state.
    reached_running = mm.wait_for_state("running", 10)
    running_ok = util.test_check(
        reached_running, "now running", "timed out while waiting for running"
    )
    if running_ok is False:
        return

    # Let the job run for a bit, then pause it and quit.
    # TODO: add test cases covering various pause/resume sequences.
    time.sleep(10)
    job.pause()
    time.sleep(2)
    job.quit()

    # If anything went wrong above, the tool will probably not return to idle.
    print("waiting for idle")
    reached_idle = mm.wait_for_state("idle", 10)
    idle_ok = util.test_check(
        reached_idle, "now idle", "timed out while waiting for idle"
    )
    if idle_ok is False:
        return

    # Both state checks passed.
    results["code"] = True
def thread_for_mm(args):
    """Thread target that runs the shared MessageMonitor.

    Args:
        args: Not used by this function; it only exists so the function can
            be started with a positional argument via ``threading.Thread``.
    """
    # Hand control to the module-level monitor.
    mm.run()
# test function
def test_dev_check_five():
    """Run ``dev_check_five`` on a worker thread and assert that it passed.

    The MessageMonitor is run on a daemon thread, so it does not keep the
    process alive once the test has finished. The worker reports its outcome
    through a shared ``results`` dict, which is asserted on after the worker
    thread has been joined.
    """
    # Shared outcome holder; the worker flips "code" to True on success.
    results = {"code": False}

    monitor_thread = threading.Thread(
        target=thread_for_mm, args=(1,), daemon=True
    )
    worker_thread = threading.Thread(target=dev_check_five, args=(results,))

    # Test sequence: monitor first, then the test body.
    monitor_thread.start()
    time.sleep(1)  # time for the MessageMonitor to get up and running
    worker_thread.start()
    worker_thread.join()  # wait for the test body to return

    # Report the result.
    assert results["code"] is True
# Script entry point: print the configured API URL, then run the test
# directly (outside of a test runner).
if __name__ == "__main__":
    print(config.API_URL)
    print("Testing dev_check_five")
    test_dev_check_five()