Skip to content

Commit

Permalink
Publish experiment config from taskcluster training task (group_logs) (
Browse files Browse the repository at this point in the history
…#602)

* Configure evaluation tasks

* Extract w&b code into module

* Do not check taskcluster when publication is disabled

* Publish evaluation metrics to W&B

* Fix running eval tracking on CI

* Use args.wandb_run_name instead of default teacher

* Remove duplicated arguments

* Retrieve dataset from Taskcluster directly

* Add missing calls to publisher and logging

* Allow publishing metrics as a table on existing runs (i.e. previous trainings)

* Update regex to parse labels ending with '-1'

* Generic support for train/eval different naming

* Update tests

* Support disabled publication

* Publish group_logs from taskcluster

* Update tests

* Refactor group_log publication between online and offline taskcluster

* Restore missing input-file argument

* Rebase and fixes

* TRASHME test parameters to trigger train in CI

* Fix metrics_tasks default value

* Fix import

* Run linter

* Publish config first

* Revert "TRASHME test parameters to trigger train in CI"

This reverts commit ede4245.

---------

Co-authored-by: Bastien Abadie <[email protected]>
Co-authored-by: Bastien Abadie <[email protected]>
Co-authored-by: Evgeny Pavlov <[email protected]>
  • Loading branch information
4 people authored May 23, 2024
1 parent 419ff93 commit c2a6e7f
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pipeline/train/train.sh
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ then
PARSER=cat
else
echo "### Weight & Biases publication is available."
PARSER="parse_tc_logs --from-stream -v"
PARSER="parse_tc_logs --from-stream --publish-group-logs -v"
fi

echo "### Training ${model_dir}"
Expand Down
4 changes: 4 additions & 0 deletions tests/test_tracking_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def samples_dir():
"unittest",
],
taskcluster_secret=None,
publish_group_logs=False,
),
)
@patch("translations_parser.publishers.wandb")
Expand Down Expand Up @@ -254,6 +255,7 @@ def test_experiments_marian_1_12(wandb_mock, getargs_mock, caplog, samples_dir,
"unittest",
],
taskcluster_secret=None,
publish_group_logs=False,
),
)
@patch("translations_parser.publishers.wandb")
Expand Down Expand Up @@ -295,6 +297,7 @@ def test_taskcluster_wandb_initialization_failure(
"unittest",
],
taskcluster_secret=None,
publish_group_logs=False,
),
)
@patch("translations_parser.publishers.wandb")
Expand Down Expand Up @@ -339,6 +342,7 @@ def test_taskcluster_wandb_log_failures(wandb_mock, getargs_mock, caplog, sample
"unittest",
],
taskcluster_secret=None,
publish_group_logs=False,
),
)
@patch("translations_parser.publishers.wandb")
Expand Down
26 changes: 25 additions & 1 deletion tracking/translations_parser/cli/taskcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
from io import TextIOWrapper
from pathlib import Path

import taskcluster
from translations_parser.parser import TrainingParser, logger
from translations_parser.publishers import CSVExport, Publisher
from translations_parser.utils import taskcluster_log_filter
from translations_parser.utils import publish_group_logs_from_tasks, taskcluster_log_filter
from translations_parser.wandb import add_wandb_arguments, get_wandb_publisher

queue = taskcluster.Queue({"rootUrl": "https://firefox-ci-tc.services.mozilla.com"})


def get_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -59,6 +62,12 @@ def get_args() -> argparse.Namespace:
dest="loglevel",
const=logging.DEBUG,
)
parser.add_argument(
"--publish-group-logs",
help=("Enable publishing a group_logs fake run with the experiment configuration."),
action="store_true",
default=False,
)

# Extend parser with Weight & Biases CLI args
add_wandb_arguments(parser)
Expand Down Expand Up @@ -100,6 +109,21 @@ def boot() -> None:
# Use log filtering when using non-stream (for uploading past experiments)
log_filter = taskcluster_log_filter if not args.from_stream else None

# Publish the config first
if args.publish_group_logs:
logger.info("Publishing experiment config to a 'group_logs' fake run.")
# Retrieve experiment configuration from the task group
task_id = os.environ.get("TASK_ID")
if not task_id:
raise Exception("Group logs publication can only run in taskcluster")
task = queue.task(task_id)
group_id = task["taskGroupId"]
# Ensure task group is readable
queue.getTaskGroup(group_id)
task_group = queue.task(group_id)
config = task_group.get("extra", {}).get("action", {}).get("context", {}).get("input")
publish_group_logs_from_tasks(config=config)

