Skip to content

Commit

Permalink
Merge pull request #541 from opensafely-core/job-status-ns
Browse files Browse the repository at this point in the history
fix: change JobStatus.timestamp to ns precision
  • Loading branch information
bloodearnest authored Jan 6, 2023
2 parents f14c761 + 1647d88 commit 0b16493
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 66 deletions.
33 changes: 12 additions & 21 deletions jobrunner/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
JobStatus,
Privacy,
)
from jobrunner.lib import docker
from jobrunner.lib import datestr_to_ns_timestamp, docker
from jobrunner.lib.git import checkout_commit
from jobrunner.lib.path_utils import list_dir_with_ignore_patterns
from jobrunner.lib.string_utils import tabulate
Expand Down Expand Up @@ -58,14 +58,6 @@ def get_medium_privacy_workspace(workspace):
return None


def timestamp_from_iso(iso):
    """Attempt to convert an ISO formatted date string to a unix timestamp.

    Returns the timestamp as integer seconds, or None if `iso` cannot be
    parsed. Strings without timezone information are interpreted by
    `datetime.timestamp()` as local time.
    """
    # Imported locally under an alias so this works whether the enclosing
    # module has `datetime` bound to the module or to the class.
    from datetime import datetime as _datetime

    try:
        text = iso.strip()
        # Before Python 3.11, fromisoformat rejects a trailing "Z" and any
        # fraction that is not exactly 3 or 6 digits, so normalise both.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        head, dot, tail = text.partition(".")
        if dot:
            n_digits = len(tail) - len(tail.lstrip("0123456789"))
            fraction = tail[:n_digits][:6].ljust(6, "0")
            text = f"{head}.{fraction}{tail[n_digits:]}"
        return int(_datetime.fromisoformat(text).timestamp())
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        # best-effort conversion: callers treat None as "no timestamp"
        return None


def get_log_dir(job):
# Split log directory up by month to make things slightly more manageable
month_dir = datetime.date.today().strftime("%Y-%m")
Expand Down Expand Up @@ -209,27 +201,27 @@ def get_status(self, job, timeout=15):

if container is None: # container doesn't exist
# timestamp file presence means we have finished preparing
timestamp = volume_api.file_timestamp(job, TIMESTAMP_REFERENCE_FILE, 10)
timestamp_ns = volume_api.read_timestamp(job, TIMESTAMP_REFERENCE_FILE, 10)
# TODO: maybe log the case where the volume exists, but the
# timestamp file does not? It's not a problem, as the loop should
# re-prepare it anyway.
if timestamp is None:
if timestamp_ns is None:
# we are Jon Snow
return JobStatus(ExecutorState.UNKNOWN)
else:
# we've finished preparing
return JobStatus(ExecutorState.PREPARED, timestamp=timestamp)
return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)

if container["State"]["Running"]:
timestamp = timestamp_from_iso(container["State"]["StartedAt"])
return JobStatus(ExecutorState.EXECUTING, timestamp=timestamp)
timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
elif job.id in RESULTS:
return JobStatus(
ExecutorState.FINALIZED, timestamp=RESULTS[job.id].timestamp
ExecutorState.FINALIZED, timestamp_ns=RESULTS[job.id].timestamp_ns
)
else: # container present but not running, i.e. finished
timestamp = timestamp_from_iso(container["State"]["FinishedAt"])
return JobStatus(ExecutorState.EXECUTED, timestamp=timestamp)
timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)

def get_results(self, job):
if job.id not in RESULTS:
Expand Down Expand Up @@ -289,8 +281,8 @@ def prepare_job(job):
)
volume_api.copy_to_volume(job, workspace_dir / filename, filename)

# Hack: see `get_unmatched_outputs`
volume_api.touch_file(job, TIMESTAMP_REFERENCE_FILE)
# Used to record state for telemetry, and also see `get_unmatched_outputs`
volume_api.write_timestamp(job, TIMESTAMP_REFERENCE_FILE)


