-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsimulator.py
123 lines (90 loc) · 3.54 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from __future__ import division
import random
from heapq import heapify, heappop, heappush
import schedulers
# Event-type tags.  Each one is stored as the second field of the
# (time, type, data) tuples kept in the simulator's event heap, so for
# events with the same timestamp the heap pops ARRIVAL before COMPLETE
# before INTERNAL.
ARRIVAL, COMPLETE, INTERNAL = 0, 1, 2
# Numerical tolerance.  In the code visible here it is only referenced by
# the commented-out sanity checks inside simulator().
eps = 0.001
# Module-wide random generator (created without an explicit seed) shared by
# the error-model factories lognorm_error() and normal_error().
rand = random.Random()
def identity(x):
    """Return `x` unchanged.

    Used as the default `size_estimation` of simulator(), i.e. the
    scheduler is told the exact job size.
    """
    return x
def lognorm_error(sigma, factor=1):
    """Build a size estimator with multiplicative log-normal noise.

    The returned function maps a size `x` to
    `factor * x * rand.lognormvariate(0, sigma)`, drawing a fresh random
    multiplier from the module-level generator on every call.
    """
    def err_func(x):
        # One new log-normal multiplier (mu=0, given sigma) per estimate.
        multiplier = rand.lognormvariate(0, sigma)
        return factor * x * multiplier
    return err_func
def normal_error(sigma, factor=1):
    """Build a size estimator with multiplicative Gaussian noise.

    The returned function maps a size `x` to
    `factor * x * rand.gauss(1, sigma)`; draws that produce a negative
    estimate are discarded and the sampling is repeated until the result
    is >= 0.
    """
    def err_func(x):
        scaled = factor * x
        estimate = scaled * rand.gauss(1, sigma)
        # Rejection sampling: keep drawing until the estimate is acceptable.
        while not estimate >= 0:
            estimate = scaled * rand.gauss(1, sigma)
        return estimate
    return err_func
def fixed_estimations(estimations):
    """Build a size estimator that replays pre-computed estimations.

    `estimations` is any iterable; it is consumed lazily, one item per call.
    The returned function ignores the real size it is given (the argument is
    only used in the error message) and returns the next item of
    `estimations`, in order.

    The returned function raises RuntimeError when it is called more times
    than there are estimations.
    """
    estimations_i = iter(estimations)

    def err_func(x):
        try:
            return next(estimations_i)
        except StopIteration:
            # StopIteration must not escape: simulator() calls the estimator
            # from inside a generator body, where a leaked StopIteration
            # either ends the simulation silently (Python < 3.7) or is turned
            # into an opaque RuntimeError (PEP 479).  Fail explicitly instead.
            raise RuntimeError(
                "fixed_estimations: no estimation left for job of size %r"
                % (x,))
    return err_func
def simulator(jobs, scheduler_factory=schedulers.PS,
              size_estimation=identity, priorities=None):
    """Event-driven simulation of a scheduler; yields job completions.

    This is a generator: nothing runs until it is iterated.

    Arguments:
      jobs -- iterable of (jobid, arrival_time, size) triples.
      scheduler_factory -- zero-argument callable returning the scheduler.
          The scheduler is driven through enqueue(t, jobid, size[, priority]),
          dequeue(t, jobid), schedule(t) -- which must return a mapping
          {jobid: share of the resource} -- and next_internal_event(), which
          returns None or a delay (relative to the current time) after which
          the scheduler wants to be consulted again.
      size_estimation -- callable applied to the real size of each arriving
          job; its result is the size announced to the scheduler.
      priorities -- optional mapping jobid -> priority.  When given,
          priorities[jobid] is passed as a fourth argument to
          scheduler.enqueue().

    Yields (completion_time, jobid) each time a COMPLETE event is processed.
    After the event heap is drained, asserts that no job is left unfinished.
    """
    # Heap of (time, type, data) tuples, seeded with all the arrivals.
    pending = [(arrival, ARRIVAL, (jobid, size))
               for jobid, arrival, size in jobs]
    heapify(pending)

    left = {}    # jobid -> amount of work still to be done
    shares = {}  # jobid -> share of the resource currently granted
                 # (the shares are expected to sum to at most 1)
    scheduler = scheduler_factory()
    previous = 0  # time of the previously processed event

    # Choose once, outside the loop, how arrivals reach the scheduler.
    if priorities is None:
        def enqueue(when, jobid, size):
            scheduler.enqueue(when, jobid, size)
    else:
        def enqueue(when, jobid, size):
            scheduler.enqueue(when, jobid, size, priorities[jobid])

    while pending:
        now, kind, payload = heappop(pending)

        # Account for the work done since the previous event, using the
        # shares that were in force during that interval.
        elapsed = now - previous
        for jobid, share in shares.items():
            left[jobid] -= elapsed * share

        # Handle the event itself.  INTERNAL events carry no action of their
        # own: they only cause the scheduler to be consulted again below.
        if kind == ARRIVAL:
            jobid, size = payload
            left[jobid] = size
            enqueue(now, jobid, size_estimation(size))
        elif kind == COMPLETE:
            yield now, payload
            del left[payload]
            scheduler.dequeue(now, payload)

        shares = scheduler.schedule(now)

        # Work out whether something happens strictly before the next event
        # already in the heap; at most one such event is inserted.
        upcoming = None
        internal_delay = scheduler.next_internal_event()
        if internal_delay is not None:
            internal_at = now + internal_delay
            if not pending or internal_at < pending[0][0]:
                upcoming = (internal_at, INTERNAL, None)

        if left:
            finish_delays = ((left[jobid] / share, jobid)
                             for jobid, share in shares.items())
            try:
                finish_delay, finisher = min(finish_delays)
            except ValueError:
                # min() of an empty sequence: nothing is scheduled right now.
                pass
            else:
                finish_at = now + finish_delay
                if not pending or pending[0][0] > finish_at:
                    # The completion replaces the internal event only when it
                    # comes strictly earlier; internal_at is always bound
                    # whenever `upcoming` is set at this point.
                    if upcoming is None or internal_at > finish_at:
                        upcoming = (finish_at, COMPLETE, finisher)

        if upcoming is not None:
            heappush(pending, upcoming)

        previous = now

    assert not left