class LockedFile(object):
    """
    An object representing a file in a locked state.  The file is locked
    against simultaneous accesses across both threads and processes.

    Threads within this process are coordinated with an in-memory
    reader-writer lock that is shared by all LockedFile instances referring
    to the same absolute path.  Other processes are coordinated with
    ``fcntl.lockf()`` when the ``fcntl`` module is available.

    The easiest way to use this class is via the with statement.  For
    example, to read a file with a shared lock (many reads, no writes):

    .. code-block:: python

       with LockedFile(filename) as fd:
           data = json.load(fd)

    And to write a file with an exclusive lock (no other simultaneous reads
    or writes):

    .. code-block:: python

       with LockedFile(filename, 'w') as fd:
           json.dump(data, fd)

    An example of its use without the with statement might be:

    .. code-block:: python

       lkdfile = LockedFile(filename)
       fd = lkdfile.open()
       data = json.load(fd)
       lkdfile.close()    # do not call fd.close() !!!

       lkdfile.mode = 'w'
       with lkdfile as fd:
           json.dump(data, fd)

    Note that opening with mode ``'w'`` truncates the file as part of the
    open call itself, i.e. before the cross-process lock has been acquired.
    """
    # maps absolute file path -> _ThreadLock shared by every instance that
    # refers to that path; entries are never removed.
    _thread_locks = {}
    # guards look-ups and insertions into _thread_locks
    _class_lock = threading.RLock()

    class _ThreadLock(object):
        """
        A reader-writer lock for threads: any number of shared holders may
        hold the lock at once, or exactly one exclusive holder.

        All state is guarded by a single condition variable so that shared
        and exclusive acquisition can never wait on each other while holding
        a second lock (which would risk a lock-order deadlock).  No fairness
        between readers and writers is guaranteed.
        """
        def __init__(self):
            self._cond = threading.Condition(threading.Lock())
            self._reader_count = 0
            self._writer_active = False

        def acquire_shared(self):
            """block until no exclusive holder is active, then register as
            a shared holder"""
            with self._cond:
                while self._writer_active:
                    self._cond.wait()
                self._reader_count += 1

        def release_shared(self):
            """
            give up one shared hold, waking waiting threads when the last
            shared holder leaves.
            :raise RuntimeError: if no shared hold is currently registered
            """
            with self._cond:
                if self._reader_count <= 0:
                    raise RuntimeError("release of an unheld shared lock")
                self._reader_count -= 1
                if self._reader_count == 0:
                    self._cond.notify_all()

        def acquire_exclusive(self):
            """block until there are no shared or exclusive holders, then
            become the exclusive holder"""
            with self._cond:
                while self._writer_active or self._reader_count > 0:
                    self._cond.wait()
                self._writer_active = True

        def release_exclusive(self):
            """
            give up the exclusive hold and wake all waiting threads.
            :raise RuntimeError: if the lock is not held exclusively
            """
            with self._cond:
                if not self._writer_active:
                    raise RuntimeError("release of an unheld exclusive lock")
                self._writer_active = False
                self._cond.notify_all()

    @classmethod
    def _get_thread_lock_for(cls, filepath):
        """
        return the thread lock shared by all instances for the given file,
        creating it on first request.  The path is made absolute so that
        different relative spellings of the same path share one lock.
        """
        filepath = os.path.abspath(filepath)
        with cls._class_lock:
            if filepath not in cls._thread_locks:
                cls._thread_locks[filepath] = cls._ThreadLock()
            return cls._thread_locks[filepath]

    def __init__(self, filename, mode='r'):
        """
        prepare (but do not yet open) a lockable file.
        :param str filename:  the path to the file to open with a lock
        :param str mode:      the mode, as accepted by the built-in open(),
                              to open the file with; default: 'r'
        """
        # set first so that __del__ is safe even if the rest of init fails
        self._fo = None
        self._writing = None
        self.mode = mode
        self._fname = filename
        self._thread_lock = self._get_thread_lock_for(filename)

    @property
    def fo(self):
        """
        the open file object or None if the file is not currently open
        """
        return self._fo

    def _acquire_thread_lock(self):
        # exclusive when opened for any kind of writing, shared otherwise
        if self._writing:
            self._thread_lock.acquire_exclusive()
        else:
            self._thread_lock.acquire_shared()

    def _release_thread_lock(self):
        # must be called while self._writing still reflects the mode the
        # lock was acquired with
        if self._writing:
            self._thread_lock.release_exclusive()
        else:
            self._thread_lock.release_shared()

    def open(self, mode=None):
        """
        Open the file so that it is appropriately locked.  If mode is not
        provided, the mode will be the value set when this object was
        created (or last assigned to the ``mode`` attribute).

        :param str mode:  the mode to open the file with; if given, it
                          replaces the currently set mode.
        :return:  the open file object
        :raise StateException:  if the file is already open via this object
        :raise IOError:  if the file cannot be opened or locked; in this
                          case all locks acquired so far are released again.
        """
        if self._fo is not None:
            raise StateException(self._fname+": file is already open")
        if mode:
            self.mode = mode

        # any mode that can modify the file requires an exclusive lock
        self._writing = any(flag in self.mode for flag in 'wa+x')
        self._acquire_thread_lock()
        try:
            self._fo = open(self._fname, self.mode)
            if fcntl:
                lock_type = fcntl.LOCK_EX if self._writing else fcntl.LOCK_SH
                fcntl.lockf(self._fo, lock_type)
        except:
            # undo everything acquired so far, then let the caller see the
            # original failure
            fo, self._fo = self._fo, None
            if fo is not None:
                try:
                    fo.close()
                except Exception:
                    pass
            self._release_thread_lock()
            self._writing = None
            raise
        return self._fo

    def close(self):
        """
        close the file and release its locks.  This does nothing if the file
        is not currently open.  (Closing the file is what drops the lock
        taken with fcntl.)
        """
        if self._fo is None:
            return
        try:
            self._fo.close()
        finally:
            self._fo = None
            self._release_thread_lock()
            self._writing = None

    def __enter__(self):
        return self.open()

    def __exit__(self, e1, e2, e3):
        # release the locks deterministically rather than relying on
        # garbage collection; never suppress an exception from the body
        self.close()
        return False

    def __del__(self):
        # getattr: _fo may be missing if __init__ did not complete
        if getattr(self, '_fo', None) is not None:
            self.close()
