From 6ab77215123cb1434ba33d10678e54e21df09023 Mon Sep 17 00:00:00 2001 From: al-niessner <1130658+al-niessner@users.noreply.github.com> Date: Tue, 22 Aug 2023 13:18:12 -0700 Subject: [PATCH] 214: problem with shelve comm link (#215) * cleaned up shelve Added a unit test for the work that could be done with mock items. Then did some debugging in-situ with a live pipeline and made other fixes that are missed with the mock items. --- Python/dawgie/db/shelve/comms.py | 6 ++++-- Python/dawgie/db/shelve/model.py | 21 ++++++++++++++------- Python/dawgie/db/shelve/util.py | 8 ++++---- Python/dawgie/security.py | 2 +- Test/test_07.py | 18 +++++++++++++++++- 5 files changed, 40 insertions(+), 15 deletions(-) diff --git a/Python/dawgie/db/shelve/comms.py b/Python/dawgie/db/shelve/comms.py index 5328ed81..2679d826 100644 --- a/Python/dawgie/db/shelve/comms.py +++ b/Python/dawgie/db/shelve/comms.py @@ -125,7 +125,8 @@ def open(): class Worker(twisted.internet.protocol.Protocol): def __init__ (self, address): twisted.internet.protocol.Protocol.__init__(self) - self.__buf = {'actual':len(struct.pack('>I',0)),'data':b'','expected':0} + self.__buf = {'actual':len(struct.pack('>I',0)), + 'data':b'','expected':None} # really is used so pylint: disable=unused-private-member self.__handshake = dawgie.security.TwistedWrapper(self, address) # really is used so pylint: enable=unused-private-member @@ -199,7 +200,8 @@ def do (self, request): elif request.func == Func.dbcopy: self._delay_copy (request.value) elif request.func == Func.get: - key = util.construct (**request.keyset) + if request.table == Table.prime: key = str(request.keyset) + else: key = util.construct (**request.keyset._asdict()) self._send (DBI().tables[request.table.value][key]) elif request.func == Func.release: log.debug("Inside worker: Release") diff --git a/Python/dawgie/db/shelve/model.py b/Python/dawgie/db/shelve/model.py index 7fbbef16..a4fb8a2b 100644 --- a/Python/dawgie/db/shelve/model.py +++ b/Python/dawgie/db/shelve/model.py @@ -72,13 +72,13 @@ def __refs2indices (self, refs:[(dawgie.SV_REF, dawgie.V_REF)])->{(int,int,int,i svn = vref.item.name() vn = vref.feat # pylint: disable=protected-access - tid = self._update_cmd (task, None, Table.task, None, None) + tid = self._update_cmd (task, None, Table.task, None, None)[1] aid = self._update_cmd (algn, tid, Table.alg, None, - vref.impl._get_ver()) + vref.impl._get_ver())[1] sid = self._update_cmd (svn, aid, Table.state, None, - vref.item._get_ver()) + vref.item._get_ver())[1] vid = self._update_cmd (vn, sid, Table.value, None, - vref.item[vn]._get_ver()) + vref.item[vn]._get_ver())[1] # pylint: enable=protected-access reftable[(tid,aid,sid,vid)] = ('.'.join([task,algn,svn]),vn) pass @@ -112,8 +112,9 @@ def _collect (self, refs:[(dawgie.SV_REF, dawgie.V_REF)])->None: if self.__span[tn][fsvn][vn][0] < pk[0]:\ self.__span[tn][fsvn][vn] = pk pass - for fsvn in self.__span.values(): - for vn,val in fsvn.items(): fsvn[vn] = self._get_prime (val) + for fsvns in self.__span.values(): + for vns in fsvns.values(): + for vn,val in vns.items(): vns[vn] = self._get_prime (val) pass return @@ -181,7 +182,13 @@ def _load (self, algref=None, err=True, ver=None, lok=None): spks = list(filter (lambda k,K=pk:k[1:] == K[1:], pks)) spks.sort (key=lambda t:t[0]) - pk = spks[-1] + + if spks: pk = spks[-1] + else: + log.warning('No matches for: %s.%s.%s.%s.%s', + self._tn(), self._task(), + self._alg(), sv.name(), vn) + continue pass sv[vn] = self._get_prime (pk) diff --git a/Python/dawgie/db/shelve/util.py b/Python/dawgie/db/shelve/util.py index dd698b3b..2b19266d 100644 --- a/Python/dawgie/db/shelve/util.py +++ b/Python/dawgie/db/shelve/util.py @@ -84,10 +84,6 @@ def dissect (name:str)->(int,str,dawgie.Version): else: ver = None return parent,name,ver -def prime_keys(prime_table): - # because shelve key must be a string, pylint: disable=eval-used - return [eval(k) for k in prime_table] - def indexed (table:{})->[]: return [t[0] for t in sorted (table.items(), key=lambda t:t[1])] @@ -100,6 +96,10 @@ def mkStgDir(): os.system(f'mkdir {tString}') return tString +def prime_keys(prime_table): + # because shelve key must be a string, pylint: disable=eval-used + return [eval(k) for k in prime_table] + def rotated_files(index=None): path = dawgie.context.db_rotate_path if index is None: diff --git a/Python/dawgie/security.py b/Python/dawgie/security.py index 8255d339..94c2a0ce 100755 --- a/Python/dawgie/security.py +++ b/Python/dawgie/security.py @@ -72,7 +72,7 @@ def __init__ (self, protocol, address): self.__phase = self._p1 self.__p = protocol - if 0 < dir (protocol).count ('dataReceived'): + if address and 0 < dir (protocol).count ('dataReceived'): self.__dr = getattr (protocol, 'dataReceived') setattr (protocol, 'dataReceived', self.process) pass diff --git a/Test/test_07.py b/Test/test_07.py index 327e7e4d..ec8d502e 100644 --- a/Test/test_07.py +++ b/Test/test_07.py @@ -42,7 +42,9 @@ import dawgie.db import dawgie.db.shelve.comms import os +import pickle import shutil +import struct import tempfile import unittest @@ -136,9 +138,23 @@ def test_issue_16(self): Even forcing __all__ into the search fails. ''' full_list = dawgie.db.targets (True) - print (full_list) self.assertTrue ('__all__' in full_list) return + + def test_issue_214(self): + '''pipelines using shelve fail to load data + + File "/usr/local/lib/python3.10/dist-packages/dawgie/db/shelve/comms.py", line 168, in dataReceived + request = pickle.loads (self.__buf['data'][:length]) + builtins.EOFError: Ran out of input + + try 1: give the function a good stream and see if it works + ''' + worker = dawgie.db.shelve.comms.Worker(None) + data = pickle.dumps ({'a':1,'b':2}) + data = struct.pack ('>I', len(data)) + data + self.assertRaises (AttributeError, worker.dataReceived, data) + return pass def mock_acquire (name): return True