Skip to content

Commit

Permalink
Merge pull request #170 from rayosborn/expire-stale-locks
Browse files Browse the repository at this point in the history
Set time to expire stale locks
  • Loading branch information
rayosborn authored Feb 18, 2022
2 parents f6a90d3 + 5b86d70 commit edfd4cf
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 15 deletions.
36 changes: 28 additions & 8 deletions src/nexusformat/nexus/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class NXLock(object):
File descriptor of the opened lock file.
"""

def __init__(self, filename, timeout=60, check_interval=1):
def __init__(self, filename, timeout=60, check_interval=1, expiry=28800):
"""Create a lock to prevent file access.
This creates a lock, which can be later acquired and released. It
Expand All @@ -51,11 +51,15 @@ def __init__(self, filename, timeout=60, check_interval=1):
check_interval : int, optional
Number of seconds between attempts to acquire the lock,
by default 1.
expiry : int, optional
Number of seconds after which a lock expires, by default 8*3600.
Set to 0 or None to make the locks persist indefinitely.
"""
self.filename = os.path.realpath(filename)
self.lock_file = self.filename+'.lock'
self.timeout = timeout
self.check_interval = check_interval
self.expiry = expiry
self.pid = os.getpid()
self.fd = None

Expand All @@ -64,7 +68,7 @@ def __repr__(self):
+ os.path.basename(self.filename)
+ "', pid=" + str(self.pid)+")")

def acquire(self, timeout=None, check_interval=None):
def acquire(self, timeout=None, check_interval=None, expiry=None):
"""Acquire the lock.
Parameters
Expand All @@ -75,6 +79,9 @@ def acquire(self, timeout=None, check_interval=None):
check_interval : int, optional
Number of seconds between attempts to acquire the lock,
by default `self.check_interval`.
expiry : int, optional
Number of seconds after which a lock expires, by default
`self.expiry`.
Raises
------
Expand All @@ -90,6 +97,9 @@ def acquire(self, timeout=None, check_interval=None):
if check_interval is None:
check_interval = self.check_interval

if expiry is None:
expiry = self.expiry

timeoutend = timeit.default_timer() + timeout
initial_attempt = True
while timeoutend > timeit.default_timer():
Expand All @@ -105,13 +115,14 @@ def acquire(self, timeout=None, check_interval=None):
if e.errno != errno.EEXIST:
raise
# Remove the lockfile if it is older than one day
elif initial_attempt:
if self.is_stale():
os.remove(self.lock_file)
elif initial_attempt and expiry:
if self.is_stale(expiry=expiry):
self.clear()
initial_attempt = False
time.sleep(check_interval)
# Raise an error if we had to wait for too long
else:
self.fd = None
raise NXLockException(
f"'{self.filename}' is currently locked by an external process"
)
Expand Down Expand Up @@ -190,9 +201,18 @@ def wait(self, timeout=None, check_interval=None):
"by an external process")
return

def is_stale(self):
"""Return True if the lock file is older than one day."""
return ((time.time() - os.path.getmtime(self.lock_file)) > 86400)
def is_stale(self, expiry=None):
"""Return True if the lock file is older than one day.
If the lock file has been cleared before this check, the
function returns False to enable another attempt to acquire it.
"""
if expiry is None:
expiry = self.expiry
try:
return ((time.time() - os.path.getmtime(self.lock_file)) > expiry)
except FileNotFoundError:
return False