parser = TrainingParser(
lines,
publishers=publishers,
Expand Down
61 changes: 14 additions & 47 deletions tracking/translations_parser/cli/taskcluster_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,24 @@

import argparse
import logging
import re
import tempfile
from collections import defaultdict
from pathlib import Path

import wandb
import yaml

import taskcluster
from taskcluster.download import downloadArtifactToBuf, downloadArtifactToFile
from taskcluster.download import downloadArtifactToBuf
from translations_parser.data import Metric
from translations_parser.parser import TrainingParser, logger
from translations_parser.publishers import WandB
from translations_parser.utils import build_task_name, parse_task_label
from translations_parser.utils import (
MULTIPLE_TRAIN_SUFFIX,
build_task_name,
parse_task_label,
publish_group_logs_from_tasks,
)

MULTIPLE_TRAIN_SUFFIX = re.compile(r"(-\d+)/\d+$")
KIND_TAG_TARGET = ("train", "finetune")
queue = taskcluster.Queue({"rootUrl": "https://firefox-ci-tc.services.mozilla.com"})

Expand Down Expand Up @@ -145,9 +147,7 @@ def list_training_tasks(group_id: str, grouped_tasks: dict[str, list[dict]]) ->
return training_tasks


def list_metrics_tasks(
group_id: str, grouped_tasks: dict[str, list[dict]]
) -> list[dict[str, dict]]:
def list_metrics_tasks(group_id: str, grouped_tasks: dict[str, list[dict]]) -> dict[str, dict]:
metrics_tasks = {task["status"]["taskId"]: task for task in grouped_tasks["evaluate"]}

if not metrics_tasks:
Expand Down Expand Up @@ -256,45 +256,12 @@ def publish_task_group(group_id: str, override: bool = False) -> None:
)

# Group and publish remaining metrics tasks via the logs publication
with tempfile.TemporaryDirectory() as temp_dir:
logs_folder = Path(temp_dir) / "logs"
metrics_folder = logs_folder / project_name / group_name / "metrics"
metrics_folder.mkdir(parents=True, exist_ok=True)

for metric_task_id, metrics_task in metrics_tasks.items():
filename = metrics_task["task"]["tags"]["label"]
if re_match := MULTIPLE_TRAIN_SUFFIX.search(filename):
(suffix,) = re_match.groups()
filename = MULTIPLE_TRAIN_SUFFIX.sub(suffix, filename)

metric_artifact = next(
(
artifact["name"]
for artifact in queue.listLatestArtifacts(metric_task_id)["artifacts"]
if artifact["name"].endswith(".metrics")
),
None,
)
if metric_artifact is None:
logger.error(f"No .metric artifact found for task {metric_task_id}, skipping.")
continue
with (metrics_folder / f"{filename}.metrics").open("wb") as log_file:
downloadArtifactToFile(
log_file,
taskId=metrics_task["status"]["taskId"],
name=metric_artifact,
queueService=queue,
)

# Dump experiment config so it is published on group_logs
config_path = Path(temp_dir) / "experiments" / project_name / group_name / "config.yml"
config_path.parent.mkdir(parents=True, exist_ok=True)

with config_path.open("w") as config_file:
yaml.dump(config, config_file)

parents = str(logs_folder.resolve()).strip().split("/")
WandB.publish_group_logs(parents, project_name, group_name, existing_runs=[])
# Keyword names must match the signature of publish_group_logs_from_tasks
# (project, group, metrics_tasks, config); an unknown keyword raises TypeError.
publish_group_logs_from_tasks(
    project=project_name,
    group=group_name,
    metrics_tasks=metrics_tasks,
    config=config,
)


def list_dependent_group_ids(task_id: str, known: set[str]):
Expand Down
74 changes: 74 additions & 0 deletions tracking/translations_parser/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import logging
import os
import re
import tempfile
from collections.abc import Sequence
from datetime import datetime
from pathlib import Path
from typing import NamedTuple, Optional

import yaml

import taskcluster
from taskcluster.download import downloadArtifactToFile

logger = logging.getLogger(__name__)

Expand All @@ -15,6 +20,7 @@
# Tags usually ends with project (e.g. `en-nl` or `eng-nld`)
TAG_PROJECT_SUFFIX_REGEX = re.compile(r"((-\w{2}){2}|(-\w{3}){2})$")

