diff --git a/Python/dawgie/db/shelve/comms.py b/Python/dawgie/db/shelve/comms.py index 18c618ec..5328ed81 100644 --- a/Python/dawgie/db/shelve/comms.py +++ b/Python/dawgie/db/shelve/comms.py @@ -41,6 +41,7 @@ import collections import dawgie.db.lockview import dawgie.context +import dawgie.security import logging; log = logging.getLogger(__name__) import os import pickle diff --git a/Python/dawgie/pl/promotion.py b/Python/dawgie/pl/promotion.py index 359ed9f8..6f1151e2 100644 --- a/Python/dawgie/pl/promotion.py +++ b/Python/dawgie/pl/promotion.py @@ -53,6 +53,9 @@ def __call__ (self, self.todo (original, rid, values) elif any (arg_state): # error because its all or nothing log.error ('Inconsistent arguments. Ignoring request.') + log.debug (' values: %s', str(values)) + log.debug (' original: %s', str(original)) + log.debug (' run ID: %s', str(rid)) else: self.do() return self.more() diff --git a/Python/dawgie/pl/schedule.py b/Python/dawgie/pl/schedule.py index 05ad14ca..66a4922d 100644 --- a/Python/dawgie/pl/schedule.py +++ b/Python/dawgie/pl/schedule.py @@ -1,4 +1,55 @@ -''' +'''schedule jobs based on DAG + +Theory of operation is: given a DAG and a change event, determine which work can be done and in what order. + +Definitions: + DAG - directed graph normally built from source code as is done in Test/test_06 + change event - is identified by the 'run ID' and can be caused by: + 1. source code has changed + a. git commit on main + b. merge of a branch to main + 2. parent node has generated new data + 3. user has requested work be accomplished via command line or UI + work - running of a 'dawgie.Algorithm', 'dawgie.Analyzer', or 'dawgie.Regression'. + order - root of the DAG to its deepest leaf node. The level of the root is 0 and the deepest level is L. Available work at the lowest level should be done before work at higher levels. + +Rules: + 1. Given two 'dawgie.Algorithm' nodes A and B with B is dependant on data produced by A, then B can begin processing a target as soon as A finishes. + + 2. Given a 'dawgie.Algorithm' node A and a 'dawgie.Analyzer' node B where B is dependant on data produced by A, then B must wait for A to finish processing all targets before it can begin. + + 3. Given a 'dawgie.Analyzer' node A and a 'dawgie.Algorithm' node B where B is dependant on data produced by A, then B can begin processing as soon as A finishes. + + 4. Rule 1 is the same if either 'dawgie.Algorithm' node is replaced by 'dawgie.Regression'. + + 5. Rules 2 and 3 are the same if 'dawgie.Algorithm' is replaced by 'dawgie.Regression'. + + 6. Once a change event is detected, all subsequent changes are under the umbrella of the first change event (aka 'run ID'). + +Implementation: + +The DAG is stored in a 'dawgie.pl.dag.Construct.at' where 'at' is specifically the algorithm tree (at) and it includes all work elements. Each node is of type 'xml.etree.ElementTree.Element' where the tag is the full name of the work element. The attributes of the node are used by the scheduler to maintain the scheduler queues. The attributes of the nodes are: + {'feedback': {}, + 'shape': , + 'visitors': {'feedback.sensor.measured.voltage'}, + 'alg': , + 'do': set(), + 'doing': set(), + 'factory': , + 'status': , + 'todo': set(), + 'ancestry': set(), + 'been_here': True, + 'level': 0} + +The important elements to the scheduler are do, doing, feedback, level, and todo. These allow the node to be added to 'per' and 'que' as necessary and keep track of what has been done and what has not. Processing a change event always starts by adding the information to the todo. When the event is processed with the DAG todo is completed, then the lowest level items are moved to do and doing - do is used by 'dawgie.farm' but setup in the scheduler. + +Flow: + + organize() -> put targets into 'todo' and mark the 'state' 'waiting' if it is not running. + next_job_batch() -> move 'todo' targets into 'doing' and 'do' for 'dawgie.pl.farm' where they are queued up for later processing and status changed to 'running'. +-- + COPYRIGHT: Copyright (c) 2015-2023, California Institute of Technology ("Caltech"). U.S. Government sponsorship acknowledged. @@ -257,7 +308,13 @@ def next_job_batch(): todo.sort (key=lambda j:j.get ('level')) return todo -def organize (task_names, runid=None, targets=None, event=None): +def organize (task_names:[str], runid:int=None, targets:[str]=None, + event:str=None): + '''organize a schedule based on the calling event and what is already known + + Adds new targets to the todo list if it is not already there. If the job is + already 'running' for a target, then keep the status as 'running'. + ''' jobs = {j.tag:j for j in que} targets = targets if targets else set() log.debug ('organize() - looping over targets') @@ -271,7 +328,7 @@ def organize (task_names, runid=None, targets=None, event=None): if event: n.set ('event', event) if _is_asp (n): n.get ('todo').add ('__all__') elif '__all__' in targets: - n.get ('todo').update (set(dawgie.db.targets())) + n.get ('todo').update (dawgie.db.targets()) else: n.get ('todo').update (targets) pass pass @@ -354,7 +411,7 @@ def update (values:[(str,bool)], original:dawgie.pl.dag.Node, rid:int): pass pass organize (sorted (task_names), rid, targets, event) - promote (values, original, rid) + if rid: promote (values, original, rid) else: log.error('Node %s for run ID %d did not update its state vector', original.tag, rid) return @@ -379,8 +436,10 @@ def view_failure() -> [dict]: return err def view_success() -> [dict]: return suc def view_todo() -> [dict]: - wait = list(filter(lambda t:t.get ('status')in [State.waiting, - State.running], que)) + wait = list(filter(lambda t:all([t.get ('status') in [State.waiting, + State.running], + len(t.get ('todo'))]), # prevents undefined + que)) wait.sort (key=lambda t:t.get ('level')) return [{'name':w.tag, 'targets':sorted (list(w.get ('todo')))} for w in wait] diff --git a/Python/requirements.txt b/Python/requirements.txt index aa39d9b2..eee07b90 100644 --- a/Python/requirements.txt +++ b/Python/requirements.txt @@ -1,14 +1,14 @@ bokeh>=1.2 boto3>=1.7.80 cryptography>=2.1.4 -dawgie-pydot3==1.0.10 +dawgie-pydot3==1.0.11 GitPython>=2.1.11 matplotlib>=2.1.1 psycopg>3.0.0 psycopg-binary>3.0.0 -pyparsing>=2.2 +pyparsing>=2.4.7 pyOpenSSL>=19.1.0 -python-gnupg==0.4.4 +python-gnupg==0.4.9 pyxb==1.2.6 requests>=2.20.0 transitions==0.6.8 diff --git a/Test/mock.py b/Test/mock.py new file mode 100644 index 00000000..b08078e3 --- /dev/null +++ b/Test/mock.py @@ -0,0 +1,43 @@ +''' + +COPYRIGHT: +Copyright (c) 2015-2023, California Institute of Technology ("Caltech"). +U.S. Government sponsorship acknowledged. + +All rights reserved. + +LICENSE: +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +- Redistributions of source code must retain the above copyright notice, +this list of conditions and the following disclaimer. + +- Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. + +- Neither the name of Caltech nor its operating division, the Jet +Propulsion Laboratory, nor the names of its contributors may be used to +endorse or promote products derived from this software without specific prior +written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +NTR: +''' + +import dawgie.context +import dawgie.pl.state + +if 'fsm' not in dir(dawgie.context): dawgie.context.fsm = dawgie.pl.state.FSM() diff --git a/Test/test_05.py b/Test/test_05.py index 02bdeddc..d01a3245 100644 --- a/Test/test_05.py +++ b/Test/test_05.py @@ -38,6 +38,7 @@ ''' import dawgie.pl.dag +import dawgie.pl.scan import os import sys import unittest diff --git a/Test/test_06.py b/Test/test_06.py index f851742d..9f6c64b1 100644 --- a/Test/test_06.py +++ b/Test/test_06.py @@ -43,6 +43,7 @@ import dawgie.context import dawgie.pl.dag import dawgie.pl.jobinfo +import dawgie.pl.scan import dawgie.pl.schedule import os import sys diff --git a/Test/test_10.py b/Test/test_10.py index 2d7df5b8..496a4924 100644 --- a/Test/test_10.py +++ b/Test/test_10.py @@ -37,11 +37,16 @@ NTR: ''' -import dawgie.context; dawgie.context.fsm = dawgie.pl.state.FSM() +import mock + +import dawgie.context import dawgie.pl.dag import dawgie.pl.farm import dawgie.pl.message import dawgie.pl.schedule +import os +import shutil +import tempfile import unittest class Farm(unittest.TestCase): @@ -50,6 +55,25 @@ def loseConnection(): return @staticmethod def write (b:bytes): return + @classmethod + def setUpClass(cls): + cls.root = tempfile.mkdtemp() + os.mkdir (os.path.join (cls.root, 'db')) + os.mkdir (os.path.join (cls.root, 'dbs')) + os.mkdir (os.path.join (cls.root, 'logs')) + os.mkdir (os.path.join (cls.root, 'stg')) + dawgie.context.db_impl = 'shelve' + dawgie.context.db_name = 'testspace' + dawgie.context.db_path = os.path.join (cls.root, 'db') + dawgie.context.data_dbs = os.path.join (cls.root, 'dbs') + dawgie.context.data_log = os.path.join (cls.root, 'logs') + dawgie.context.data_stg = os.path.join (cls.root, 'stg') + return + @classmethod + def tearDownClass(cls): + shutil.rmtree (cls.root, True) + return + def test_hand__process(self): dawgie.context.git_rev = 321 hand = dawgie.pl.farm.Hand(('localhost',666)) @@ -99,6 +123,7 @@ def test_hand__res(self): return def test_rerunid (self): + dawgie.db.open() n = dawgie.pl.dag.Node('a') r = dawgie.pl.farm.rerunid (n) self.assertGreater (r, 0) @@ -108,6 +133,7 @@ def test_rerunid (self): n.set ('runid', 0) r = dawgie.pl.farm.rerunid (n) self.assertEqual (0, r) + dawgie.db.close() return def test_something_to_do(self): diff --git a/Test/test_12.py b/Test/test_12.py index be78d6a3..089fa9d7 100644 --- a/Test/test_12.py +++ b/Test/test_12.py @@ -67,10 +67,18 @@ def sv_as_dict(self): return {'d':SVMock({'e':VMock()}), class PromotionEngine(unittest.TestCase): @classmethod def setUpClass(cls): + cls.allow_promotion = dawgie.context.allow_promotion + cls.db_impl = dawgie.context.db_impl cls.promote = dawgie.pl.promotion.Engine() dawgie.context.allow_promotion = True dawgie.context.db_impl = 'test' return + @classmethod + def tearDownClass(cls): + cls.promote = dawgie.pl.promotion.Engine() + dawgie.context.allow_promotion = cls.allow_promotion + dawgie.context.db_impl = cls.db_impl + return def mock_consistent (self, inputs:[dawgie.db.REF], outputs:[dawgie.db.REF], target_name:str)->(): diff --git a/Test/test_13.py b/Test/test_13.py index 9839df6f..edd9f991 100644 --- a/Test/test_13.py +++ b/Test/test_13.py @@ -60,43 +60,15 @@ def setup(cls): for tsk,alg,sv,vn,v in dawgie.db.testdata.KNOWNS: dawgie.db.update (tsk, alg, sv, vn, v) pass - print ('knowns:') - print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg)) - print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime)) - print (' state:', len (dawgie.db.shelve.state.DBI().tables.state)) - print (' target:', len (dawgie.db.shelve.state.DBI().tables.target)) - print (' task:', len (dawgie.db.shelve.state.DBI().tables.task)) - print (' value:', len (dawgie.db.shelve.state.DBI().tables.value)) for tgt,tsk,alg in dawgie.db.testdata.DATASETS: dawgie.db.connect (alg, tsk, tgt).update() pass - print ('datasets:') - print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg)) - print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime)) - print (' state:', len (dawgie.db.shelve.state.DBI().tables.state)) - print (' target:', len (dawgie.db.shelve.state.DBI().tables.target)) - print (' task:', len (dawgie.db.shelve.state.DBI().tables.task)) - print (' value:', len (dawgie.db.shelve.state.DBI().tables.value)) for tsk,alg in dawgie.db.testdata.ASPECTS: dawgie.db.gather (alg, tsk).ds().update() pass - print ('aspects:') - print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg)) - print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime)) - print (' state:', len (dawgie.db.shelve.state.DBI().tables.state)) - print (' target:', len (dawgie.db.shelve.state.DBI().tables.target)) - print (' task:', len (dawgie.db.shelve.state.DBI().tables.task)) - print (' value:', len (dawgie.db.shelve.state.DBI().tables.value)) for tsk,alg in dawgie.db.testdata.TIMELINES: dawgie.db.retreat (alg, tsk).ds().update() pass - print ('timelines:') - print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg)) - print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime)) - print (' state:', len (dawgie.db.shelve.state.DBI().tables.state)) - print (' target:', len (dawgie.db.shelve.state.DBI().tables.target)) - print (' task:', len (dawgie.db.shelve.state.DBI().tables.task)) - print (' value:', len (dawgie.db.shelve.state.DBI().tables.value)) dawgie.db.close() return @@ -104,13 +76,6 @@ def test__prime_keys(self): dawgie.db.close() self.assertRaises (RuntimeError, dawgie.db._prime_keys) dawgie.db.open() - print ('PKs:') - print (' alg:', len (dawgie.db.shelve.state.DBI().tables.alg)) - print (' prime:', len (dawgie.db.shelve.state.DBI().tables.prime)) - print (' state:', len (dawgie.db.shelve.state.DBI().tables.state)) - print (' target:', len (dawgie.db.shelve.state.DBI().tables.target)) - print (' task:', len (dawgie.db.shelve.state.DBI().tables.task)) - print (' value:', len (dawgie.db.shelve.state.DBI().tables.value)) keys = dawgie.db._prime_keys() self.assertEqual ((dawgie.db.testdata.TSK_CNT * dawgie.db.testdata.SVN_CNT * diff --git a/Test/test_15.py b/Test/test_15.py new file mode 100644 index 00000000..ef41520a --- /dev/null +++ b/Test/test_15.py @@ -0,0 +1,256 @@ +''' + +COPYRIGHT: +Copyright (c) 2015-2023, California Institute of Technology ("Caltech"). +U.S. Government sponsorship acknowledged. + +All rights reserved. + +LICENSE: +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +- Redistributions of source code must retain the above copyright notice, +this list of conditions and the following disclaimer. + +- Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. + +- Neither the name of Caltech nor its operating division, the Jet +Propulsion Laboratory, nor the names of its contributors may be used to +endorse or promote products derived from this software without specific prior +written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +NTR: +''' + +import mock + +import dawgie +import dawgie.db +import dawgie.pl.schedule +import dawgie.util +import unittest + +class Analysis(dawgie.Analysis): + def __init__(self, name, ps_hint, runid, al): + dawgie.Analysis.__init__(self, name, ps_hint, runid) + self._al = al + pass + def list(self): return self._al + pass +class Regress(dawgie.Regress): + def __init__(self, name, ps_hint, target, rl): + dawgie.Regress.__init__(self, name, ps_hint, target) + self._rl = rl + pass + def list(self): return self._rl + pass +class StateVector(dawgie.StateVector): + def name(self): return 'sv' + def view (self, visitor): pass + pass +class Task(dawgie.Task): + def __init__(self, name, ps_hint, runid, target, tl): + dawgie.Task.__init__(self, name, ps_hint, runid, target) + self._tl = tl + pass + def list(self): return self._tl + pass +class Work(dawgie.Algorithm,dawgie.Analyzer,dawgie.Regression): + def __init__ (self, fb:[], name:str, prev:[]): + dawgie.Algorithm.__init__(self) + dawgie.Analyzer.__init__(self) + dawgie.Regression.__init__(self) + self._fb = fb + self._name = name + self._prev = prev + self._sv = StateVector() + self._sv['goat'] = dawgie.Value() + pass + def feedback(self): return self._fb + def name(self): return self._name + def previous(self): return self._prev + def run(self, *args, **kwds): pass + def state_vectors(self): return [self._sv] + def traits(self): return self._prev + def variables(self): return self._prev + pass + +class ScheduleRules(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls._targets_orig = getattr(dawgie.db, 'targets') + cls._task_name_orig = getattr(dawgie.util, 'task_name') + setattr (dawgie.db, 'targets', _mock_targets) + setattr (dawgie.util, 'task_name', _mock_task_name) + factories = {dawgie.Factories.analysis:[analysis], + dawgie.Factories.events:[events], + dawgie.Factories.regress:[regress], + dawgie.Factories.task:[task]} + dawgie.pl.schedule.build (factories, [{},{},{}], [{},{},{},{}]) + # use the print output to observe what the DAG for this test look like + print ('av:',dawgie.pl.schedule.ae.av.decode()) + return + + @classmethod + def tearDownClass(cls): + setattr (dawgie.db, 'targets', cls._targets_orig) + setattr (dawgie.util, 'task_name', cls._task_name_orig) + return + + def test_issue_194(self): + dawgie.pl.schedule.organize (['test_15.root'], + event='test for issue 194', + targets=['__all__']) + self.assertEqual (dawgie.pl.schedule.view_todo(), + [{'name': 'test_15.root', + 'targets': ['Target_0', 'Target_1', 'Target_2', + 'Target_3', 'Target_4', 'Target_5', + 'Target_6', 'Target_7', 'Target_8', + 'Target_9']}]) + jobs = dawgie.pl.schedule.next_job_batch() + self.assertEqual (len (jobs), 1) + self.assertEqual (jobs[0].get ('status'), + dawgie.pl.schedule.State.waiting) + jobs[0].set ('status', dawgie.pl.schedule.State.running) + jobs[0].get ('do').clear() + dawgie.pl.schedule.complete (jobs[0], 1, 'Target_3', {}, + dawgie.pl.schedule.State.success) + self.assertNotIn ('Target_3', jobs[0].get('doing')) + dawgie.pl.schedule.update ([('1.Target_3.test_15.root.sv.goat', True)], + jobs[0], 1) + jobs.extend (dawgie.pl.schedule.next_job_batch()) + self.assertEqual (len (jobs), 3) + jobs[1].set ('status', dawgie.pl.schedule.State.running) + jobs[1].get ('do').clear() + dawgie.pl.schedule.complete (jobs[1], 1, 'Target_3', {}, + dawgie.pl.schedule.State.success) + self.assertNotIn ('Target_3', jobs[1].get('doing')) + self.assertEqual (0, len (jobs[1].get('doing'))) + dawgie.pl.schedule.update ([('1.Target_3.test_15.A.sv.goat', True)], + jobs[1], 1) + jobs[2].set ('status', dawgie.pl.schedule.State.running) + jobs[2].get ('do').clear() + dawgie.pl.schedule.complete (jobs[2], 1, 'Target_3', {}, + dawgie.pl.schedule.State.success) + self.assertNotIn ('Target_3', jobs[2].get('doing')) + self.assertEqual (0, len (jobs[1].get('doing'))) + dawgie.pl.schedule.update ([('1.Target_3.test_15.B.sv.goat', True)], + jobs[2], 1) + _clean (jobs) + self.assertEqual (len (jobs), 1) + jobs.extend (dawgie.pl.schedule.next_job_batch()) + self.assertEqual (len (jobs), 2) + jobs[1].set ('status', dawgie.pl.schedule.State.running) + jobs[1].get ('do').clear() + dawgie.pl.schedule.complete (jobs[1], 1, 'Target_3', {}, + dawgie.pl.schedule.State.success) + self.assertNotIn ('Target_3', jobs[1].get('doing')) + self.assertEqual (0, len (jobs[1].get('doing'))) + dawgie.pl.schedule.update ([('1.Target_3.test_15.C.sv.goat', True)], + jobs[1], 1) + _clean (jobs) + self.assertEqual (len (jobs), 1) + jobs.extend (dawgie.pl.schedule.next_job_batch()) + self.assertEqual (len (jobs), 2) + jobs[1].set ('status', dawgie.pl.schedule.State.running) + jobs[1].get ('do').clear() + dawgie.pl.schedule.complete (jobs[1], 1, 'Target_3', {}, + dawgie.pl.schedule.State.success) + self.assertNotIn ('Target_3', jobs[1].get('doing')) + self.assertEqual (0, len (jobs[1].get('doing'))) + dawgie.pl.schedule.update ([('1.Target_3.test_15.A.sv.goat', False)], + jobs[1], 1) + _clean (jobs) + self.assertEqual (len (jobs), 1) + jobs.extend (dawgie.pl.schedule.next_job_batch()) + self.assertEqual (len (jobs), 2) + jobs[1].set ('status', dawgie.pl.schedule.State.running) + jobs[1].get ('do').clear() + dawgie.pl.schedule.complete (jobs[1], 1, 'Target_3', {}, + dawgie.pl.schedule.State.success) + self.assertNotIn ('Target_3', jobs[1].get('doing')) + self.assertEqual (0, len (jobs[1].get('doing'))) + dawgie.pl.schedule.update ([('1.Target_3.test_15.D.sv.goat', True)], + jobs[1], 1) + _clean (jobs) + self.assertEqual (len (jobs), 1) + jobs.extend (dawgie.pl.schedule.next_job_batch()) + self.assertEqual (len (jobs), 1) + for target in ['Target_0','Target_1','Target_2','Target_4','Target_5', + 'Target_6', 'Target_7', 'Target_8', 'Target_9']: + print ('target:', target) + dawgie.pl.schedule.complete (jobs[0], 1, target, {}, + dawgie.pl.schedule.State.success) + self.assertNotIn ('Target_3', jobs[0].get('doing')) + dawgie.pl.schedule.update ([(f'1.{target}.test_15.root.sv.goat', + False)], jobs[0], 1) + jobs.extend (dawgie.pl.schedule.next_job_batch()) + _clean (jobs) + self.assertEqual (len (jobs), 1) + pass + jobs[0].set ('status', dawgie.pl.schedule.State.running) + jobs[0].get ('do').clear() + dawgie.pl.schedule.complete (jobs[0], 1, '__all__', {}, + dawgie.pl.schedule.State.success) + self.assertNotIn ('__all__', jobs[0].get('doing')) + dawgie.pl.schedule.update ([('1.__all__.test_15.E.goat', True)], + jobs[0], 1) + jobs.extend (dawgie.pl.schedule.next_job_batch()) + _clean (jobs) + self.assertEqual (len (jobs), 0) + return + pass + +def _clean(jobs): + toBeRemoved = [] + for job in jobs: + inque = 0 + for qn in ['do', 'doing', 'todo']: inque += len(job.get (qn)) + if inque == 0: toBeRemoved.append (job) + pass + for job in toBeRemoved: jobs.remove (job) + return + +def _mock_targets(): return ['Target_' + c for c in '1234567890'] +def _mock_task_name(*args, **kwds): return 'test_15' +def analysis(prefix:str, ps_hint:int=0, runid:int=-1): + return Analysis(prefix, ps_hint, runid, _al) +def events(): + return [] +def regress(prefix:str, ps_hint:int=0, target:str='__none__'): + return Regress(prefix, ps_hint, target, _rl) +def task(prefix:str, ps_hint:int=0, runid:int=-1, target:str='__none__'): + return Task(prefix, ps_hint, runid, target, _tl) + +_al = [] +_fb = [] +_rl = [] +_tl = [] +_sv = StateVector() +_sv['goat'] = dawgie.Value() +_root = Work([], 'root', []) +_A = Work([], 'A', [dawgie.SV_REF(task, _root, _sv)]) +_B = Work(_fb, 'B', [dawgie.SV_REF(task, _root, _sv)]) +_C = Work([], 'C', [dawgie.SV_REF(task, _B, _sv)]) +_D = Work([], 'D', [dawgie.SV_REF(task, _C, _sv), + dawgie.SV_REF(task, _A, _sv)]) +_E = Work([], 'E', [dawgie.SV_REF(regress, _D, _sv)]) +_fb.append (dawgie.SV_REF(task, _C, _sv)) +_al.append (_E) +_rl.append (_D) +_tl.extend ([_root, _A, _B, _C])