def __enter__(self):
return self.acquire()
Expand Down
51 changes: 46 additions & 5 deletions src/nexusformat/nexus/tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@
'NXvirtualfield', 'NXlink', 'NXlinkfield', 'NXlinkgroup',
'NeXusError',
'nxgetcompression', 'nxsetcompression',
'nxgetencoding', 'nxsetencoding', 'nxgetlock', 'nxsetlock',
'nxgetencoding', 'nxsetencoding',
'nxgetlock', 'nxsetlock', 'nxgetlockexpiry', 'nxsetlockexpiry',
'nxgetmaxsize', 'nxsetmaxsize', 'nxgetmemory', 'nxsetmemory',
'nxgetrecursive', 'nxsetrecursive',
'nxclasses', 'nxload', 'nxsave', 'nxduplicate', 'nxdir',
Expand All @@ -213,6 +214,7 @@
NX_COMPRESSION = 'gzip'
NX_ENCODING = 'utf-8'
NX_LOCK = 0
NX_LOCKEXPIRY = 8 * 3600
NX_MAXSIZE = 10000
NX_MEMORY = 2000 # Memory in MB
NX_RECURSIVE = False
Expand Down Expand Up @@ -415,7 +417,8 @@ def __init__(self, name, mode='r', recursive=None, **kwargs):
self._file = None
self._filename = os.path.abspath(name)
self._filedir = os.path.dirname(self._filename)
self._lock = NXLock(self._filename, timeout=NX_LOCK)
self._lock = NXLock(self._filename, timeout=NX_LOCK,
expiry=NX_LOCKEXPIRY)
self._path = '/'
self._root = None
self._with_count = 0
Expand Down Expand Up @@ -562,7 +565,8 @@ def lock(self):
@lock.setter
def lock(self, value):
if self._lock is None:
self._lock = NXLock(self._filename, timeout=NX_LOCK)
self._lock = NXLock(self._filename, timeout=NX_LOCK,
expiry=NX_LOCKEXPIRY)
if value is False or value is None or value == 0:
self._lock.timeout = 0
else:
Expand All @@ -584,7 +588,8 @@ def locked(self):
def lock_file(self):
"""Return the name of the file used to establish the lock."""
if self._lock is None:
self._lock = NXLock(self._filename, timeout=NX_LOCK)
self._lock = NXLock(self._filename, timeout=NX_LOCK,
expiry=NX_LOCKEXPIRY)
return self._lock.lock_file

def acquire_lock(self, timeout=None):
Expand All @@ -597,7 +602,7 @@ def acquire_lock(self, timeout=None):
timeout : int, optional
Timeout for attempts to acquire the lock, by default None.
"""
if self.locked and self.is_locked():
if self.locked:
return
if self._lock is None:
if timeout is not None:
Expand Down Expand Up @@ -7153,6 +7158,42 @@ def setlock(value=10):
nxsetlock = setlock


def getlockexpiry():
"""Return the number of seconds before a file lock expires.
If the value is 0, the file lock persists indefinitely.
Returns
-------
int
Number of seconds before a file lock expires.
"""
return NX_LOCKEXPIRY


def setlockexpiry(value=28800):
"""Sets the lock file expiry.
This creates a file with `.lock` appended to the NeXus file name.
Parameters
----------
value : int, optional
Number of seconds before a lock file is considered stale,
by default 8*3600. If the value is set to 0, the file lock
persists indefinitely.
"""
global NX_LOCKEXPIRY
try:
NX_LOCKEXPIRY = int(value)
except ValueError:
raise NeXusError("Invalid value for file lock expiry")


nxgetlockexpiry = getlockexpiry
nxsetlockexpiry = setlockexpiry


def getmaxsize():
"""Return the default maximum size for arrays without using core memory."""
return NX_MAXSIZE
Expand Down
5 changes: 3 additions & 2 deletions tests/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest
from nexusformat.nexus.tree import (NeXusError, NXentry, NXLock, NXroot,
nxload, nxsetlock, text)
nxload, nxsetlock, nxsetlockexpiry, text)


def test_lock_creation(tmpdir, field4):
Expand Down Expand Up @@ -151,6 +151,7 @@ def test_nested_locks(tmpdir, field4):
def test_stale_locks(tmpdir):

nxsetlock(2)
nxsetlockexpiry(3600)
filename = os.path.join(tmpdir, "file1.nxs")
root = NXroot(NXentry())
root.save(filename, "w")
Expand All @@ -167,7 +168,7 @@ def test_stale_locks(tmpdir):
assert root.nxfile.is_locked()

stat = os.stat(root.nxfile.lock_file)
os.utime(root.nxfile.lock_file, (stat.st_atime, stat.st_mtime - 100000))
os.utime(root.nxfile.lock_file, (stat.st_atime, stat.st_mtime - 10000))

with root.nxfile:

Expand Down

0 comments on commit edfd4cf

Please sign in to comment.