MULTIPLE_TRAIN_SUFFIX = re.compile(r"(-\d+)/\d+$")

# This regex needs to work on historic runs as well as the current tasks.
TRAIN_LABEL_REGEX = re.compile(
Expand Down Expand Up @@ -100,6 +106,8 @@
r"$"
)

queue = taskcluster.Queue({"rootUrl": "https://firefox-ci-tc.services.mozilla.com"})


class ParsedTaskLabel(NamedTuple):
model: str
Expand Down Expand Up @@ -187,3 +195,69 @@ def metric_from_tc_context(chrf: float, bleu: float, comet: float):
bleu_detok=bleu,
comet=comet,
)


def publish_group_logs_from_tasks(
    project: str | None = None,
    group: str | None = None,
    metrics_tasks: dict[str, dict] | None = None,
    config: dict | None = None,
) -> None:
    """
    Publish a fake run, named 'group_logs', to Weights & Biases from a Taskcluster context.

    The function builds a temporary folder tree containing the `.metrics` artifacts of the
    given evaluation tasks and a `config.yml` dump of the experiment configuration, then
    hands that tree over to `WandB.publish_group_logs`.

    :param project: W&B project name. When either `project` or `group` is None, both values
        are retrieved with `get_wandb_names()`.
    :param group: W&B group name (see `project`).
    :param metrics_tasks: Optional mapping of task ID to Taskcluster task definition for
        finished evaluation tasks, whose `.metrics` artifact is downloaded for publication.
        Defaults to no extra tasks.
    :param config: Optional experiment configuration, dumped as YAML so that it is published
        on the group_logs run. A missing (None) configuration is dumped as an empty mapping.
    """
    # Imported locally, as in the original implementation
    # NOTE(review): presumably to avoid a circular import with these modules — confirm.
    from translations_parser.publishers import WandB
    from translations_parser.wandb import get_wandb_names

    # Build fresh containers on each call instead of sharing mutable default arguments
    metrics_tasks = {} if metrics_tasks is None else metrics_tasks
    config = {} if config is None else config

    message = "Handling group_logs publication"
    if metrics_tasks:
        message += f" with {len(metrics_tasks)} extra evaluation tasks"
    logger.info(message)

    if project is None or group is None:
        logger.info("Retrieving W&B names from taskcluster attributes")
        project, group, _ = get_wandb_names()

    with tempfile.TemporaryDirectory() as temp_dir:
        logs_folder = Path(temp_dir) / "logs"
        metrics_folder = logs_folder / project / group / "metrics"
        metrics_folder.mkdir(parents=True, exist_ok=True)

        # Group and publish remaining metrics tasks via the logs publication
        for metric_task_id, metrics_task in metrics_tasks.items():
            filename = metrics_task["task"]["tags"]["label"]
            # Labels ending with e.g. `-1/2` are reduced to their `-1` suffix,
            # which also removes the `/` that is not usable in a file name
            if re_match := MULTIPLE_TRAIN_SUFFIX.search(filename):
                (suffix,) = re_match.groups()
                filename = MULTIPLE_TRAIN_SUFFIX.sub(suffix, filename)

            # Pick the first artifact of the latest run that looks like a metrics file
            metric_artifact = next(
                (
                    artifact["name"]
                    for artifact in queue.listLatestArtifacts(metric_task_id)["artifacts"]
                    if artifact["name"].endswith(".metrics")
                ),
                None,
            )
            if metric_artifact is None:
                logger.error(f"No .metric artifact found for task {metric_task_id}, skipping.")
                continue

            with (metrics_folder / f"{filename}.metrics").open("wb") as log_file:
                downloadArtifactToFile(
                    log_file,
                    taskId=metrics_task["status"]["taskId"],
                    name=metric_artifact,
                    queueService=queue,
                )

        # Dump experiment config so it is published on group_logs
        config_path = Path(temp_dir) / "experiments" / project / group / "config.yml"
        config_path.parent.mkdir(parents=True, exist_ok=True)

        with config_path.open("w") as config_file:
            yaml.dump(config, config_file)

        # Publication must happen while the temporary directory still exists
        parents = str(logs_folder.resolve()).strip().split("/")
        WandB.publish_group_logs(parents, project, group, existing_runs=[])

0 comments on commit c2a6e7f

Please sign in to comment.