From c2a6e7f8c899ba363c5058e200692bfd8e321299 Mon Sep 17 00:00:00 2001 From: Valentin Rigal Date: Thu, 23 May 2024 02:00:29 +0200 Subject: [PATCH] Publish experiment config from taskcluster training task (group_logs) (#602) * Configure evaluation tasks * Extract w&b code into module * Do not check taskcluster when publication is disabled * Publish evaluation metrics to W&B * Fix running eval tracking on CI * Use args.wandb_run_name instead of default teacher * Remove duplicated arguments * Retrieve dataset from Taskcluster directly * Add missing calls to publisher and logging * Allow publishing metrics as a table on existing runs (i.e. previous trainings) * Update regex to parse labels ending with '-1' * Generic support for train/eval different naming * Update tests * Support disabled publication * Publish group_logs from taskcluster * Update tests * Refactor group_log publication between online and offline taskcluster * Restore missing input-file argument * Rebase and fixes * TRASHME test parameters to trigger train in CI * Fix metrics_tasks default value * Fix import * Run linter * Publish config first * Revert "TRASHME test parameters to trigger train in CI" This reverts commit ede424578665c0947b99281382aad9729077aff3. --------- Co-authored-by: Bastien Abadie Co-authored-by: Bastien Abadie Co-authored-by: Evgeny Pavlov --- pipeline/train/train.sh | 2 +- tests/test_tracking_cli.py | 4 + .../translations_parser/cli/taskcluster.py | 26 ++++++- .../cli/taskcluster_group.py | 61 ++++----------- tracking/translations_parser/utils.py | 74 +++++++++++++++++++ 5 files changed, 118 insertions(+), 49 deletions(-) diff --git a/pipeline/train/train.sh b/pipeline/train/train.sh index abe838982..215d5fe68 100755 --- a/pipeline/train/train.sh +++ b/pipeline/train/train.sh @@ -124,7 +124,7 @@ then PARSER=cat else echo "### Weight & Biases publication is available." 
- PARSER="parse_tc_logs --from-stream -v" + PARSER="parse_tc_logs --from-stream --publish-group-logs -v" fi echo "### Training ${model_dir}" diff --git a/tests/test_tracking_cli.py b/tests/test_tracking_cli.py index d8ce40d19..0fe804cc8 100644 --- a/tests/test_tracking_cli.py +++ b/tests/test_tracking_cli.py @@ -49,6 +49,7 @@ def samples_dir(): "unittest", ], taskcluster_secret=None, + publish_group_logs=False, ), ) @patch("translations_parser.publishers.wandb") @@ -254,6 +255,7 @@ def test_experiments_marian_1_12(wandb_mock, getargs_mock, caplog, samples_dir, "unittest", ], taskcluster_secret=None, + publish_group_logs=False, ), ) @patch("translations_parser.publishers.wandb") @@ -295,6 +297,7 @@ def test_taskcluster_wandb_initialization_failure( "unittest", ], taskcluster_secret=None, + publish_group_logs=False, ), ) @patch("translations_parser.publishers.wandb") @@ -339,6 +342,7 @@ def test_taskcluster_wandb_log_failures(wandb_mock, getargs_mock, caplog, sample "unittest", ], taskcluster_secret=None, + publish_group_logs=False, ), ) @patch("translations_parser.publishers.wandb") diff --git a/tracking/translations_parser/cli/taskcluster.py b/tracking/translations_parser/cli/taskcluster.py index 59d374b68..ec2e2e453 100644 --- a/tracking/translations_parser/cli/taskcluster.py +++ b/tracking/translations_parser/cli/taskcluster.py @@ -20,11 +20,14 @@ from io import TextIOWrapper from pathlib import Path +import taskcluster from translations_parser.parser import TrainingParser, logger from translations_parser.publishers import CSVExport, Publisher -from translations_parser.utils import taskcluster_log_filter +from translations_parser.utils import publish_group_logs_from_tasks, taskcluster_log_filter from translations_parser.wandb import add_wandb_arguments, get_wandb_publisher +queue = taskcluster.Queue({"rootUrl": "https://firefox-ci-tc.services.mozilla.com"}) + def get_args() -> argparse.Namespace: parser = argparse.ArgumentParser( @@ -59,6 +62,12 @@ def get_args() 
-> argparse.Namespace: dest="loglevel", const=logging.DEBUG, ) + parser.add_argument( + "--publish-group-logs", + help=("Enable publishing a group_logs fake run with the experiment configuration."), + action="store_true", + default=False, + ) # Extend parser with Weight & Biases CLI args add_wandb_arguments(parser) @@ -100,6 +109,21 @@ def boot() -> None: # Use log filtering when using non-stream (for uploading past experiments) log_filter = taskcluster_log_filter if not args.from_stream else None + # publish the config first + if args.publish_group_logs: + logger.info("Publishing experiment config to a 'group_logs' fake run.") + # Retrieve experiment configuration from the task group + task_id = os.environ.get("TASK_ID") + if not task_id: + raise Exception("Group logs publication can only run in taskcluster") + task = queue.task(task_id) + group_id = task["taskGroupId"] + # Ensure task group is readable + queue.getTaskGroup(group_id) + task_group = queue.task(group_id) + config = task_group.get("extra", {}).get("action", {}).get("context", {}).get("input") + publish_group_logs_from_tasks(config=config) + parser = TrainingParser( lines, publishers=publishers, diff --git a/tracking/translations_parser/cli/taskcluster_group.py b/tracking/translations_parser/cli/taskcluster_group.py index 8edfa59d9..7504be30e 100644 --- a/tracking/translations_parser/cli/taskcluster_group.py +++ b/tracking/translations_parser/cli/taskcluster_group.py @@ -8,22 +8,24 @@ import argparse import logging -import re import tempfile from collections import defaultdict from pathlib import Path import wandb -import yaml import taskcluster -from taskcluster.download import downloadArtifactToBuf, downloadArtifactToFile +from taskcluster.download import downloadArtifactToBuf from translations_parser.data import Metric from translations_parser.parser import TrainingParser, logger from translations_parser.publishers import WandB -from translations_parser.utils import build_task_name, parse_task_label 
+from translations_parser.utils import ( + MULTIPLE_TRAIN_SUFFIX, + build_task_name, + parse_task_label, + publish_group_logs_from_tasks, +) -MULTIPLE_TRAIN_SUFFIX = re.compile(r"(-\d+)/\d+$") KIND_TAG_TARGET = ("train", "finetune") queue = taskcluster.Queue({"rootUrl": "https://firefox-ci-tc.services.mozilla.com"}) @@ -145,9 +147,7 @@ def list_training_tasks(group_id: str, grouped_tasks: dict[str, list[dict]]) -> return training_tasks -def list_metrics_tasks( - group_id: str, grouped_tasks: dict[str, list[dict]] -) -> list[dict[str, dict]]: +def list_metrics_tasks(group_id: str, grouped_tasks: dict[str, list[dict]]) -> dict[str, dict]: metrics_tasks = {task["status"]["taskId"]: task for task in grouped_tasks["evaluate"]} if not metrics_tasks: @@ -256,45 +256,12 @@ def publish_task_group(group_id: str, override: bool = False) -> None: ) # Group and publish remaining metrics tasks via the logs publication - with tempfile.TemporaryDirectory() as temp_dir: - logs_folder = Path(temp_dir) / "logs" - metrics_folder = logs_folder / project_name / group_name / "metrics" - metrics_folder.mkdir(parents=True, exist_ok=True) - - for metric_task_id, metrics_task in metrics_tasks.items(): - filename = metrics_task["task"]["tags"]["label"] - if re_match := MULTIPLE_TRAIN_SUFFIX.search(filename): - (suffix,) = re_match.groups() - filename = MULTIPLE_TRAIN_SUFFIX.sub(suffix, filename) - - metric_artifact = next( - ( - artifact["name"] - for artifact in queue.listLatestArtifacts(metric_task_id)["artifacts"] - if artifact["name"].endswith(".metrics") - ), - None, - ) - if metric_artifact is None: - logger.error(f"No .metric artifact found for task {metric_task_id}, skipping.") - continue - with (metrics_folder / f"{filename}.metrics").open("wb") as log_file: - downloadArtifactToFile( - log_file, - taskId=metrics_task["status"]["taskId"], - name=metric_artifact, - queueService=queue, - ) - - # Dump experiment config so it is published on group_logs - config_path = Path(temp_dir) / 
"experiments" / project_name / group_name / "config.yml" - config_path.parent.mkdir(parents=True, exist_ok=True) - - with config_path.open("w") as config_file: - yaml.dump(config, config_file) - - parents = str(logs_folder.resolve()).strip().split("/") - WandB.publish_group_logs(parents, project_name, group_name, existing_runs=[]) + publish_group_logs_from_tasks( + project=project_name, + group0=group_name, + metrics_tasks=metrics_tasks, + config=config, + ) def list_dependent_group_ids(task_id: str, known: set[str]): diff --git a/tracking/translations_parser/utils.py b/tracking/translations_parser/utils.py index 01569ddfa..98534ff70 100644 --- a/tracking/translations_parser/utils.py +++ b/tracking/translations_parser/utils.py @@ -1,11 +1,16 @@ import logging import os import re +import tempfile from collections.abc import Sequence from datetime import datetime +from pathlib import Path from typing import NamedTuple, Optional +import yaml + import taskcluster +from taskcluster.download import downloadArtifactToFile logger = logging.getLogger(__name__) @@ -15,6 +20,7 @@ # Tags usually ends with project (e.g. `en-nl` or `eng-nld`) TAG_PROJECT_SUFFIX_REGEX = re.compile(r"((-\w{2}){2}|(-\w{3}){2})$") +MULTIPLE_TRAIN_SUFFIX = re.compile(r"(-\d+)/\d+$") # This regex needs to work on historic runs as well as the current tasks. TRAIN_LABEL_REGEX = re.compile( @@ -100,6 +106,8 @@ r"$" ) +queue = taskcluster.Queue({"rootUrl": "https://firefox-ci-tc.services.mozilla.com"}) + class ParsedTaskLabel(NamedTuple): model: str @@ -187,3 +195,69 @@ def metric_from_tc_context(chrf: float, bleu: float, comet: float): bleu_detok=bleu, comet=comet, ) + + +def publish_group_logs_from_tasks( + project: str | None = None, + group: str | None = None, + metrics_tasks: dict[str, dict] = {}, + config: dict = {}, +): + """ + Publish a fake run, named 'group_logs' to Weight & Biases from a Taskcluster context. 
+ In case project or group is left to None, both values will be detected from Taskcluster. + `metrics_tasks` optionally contains finished evaluation tasks that will be published as new runs. + """ + from translations_parser.publishers import WandB + from translations_parser.wandb import get_wandb_names + + message = "Handling group_logs publication" + if metrics_tasks: + message += f" with {len(metrics_tasks)} extra evaluation tasks" + logger.info(message) + + if project is None or group is None: + logger.info("Retrieving W&B names from taskcluster attributes") + project, group, _ = get_wandb_names() + + with tempfile.TemporaryDirectory() as temp_dir: + logs_folder = Path(temp_dir) / "logs" + metrics_folder = logs_folder / project / group / "metrics" + metrics_folder.mkdir(parents=True, exist_ok=True) + + # Group and publish remaining metrics tasks via the logs publication + for metric_task_id, metrics_task in metrics_tasks.items(): + filename = metrics_task["task"]["tags"]["label"] + if re_match := MULTIPLE_TRAIN_SUFFIX.search(filename): + (suffix,) = re_match.groups() + filename = MULTIPLE_TRAIN_SUFFIX.sub(suffix, filename) + + metric_artifact = next( + ( + artifact["name"] + for artifact in queue.listLatestArtifacts(metric_task_id)["artifacts"] + if artifact["name"].endswith(".metrics") + ), + None, + ) + if metric_artifact is None: + logger.error(f"No .metric artifact found for task {metric_task_id}, skipping.") + continue + + with (metrics_folder / f"{filename}.metrics").open("wb") as log_file: + downloadArtifactToFile( + log_file, + taskId=metrics_task["status"]["taskId"], + name=metric_artifact, + queueService=queue, + ) + + # Dump experiment config so it is published on group_logs + config_path = Path(temp_dir) / "experiments" / project / group / "config.yml" + config_path.parent.mkdir(parents=True, exist_ok=True) + + with config_path.open("w") as config_file: + yaml.dump(config, config_file) + + parents = str(logs_folder.resolve()).strip().split("/") + 
WandB.publish_group_logs(parents, project, group, existing_runs=[])