def finalize_job(job):
Expand Down Expand Up @@ -327,14 +319,13 @@ def finalize_job(job):
exit_code=container_metadata["State"]["ExitCode"],
image_id=container_metadata["Image"],
message=message,
timestamp=int(time.time()),
timestamp_ns=time.time_ns(),
action_version=labels.get("org.opencontainers.image.version", "unknown"),
action_revision=labels.get("org.opencontainers.image.revision", "unknown"),
action_created=labels.get("org.opencontainers.image.created", "unknown"),
base_revision=labels.get("org.opensafely.base.vcs-ref", "unknown"),
base_created=labels.get("org.opencontainers.base.build-date", "unknown"),
)
job.completed_at = int(time.time())
job_metadata = get_job_metadata(job, outputs, container_metadata)
write_job_logs(job, job_metadata)
persist_outputs(job, results.outputs, job_metadata)
Expand Down
32 changes: 21 additions & 11 deletions jobrunner/executors/volumes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import importlib
import logging
import shutil
import tempfile
import time
from collections import defaultdict
from pathlib import Path

Expand Down Expand Up @@ -46,11 +48,14 @@ def copy_from_volume(job, src, dst, timeout=None):
def delete_volume(job):
docker.delete_volume(docker_volume_name(job))

def touch_file(job, path, timeout=None):
docker.touch_file(docker_volume_name(job), path, timeout)
def write_timestamp(job, path, timeout=None):
    """Write the current time, as integer nanoseconds, to `path` in the job's volume.

    The value of `time.time_ns()` is written as text to a temporary file on
    the host, which is then copied into the docker volume for `job`.
    `timeout` is passed through to `docker.copy_to_volume`.
    """
    # Use a private temporary directory rather than an open
    # NamedTemporaryFile: a named temporary file cannot portably be opened a
    # second time by name while it is still open (this fails on Windows).
    with tempfile.TemporaryDirectory() as tmp_dir:
        src = Path(tmp_dir) / "timestamp"
        src.write_text(str(time.time_ns()))
        docker.copy_to_volume(docker_volume_name(job), src, path, timeout)

def file_timestamp(job, path, timeout=None):
return docker.file_timestamp(docker_volume_name(job), path, timeout)
def read_timestamp(job, path, timeout=None):
return docker.read_timestamp(docker_volume_name(job), path, timeout)

def glob_volume_files(job):
return docker.glob_volume_files(docker_volume_name(job), job.output_spec.keys())
Expand Down Expand Up @@ -108,18 +113,23 @@ def copy_from_volume(job, src, dst, timeout=None):
def delete_volume(job):
shutil.rmtree(host_volume_path(job), ignore_errors=True)

def touch_file(job, path, timeout=None):
(host_volume_path(job) / path).touch()
def write_timestamp(job, path, timeout=None):
(host_volume_path(job) / path).write_text(str(time.time_ns()))

def file_timestamp(job, path, timeout=None):
def read_timestamp(job, path, timeout=None):
    """Return the nanosecond timestamp stored at `path` in the job's host volume.

    The file is expected to contain an integer nanosecond timestamp as text
    (see `write_timestamp`). If the file is empty or its contents are not an
    integer, the file's ctime is used instead. Returns None if the file
    cannot be read at all. `timeout` is unused here.
    """
    abs_path = host_volume_path(job) / path
    try:
        contents = abs_path.read_text().strip()
        if contents:
            try:
                return int(contents)
            except ValueError:
                logger.debug(
                    "Could not parse int from volume file %s: %s", abs_path, contents
                )
        # Either the file was empty or its contents were not an integer:
        # fall back to filesystem metadata. st_ctime_ns is an exact integer,
        # whereas st_ctime * 1e9 goes through a float and loses precision.
        return abs_path.stat().st_ctime_ns
    except Exception:
        # best-effort: a missing file just means there is no timestamp yet
        logger.exception("Failed to read timestamp from volume file %s", abs_path)
        return None