""" - with open(jsonfile) as fd: - if fcntl and not nolock: - fcntl.lockf(fd, fcntl.LOCK_SH) - blab(log, "Acquired shared lock for reading: "+jsonfile) - data = fd.read() + with LockedFile(jsonfile) as fd: + blab(log, "Acquired shared lock for reading: "+jsonfile) + out = json.load(fd, object_pairs_hook=OrderedDict) blab(log, "released SH") - if not data: - # this is an unfortunate hack multithreaded reading/writing - time.sleep(0.02) - with open(jsonfile) as fd: - if fcntl and not nolock: - fcntl.lockf(fd, fcntl.LOCK_SH) - blab(log, "(Re)Acquired shared lock for reading: "+jsonfile) - data = fd.read() - blab(log, "released SH") - return json.loads(data, object_pairs_hook=OrderedDict) + return out def write_json(jsdata, destfile, indent=4, nolock=False): """ @@ -99,14 +228,11 @@ def write_json(jsdata, destfile, indent=4, nolock=False): data without a lock """ try: - with open(destfile, 'a') as fd: - if fcntl and not nolock: - fcntl.lockf(fd, fcntl.LOCK_EX) - blab(log, "Acquired exclusive lock for writing: "+destfile) + with LockedFile(destfile, 'a') as fd: + blab(log, "Acquired exclusive lock for writing: "+destfile) fd.truncate(0) json.dump(jsdata, fd, indent=indent, separators=(',', ': ')) blab(log, "released EX") - except Exception, ex: raise StateException("{0}: Failed to write JSON data to file: {1}" .format(destfile, str(ex)), cause=ex) diff --git a/python/tests/nistoar/pdr/test_utils.py b/python/tests/nistoar/pdr/test_utils.py index c5fa699a1..14d45fd7e 100644 --- a/python/tests/nistoar/pdr/test_utils.py +++ b/python/tests/nistoar/pdr/test_utils.py @@ -1,4 +1,4 @@ -import os, sys, pdb, json, subprocess +import os, sys, pdb, json, subprocess, threading, time import unittest as test from nistoar.testing import * @@ -6,7 +6,8 @@ testdir = os.path.dirname(os.path.abspath(__file__)) testdatadir = os.path.join(testdir, 'data') -testdatadir2 = os.path.join(testdir, 'preserv', 'data', 'simplesip') +testdatadir3 = os.path.join(testdir, 'preserv', 'data') 
class TestLockedFile(test.TestCase):
    """
    Tests of utils.LockedFile in which the main thread and one other thread
    each open the same file with a lock.  Each thread records when it
    acquired ('a') and is about to release ('r') the lock by writing a
    two-character tag to a shared results file; the order of the tags shows
    whether the two holds overlapped.
    """

    class OtherThread(threading.Thread):
        """a thread that pauses briefly and then calls the given function
        with the tag 'o'"""
        def __init__(self, func, pause=0.05):
            threading.Thread.__init__(self)
            self.f = func
            self.pause = pause

        def run(self):
            if not self.f:
                return
            time.sleep(self.pause)
            self.f('o')

    def lockedop(self, who, mode='r', sleep=0.5):
        # hold the lock for a while, logging entry and exit under the
        # caller's tag
        with utils.LockedFile(self.lfile, mode) as fd:
            self.rfd.write(who + 'a')
            time.sleep(sleep)
            self.rfd.write(who + 'r')

    def setUp(self):
        self.tf = Tempfiles()
        self.lfile = self.tf("test.txt")
        self.rfile = self.tf("result.txt")
        self.rfd = None

    def tearDown(self):
        self.tf.clean()

    def _contend(self, main_mode, other_mode):
        # run lockedop() in this thread (tag 't') and, slightly delayed, in
        # a second thread (tag 'o'); return the recorded sequence of tags
        def in_other(who):
            self.lockedop(who, other_mode)
        other = self.OtherThread(in_other)

        with open(self.rfile, 'w') as self.rfd:
            other.start()
            self.lockedop('t', main_mode)
            other.join()

        with open(self.rfile) as self.rfd:
            recorded = self.rfd.read()
        return recorded

    def test_shared_reads(self):
        # two readers: the holds are expected to overlap
        self.assertEqual(self._contend('r', 'r'), "taoatror")

    def test_exclusive_writes1(self):
        # two writers: the second is expected to wait for the first
        self.assertEqual(self._contend('w', 'w'), "tatroaor")

    def test_exclusive_writes2(self):
        # reader first, then a writer that is expected to wait
        self.assertEqual(self._contend('r', 'w'), "tatroaor")

    def test_exclusive_writes3(self):
        # writer first, then a reader that is expected to wait
        self.assertEqual(self._contend('w', 'r'), "tatroaor")
class TestJsonIO(test.TestCase):
    """
    Tests focusing on the locking of JSON file IO via utils.read_json() and
    utils.write_json() when a second thread accesses the same file.
    """

    testdata = os.path.join(testdatadir3,
                            "3A1EE2F169DD3B8CE0531A570681DB5D1491.json")

    def setUp(self):
        self.tf = Tempfiles()
        self.jfile = self.tf("data.json")

    def tearDown(self):
        self.tf.clean()

    class OtherThread(threading.Thread):
        """a thread that pauses briefly and then calls the given function
        without arguments"""
        def __init__(self, func, pause=0.05):
            threading.Thread.__init__(self)
            self.f = func
            self.pause = pause

        def run(self):
            if self.f:
                time.sleep(self.pause)
                self.f()

    def write_test_data(self):
        """
        copy the sample JSON document into the scratch file, self.jfile,
        without using the locking functions under test.
        :return:  the sample data that was written
        """
        with open(self.testdata) as fd:
            data = json.load(fd)
        with open(self.jfile, 'w') as fd:
            json.dump(data, fd)
        return data

    def test_writes(self):
        # this is not a definitive test that the use of LockedFile is working
        data = utils.read_json(self.testdata)
        data['foo'] = 'bar'
        def f():
            utils.write_json(data, self.jfile)
        t = self.OtherThread(f)

        data2 = dict(data)
        data2['foo'] = 'BAR'
        t.start()
        utils.write_json(data2, self.jfile)
        t.join()

        # success in these two lines indicates that the file was not
        # corrupted
        data = utils.read_json(self.jfile)
        self.assertIn('@id', data)

        # success in this test indicates that writing happened in the
        # expected order (the delayed thread wrote last); failure means
        # that the test function is not testing what we expected.
        self.assertEqual(data['foo'], 'bar')

    def test_readwrite(self):
        # this is not a definitive test that the use of LockedFile is working
        data = self.write_test_data()
        data['foo'] = 'bar'
        def f():
            utils.write_json(data, self.jfile)
        t = self.OtherThread(f)

        t.start()
        td = utils.read_json(self.jfile)
        t.join()

        # the read started before the delayed write, so it should have seen
        # the complete, unmodified document
        self.assertIn('@id', td)
        self.assertNotIn('foo', td)

        # reading again after the writer has finished shows the update
        td = utils.read_json(self.jfile)
        self.assertIn('@id', td)
        self.assertEqual(td['foo'], 'bar')

    def test_writeread(self):
        # this is not a definitive test that the use of LockedFile is working
        data = self.write_test_data()
        data['foo'] = 'bar'
        self.td = None
        def f():
            self.td = utils.read_json(self.jfile)
        t = self.OtherThread(f)

        t.start()
        utils.write_json(data, self.jfile)
        t.join()

        # the delayed read should have seen the complete, updated document
        self.assertIn('@id', self.td)
        self.assertEqual(self.td['foo'], 'bar')