Skip to content

Commit

Permalink
Issue 194: updates to scheduler and testing of it (#205)
Browse files Browse the repository at this point in the history
* first steps in fix

Improved documentation so the next time this code is looked at or reviewed,
it can go faster. It works out since what it does is no longer in my head and
this documentation represents what I found difficult to decipher.

* make all test runnable independently

Having to wait for all the tests to run was slowing down development. Wrapped it into this branch because this work is required sooner rather than later.

* fixed previous test that caused the problem

Turns out that promotion engine tests set something that caused this test to fail. Implemented a tearDownClass() to undo what it set. Fixed the tests when they all run.
  • Loading branch information
al-niessner authored Jun 28, 2023
1 parent ab0c935 commit d6dbabe
Show file tree
Hide file tree
Showing 11 changed files with 408 additions and 45 deletions.
1 change: 1 addition & 0 deletions Python/dawgie/db/shelve/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import collections
import dawgie.db.lockview
import dawgie.context
import dawgie.security
import logging; log = logging.getLogger(__name__)
import os
import pickle
Expand Down
3 changes: 3 additions & 0 deletions Python/dawgie/pl/promotion.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def __call__ (self,
self.todo (original, rid, values)
elif any (arg_state): # error because its all or nothing
log.error ('Inconsistent arguments. Ignoring request.')
log.debug (' values: %s', str(values))
log.debug (' original: %s', str(original))
log.debug (' run ID: %s', str(rid))
else: self.do()

return self.more()
Expand Down
71 changes: 65 additions & 6 deletions Python/dawgie/pl/schedule.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,55 @@
'''
'''schedule jobs based on DAG
Theory of operation is: given a DAG and a change event, determine which work can be done and in what order.
Definitions:
DAG - directed graph normally built from source code as is done in Test/test_06
change event - is identified by the 'run ID' and can be caused by:
1. source code has changed
a. git commit on main
b. merge of a branch to main
2. parent node has generated new data
3. user has requested work be accomplished via command line or UI
work - running of a 'dawgie.Algorithm', 'dawgie.Analyzer', or 'dawgie.Regression'.
order - root of the DAG to its deepest leaf node. The level of the root is 0 and the deepest level is L. Available work at the lowest level should be done before work at higher levels.
Rules:
1. Given two 'dawgie.Algorithm' nodes A and B with B is dependant on data produced by A, then B can begin processing a target as soon as A finishes.
2. Given a 'dawgie.Algorithm' node A and a 'dawgie.Analyzer' node B where B is dependant on data produced by A, then B must wait for A to finish processing all targets before it can begin.
3. Given a 'dawgie.Analyzer' node A and a 'dawgie.Algorithm' node B where B is dependant on data produced by A, then B can begin processing as soon as A finishes.
4. Rule 1 is the same if either 'dawgie.Algorithm' node is replaced by 'dawgie.Regression'.
5. Rules 2 and 3 are the same if 'dawgie.Algorithm' is replaced by 'dawgie.Regression'.
6. Once a change event is detected, all subsequent changes are under the umbrella of the first change event (aka 'run ID').
Implementation:
The DAG is stored in a 'dawgie.pl.dag.Construct.at' where 'at' is specifically the algorithm tree (at) and it includes all work elements. Each node is of type 'xml.etree.ElementTree.Element' where the tag is the full name of the work element. The attributes of the node are used by the scheduler to maintain the scheduler queues. The attributes of the nodes are:
{'feedback': {<Element 'feedback.model' at 0x7fb80ec79d50>},
'shape': <Shape.ellipse: 0>,
'visitors': {'feedback.sensor.measured.voltage'},
'alg': <ae.feedback.bot.Sensor object at 0x7fb80e9e62f0>,
'do': set(),
'doing': set(),
'factory': <function task at 0x7fb838aa6290>,
'status': <State.initial: 5>,
'todo': set(),
'ancestry': set(),
'been_here': True,
'level': 0}
The important elements to the scheduler are do, doing, feedback, level, and todo. These allow the node to be added to 'per' and 'que' as necessary and keep track of what has been done and what has not. Processing a change event always starts by adding the information to the todo. When the event is processed with the DAG todo is completed, then the lowest level items are moved to do and doing - do is used by 'dawgie.farm' but setup in the scheduler.
Flow:
organize() -> put targets into 'todo' and mark the 'state' 'waiting' if it is not running.
next_job_batch() -> move 'todo' targets into 'doing' and 'do' for 'dawgie.pl.farm' where they are queued up for later processing and status changed to 'running'.
--
COPYRIGHT:
Copyright (c) 2015-2023, California Institute of Technology ("Caltech").
U.S. Government sponsorship acknowledged.
Expand Down Expand Up @@ -257,7 +308,13 @@ def next_job_batch():
todo.sort (key=lambda j:j.get ('level'))
return todo

def organize (task_names, runid=None, targets=None, event=None):
def organize (task_names:[str], runid:int=None, targets:[str]=None,
event:str=None):
'''organize a schedule based on the calling event and what is already known
Adds new targets to the todo list if it is not already there. If the job is
already 'running' for a target, then keep the status as 'running'.
'''
jobs = {j.tag:j for j in que}
targets = targets if targets else set()
log.debug ('organize() - looping over targets')
Expand All @@ -271,7 +328,7 @@ def organize (task_names, runid=None, targets=None, event=None):
if event: n.set ('event', event)
if _is_asp (n): n.get ('todo').add ('__all__')
elif '__all__' in targets:
n.get ('todo').update (set(dawgie.db.targets()))
n.get ('todo').update (dawgie.db.targets())
else: n.get ('todo').update (targets)
pass
pass
Expand Down Expand Up @@ -354,7 +411,7 @@ def update (values:[(str,bool)], original:dawgie.pl.dag.Node, rid:int):
pass
pass
organize (sorted (task_names), rid, targets, event)
promote (values, original, rid)
if rid: promote (values, original, rid)
else: log.error('Node %s for run ID %d did not update its state vector',
original.tag, rid)
return
Expand All @@ -379,8 +436,10 @@ def view_failure() -> [dict]: return err
def view_success() -> [dict]: return suc

def view_todo() -> [dict]:
wait = list(filter(lambda t:t.get ('status')in [State.waiting,
State.running], que))
wait = list(filter(lambda t:all([t.get ('status') in [State.waiting,
State.running],
len(t.get ('todo'))]), # prevents undefined
que))
wait.sort (key=lambda t:t.get ('level'))
return [{'name':w.tag,
'targets':sorted (list(w.get ('todo')))} for w in wait]
6 changes: 3 additions & 3 deletions Python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
bokeh>=1.2
boto3>=1.7.80
cryptography>=2.1.4
dawgie-pydot3==1.0.10
dawgie-pydot3==1.0.11
GitPython>=2.1.11
matplotlib>=2.1.1
psycopg>3.0.0
psycopg-binary>3.0.0
pyparsing>=2.2
pyparsing>=2.4.7
pyOpenSSL>=19.1.0
python-gnupg==0.4.4
python-gnupg==0.4.9
pyxb==1.2.6
requests>=2.20.0
transitions==0.6.8
Expand Down
43 changes: 43 additions & 0 deletions Test/mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
'''
COPYRIGHT:
Copyright (c) 2015-2023, California Institute of Technology ("Caltech").
U.S. Government sponsorship acknowledged.
All rights reserved.
LICENSE:
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
- Neither the name of Caltech nor its operating division, the Jet
Propulsion Laboratory, nor the names of its contributors may be used to
endorse or promote products derived from this software without specific prior
written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
NTR:
'''

import dawgie.context
import dawgie.pl.state

if 'fsm' not in dir(dawgie.context): dawgie.context.fsm = dawgie.pl.state.FSM()
1 change: 1 addition & 0 deletions Test/test_05.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
'''

import dawgie.pl.dag
import dawgie.pl.scan
import os
import sys
import unittest
Expand Down
1 change: 1 addition & 0 deletions Test/test_06.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import dawgie.context
import dawgie.pl.dag
import dawgie.pl.jobinfo
import dawgie.pl.scan
import dawgie.pl.schedule
import os
import sys
Expand Down
28 changes: 27 additions & 1 deletion Test/test_10.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@
NTR:
'''

import dawgie.context; dawgie.context.fsm = dawgie.pl.state.FSM()
import mock

import dawgie.context
import dawgie.pl.dag
import dawgie.pl.farm
import dawgie.pl.message
import dawgie.pl.schedule
import os
import shutil
import tempfile
import unittest

class Farm(unittest.TestCase):
Expand All @@ -50,6 +55,25 @@ def loseConnection(): return
@staticmethod
def write (b:bytes): return

@classmethod
def setUpClass(cls):
cls.root = tempfile.mkdtemp()
os.mkdir (os.path.join (cls.root, 'db'))
os.mkdir (os.path.join (cls.root, 'dbs'))
os.mkdir (os.path.join (cls.root, 'logs'))
os.mkdir (os.path.join (cls.root, 'stg'))
dawgie.context.db_impl = 'shelve'
dawgie.context.db_name = 'testspace'
dawgie.context.db_path = os.path.join (cls.root, 'db')
dawgie.context.data_dbs = os.path.join (cls.root, 'dbs')
dawgie.context.data_log = os.path.join (cls.root, 'logs')
dawgie.context.data_stg = os.path.join (cls.root, 'stg')
return
@classmethod
def tearDownClass(cls):
shutil.rmtree (cls.root, True)
return

def test_hand__process(self):
dawgie.context.git_rev = 321
hand = dawgie.pl.farm.Hand(('localhost',666))
Expand Down Expand Up @@ -99,6 +123,7 @@ def test_hand__res(self):
return

def test_rerunid (self):
dawgie.db.open()
n = dawgie.pl.dag.Node('a')
r = dawgie.pl.farm.rerunid (n)
self.assertGreater (r, 0)
Expand All @@ -108,6 +133,7 @@ def test_rerunid (self):
n.set ('runid', 0)
r = dawgie.pl.farm.rerunid (n)
self.assertEqual (0, r)
dawgie.db.close()
return

def test_something_to_do(self):
Expand Down
8 changes: 8 additions & 0 deletions Test/test_12.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,18 @@ def sv_as_dict(self): return {'d':SVMock({'e':VMock()}),
class PromotionEngine(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.allow_promotion = dawgie.context.allow_promotion
cls.db_impl = dawgie.context.db_impl
cls.promote = dawgie.pl.promotion.Engine()
dawgie.context.allow_promotion = True
dawgie.context.db_impl = 'test'
return
@classmethod
def tearDownClass(cls):
cls.promote = dawgie.pl.promotion.Engine()
dawgie.context.allow_promotion = cls.allow_promotion
dawgie.context.db_impl = cls.db_impl
return

def mock_consistent (self, inputs:[dawgie.db.REF], outputs:[dawgie.db.REF],
target_name:str)->():
Expand Down
35 changes: 0 additions & 35 deletions Test/test_13.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,57 +60,22 @@ def setup(cls):
for tsk,alg,sv,vn,v in dawgie.db.testdata.KNOWNS:
dawgie.db.update (tsk, alg, sv, vn, v)
pass
print ('knowns:')
print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg))
print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime))
print (' state:', len (dawgie.db.shelve.state.DBI().tables.state))
print (' target:', len (dawgie.db.shelve.state.DBI().tables.target))
print (' task:', len (dawgie.db.shelve.state.DBI().tables.task))
print (' value:', len (dawgie.db.shelve.state.DBI().tables.value))
for tgt,tsk,alg in dawgie.db.testdata.DATASETS:
dawgie.db.connect (alg, tsk, tgt).update()
pass
print ('datasets:')
print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg))
print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime))
print (' state:', len (dawgie.db.shelve.state.DBI().tables.state))
print (' target:', len (dawgie.db.shelve.state.DBI().tables.target))
print (' task:', len (dawgie.db.shelve.state.DBI().tables.task))
print (' value:', len (dawgie.db.shelve.state.DBI().tables.value))
for tsk,alg in dawgie.db.testdata.ASPECTS:
dawgie.db.gather (alg, tsk).ds().update()
pass
print ('aspects:')
print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg))
print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime))
print (' state:', len (dawgie.db.shelve.state.DBI().tables.state))
print (' target:', len (dawgie.db.shelve.state.DBI().tables.target))
print (' task:', len (dawgie.db.shelve.state.DBI().tables.task))
print (' value:', len (dawgie.db.shelve.state.DBI().tables.value))
for tsk,alg in dawgie.db.testdata.TIMELINES:
dawgie.db.retreat (alg, tsk).ds().update()
pass
print ('timelines:')
print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg))
print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime))
print (' state:', len (dawgie.db.shelve.state.DBI().tables.state))
print (' target:', len (dawgie.db.shelve.state.DBI().tables.target))
print (' task:', len (dawgie.db.shelve.state.DBI().tables.task))
print (' value:', len (dawgie.db.shelve.state.DBI().tables.value))
dawgie.db.close()
return

def test__prime_keys(self):
dawgie.db.close()
self.assertRaises (RuntimeError, dawgie.db._prime_keys)
dawgie.db.open()
print ('PKs:')
print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg))
print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime))
print (' state:', len (dawgie.db.shelve.state.DBI().tables.state))
print (' target:', len (dawgie.db.shelve.state.DBI().tables.target))
print (' task:', len (dawgie.db.shelve.state.DBI().tables.task))
print (' value:', len (dawgie.db.shelve.state.DBI().tables.value))
keys = dawgie.db._prime_keys()
self.assertEqual ((dawgie.db.testdata.TSK_CNT *
dawgie.db.testdata.SVN_CNT *
Expand Down
Loading

0 comments on commit d6dbabe

Please sign in to comment.