def glob_volume_files(job):
volume = host_volume_path(job)
Expand Down
7 changes: 5 additions & 2 deletions jobrunner/job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ class ExecutorState(Enum):
class JobStatus:
state: ExecutorState
message: Optional[str] = None
timestamp: int = None
timestamp_ns: int = (
None # timestamp this JobStatus occurred, in integer nanoseconds
)


@dataclass
Expand All @@ -60,7 +62,8 @@ class JobResults:
exit_code: int
image_id: str
message: str = None
timestamp: int = None # timestamp these results were finalized
# timestamp these results were finalized, in integer nanoseconds
timestamp_ns: int = None

# to be extracted from the image labels
action_version: str = "unknown"
Expand Down
39 changes: 39 additions & 0 deletions jobrunner/lib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import secrets
from contextlib import contextmanager
from datetime import datetime


@contextmanager
Expand All @@ -18,3 +19,41 @@ def atomic_writer(dest):
raise
else:
tmp.replace(dest)


def datestr_to_ns_timestamp(datestr):
    """Parse a date string into an integer nanosecond unix timestamp.

    Handles ISO-like strings with up to nanosecond precision, such as docker's
    "2022-01-01T12:34:56.123456789Z" and busybox stat's
    "2022-01-01 12:34:56.123456789 +0000". The fractional part may have any
    number of digits; digits beyond nanoseconds are dropped.

    Stdlib datetime cannot natively parse nanoseconds, so it is only used to
    parse the whole-second part; the fraction and the UTC offset are handled
    here with integer arithmetic so no precision is lost.

    Strings without a timezone are treated as UTC. Returns None if `datestr`
    cannot be parsed.
    """
    import re

    if not isinstance(datestr, str):
        return None

    # split into: whole-second datetime, optional fraction, optional tz
    match = re.fullmatch(
        r"(?P<base>.*?)(?:\.(?P<frac>\d+))?\s*(?P<tz>Z|[+-]\d{2}:?\d{2})?",
        datestr.strip(),
    )
    if match is None:
        return None

    try:
        parsed = datetime.fromisoformat(match["base"])
    except ValueError:
        return None

    tz = match["tz"]
    if parsed.tzinfo is not None:
        # fromisoformat already consumed an offset embedded in the base part
        embedded = parsed.utcoffset()
        offset_seconds = embedded.days * 86400 + embedded.seconds
        parsed = parsed.replace(tzinfo=None)
    elif tz and tz != "Z":
        hours, minutes = int(tz[1:3]), int(tz[-2:])
        if hours > 23 or minutes > 59:
            return None
        sign = -1 if tz[0] == "-" else 1
        offset_seconds = sign * (hours * 3600 + minutes * 60)
    else:
        # "Z" or no timezone at all: UTC
        offset_seconds = 0

    # normalise the fraction to exactly nine digits, i.e. nanoseconds
    frac_ns = int((match["frac"] or "")[:9].ljust(9, "0"))

    since_epoch = parsed - datetime(1970, 1, 1)
    seconds = since_epoch.days * 86400 + since_epoch.seconds - offset_seconds
    return seconds * 1_000_000_000 + since_epoch.microseconds * 1000 + frac_ns
71 changes: 49 additions & 22 deletions jobrunner/lib/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import subprocess

from jobrunner import config
from jobrunner.lib import atomic_writer
from jobrunner.lib import atomic_writer, datestr_to_ns_timestamp
from jobrunner.lib.subprocess_utils import subprocess_run


Expand Down Expand Up @@ -181,47 +181,74 @@ def copy_to_volume(volume_name, source, dest, timeout=None):
)


def touch_file(volume_name, path, timeout=None):
docker(
[
"container",
"exec",
manager_name(volume_name),
"touch",
f"{VOLUME_MOUNT_POINT}/{path}",
],
check=True,
timeout=timeout,
)
def read_timestamp(volume_name, path, timeout=None):
container = manager_name(volume_name)
if not container_exists(container):
return None

try:
response = docker(
[
"container",
"exec",
container,
"cat",
f"{VOLUME_MOUNT_POINT}/{path}",
],
capture_output=True,
check=True,
text=True,
timeout=timeout,
)
except subprocess.CalledProcessError as exc:
# Must be file does not exist, as we've already checked for container
logger.debug(f"File {volume_name}:{path} does not exist:\n{exc.stderr}")
return None

output = response.stdout.strip()

if output:
try:
return int(output)
except ValueError:
# could not convert to integer
logger.debug(
f"Could not parse int from {volume_name}:{path}: {response.stdout.strip()}"
)

def file_timestamp(volume_name, path, timeout=None):
# either output was "" or we couldn't parse it as integer
# fallback to filesystem metadata, to support older volumes, and just be
# robust
try:
# use busybox's stat implementation
response = docker(
[
"container",
"exec",
manager_name(volume_name),
container,
"stat",
"-c",
"%X",
"%z",
f"{VOLUME_MOUNT_POINT}/{path}",
],
capture_output=True,
check=True,
text=True,
timeout=timeout,
)
except subprocess.CalledProcessError:
except subprocess.CalledProcessError as exc:
# either container or file didn't exist
logger.debug(f"Failed to stat volume file {volume_name}:{path}")
logger.debug(
f"Failed to stat file {volume_name}:{path} on container {container}:\n{exc.stderr}"
)
return None

datestr = response.stdout.strip()
try:
return int(response.stdout.strip())
except (ValueError, TypeError):
# could not convert to integer
return datestr_to_ns_timestamp(datestr)
except ValueError as exc:
logger.debug(
f"Failed to convert file time of {datestr} to integer nanoseconds: {exc}"
)
return None


Expand Down
32 changes: 28 additions & 4 deletions tests/lib/test_init.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import pytest

from jobrunner.lib import atomic_writer
from jobrunner import lib


def test_atomic_writer_success(tmp_path):
dst = tmp_path / "dst"
with atomic_writer(dst) as tmp:
with lib.atomic_writer(dst) as tmp:
tmp.write_text("dst")

assert dst.read_text() == "dst"
Expand All @@ -15,7 +15,7 @@ def test_atomic_writer_success(tmp_path):
def test_atomic_writer_failure(tmp_path):
dst = tmp_path / "dst"
with pytest.raises(Exception):
with atomic_writer(dst) as tmp:
with lib.atomic_writer(dst) as tmp:
tmp.write_text("dst")
raise Exception("test")

Expand All @@ -29,10 +29,34 @@ def test_atomic_writer_overwrite_symlink(tmp_path):
dst = tmp_path / "link"
dst.symlink_to(target)

with atomic_writer(dst) as tmp:
with lib.atomic_writer(dst) as tmp:
tmp.write_text("dst")

assert dst.read_text() == "dst"
assert not dst.is_symlink()
assert target.read_text() == "target"
assert not tmp.exists()


@pytest.mark.parametrize(
"datestr, expected",
[
# docker datestrs, with and without ns
("2022-01-01T12:34:56.123456Z", 1641040496123456000),
("2022-01-01T12:34:56.123456789Z", 1641040496123456789),
# busybox stat datestr, with and without ns
("2022-01-01 12:34:56.123456 +0000", 1641040496123456000),
("2022-01-01 12:34:56.123456789 +0000", 1641040496123456789),
# short date
("2022-01-01T12:34:56", 1641040496000000000),
# invalid
("not-a-timestamp", None),
# check tz maths, just in case
(
"2022-01-01 12:34:56.123456789+02:00",
1641040496123456789 - int(2 * 60 * 60 * 1e9),
),
],
)
def test_datestr_to_ns_timestamp(datestr, expected):
assert lib.datestr_to_ns_timestamp(datestr) == expected
Loading

0 comments on commit 0b16493

Please sign in to comment.