add dataset_status='normal|blocked' to job metrics (#3008)
* add dataset_status='normal|blocked' to job metrics

* add tests + rename collection constant

* fix test

* Update jobs/cache_maintenance/src/cache_maintenance/queue_metrics.py

Co-authored-by: Andrea Francis Soria Jimenez <[email protected]>

* Update jobs/mongodb_migration/tests/migrations/test_20240731143600_queue_add_dataset_status_to_queue_metrics.py

Co-authored-by: Andrea Francis Soria Jimenez <[email protected]>

---------

Co-authored-by: Andrea Francis Soria Jimenez <[email protected]>
severo and AndreaFrancis authored Jul 31, 2024
1 parent 64920b3 commit 9b08fd7
Showing 12 changed files with 171 additions and 45 deletions.
20 changes: 12 additions & 8 deletions jobs/cache_maintenance/src/cache_maintenance/queue_metrics.py
@@ -18,18 +18,22 @@ def collect_queue_metrics() -> None:
     """
     logging.info("updating queue metrics")
 
-    new_metric_by_id = Queue().get_jobs_total_by_type_and_status()
+    new_metric_by_id = Queue().get_jobs_total_by_type_status_and_dataset_status()
     new_ids = set(new_metric_by_id.keys())
-    old_ids = set((metric.job_type, metric.status) for metric in JobTotalMetricDocument.objects())
+    old_ids = set(
+        (metric.job_type, metric.status, metric.dataset_status) for metric in JobTotalMetricDocument.objects()
+    )
     to_delete = old_ids - new_ids
 
-    for job_type, status in to_delete:
-        JobTotalMetricDocument.objects(job_type=job_type, status=status).delete()
-        logging.info(f"{job_type=} {status=} has been deleted")
+    for job_type, status, dataset_status in to_delete:
+        JobTotalMetricDocument.objects(job_type=job_type, status=status, dataset_status=dataset_status).delete()
+        logging.info(f"{job_type=} {status=} {dataset_status=}: has been deleted")
 
-    for (job_type, status), total in new_metric_by_id.items():
-        JobTotalMetricDocument.objects(job_type=job_type, status=status).upsert_one(total=total)
-        logging.info(f"{job_type=} {status=}: {total=} has been inserted")
+    for (job_type, status, dataset_status), total in new_metric_by_id.items():
+        JobTotalMetricDocument.objects(job_type=job_type, status=status, dataset_status=dataset_status).upsert_one(
+            total=total
+        )
+        logging.info(f"{job_type=} {status=} {dataset_status=}: {total=} has been inserted")
 
     logging.info("queue metrics have been updated")
 
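The function reconciles the stored metrics with the freshly computed ones: key tuples that disappeared are deleted, everything else is upserted. A minimal sketch of that reconciliation pattern, with a plain dict standing in for the MongoDB collection (job types and numbers here are invented for illustration):

```python
# Hypothetical stand-in for JobTotalMetricDocument: a dict keyed by
# (job_type, status, dataset_status), as in the code above.
stored = {("split-first-rows", "waiting", "blocked"): 2}  # stale entry
new_metric_by_id = {("split-first-rows", "waiting", "normal"): 5}

for key in set(stored) - set(new_metric_by_id):
    del stored[key]  # JobTotalMetricDocument.objects(...).delete()
for key, total in new_metric_by_id.items():
    stored[key] = total  # .upsert_one(total=total)

assert stored == new_metric_by_id
```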
22 changes: 12 additions & 10 deletions jobs/cache_maintenance/tests/test_collect_queue_metrics.py
@@ -4,7 +4,8 @@
 from unittest.mock import patch
 
 import pytest
-from libcommon.queue.jobs import JobsCountByWorkerSize, JobsTotalByTypeAndStatus, Queue
+from libcommon.queue.dataset_blockages import DATASET_STATUS_NORMAL
+from libcommon.queue.jobs import JobsCountByWorkerSize, JobsTotalByTypeStatusAndDatasetStatus, Queue
 from libcommon.queue.metrics import JobTotalMetricDocument, WorkerSizeJobsCountDocument
 
 from cache_maintenance.queue_metrics import collect_queue_metrics, collect_worker_size_jobs_count
@@ -13,10 +14,10 @@
 STATUS_WAITING = "waiting"
 COUNT = 1
 
-NEW_METRIC = {(JOB_TYPE_A, STATUS_WAITING): COUNT}
-OTHER_JOB_TYPE = {("JobTypeB", STATUS_WAITING): COUNT}
-OTHER_STATUS = {(JOB_TYPE_A, "started"): COUNT}
-OTHER_COUNT = {(JOB_TYPE_A, STATUS_WAITING): COUNT + 1}
+NEW_METRIC = {(JOB_TYPE_A, STATUS_WAITING, DATASET_STATUS_NORMAL): COUNT}
+OTHER_JOB_TYPE = {("JobTypeB", STATUS_WAITING, DATASET_STATUS_NORMAL): COUNT}
+OTHER_STATUS = {(JOB_TYPE_A, "started", DATASET_STATUS_NORMAL): COUNT}
+OTHER_COUNT = {(JOB_TYPE_A, STATUS_WAITING, DATASET_STATUS_NORMAL): COUNT + 1}
 
 
 WORKER_SIZE = "medium"
@@ -26,7 +27,7 @@
 
 
 class MockQueue(Queue):
-    def get_jobs_total_by_type_and_status(self) -> JobsTotalByTypeAndStatus:
+    def get_jobs_total_by_type_status_and_dataset_status(self) -> JobsTotalByTypeStatusAndDatasetStatus:
         return NEW_METRIC
 
     def get_jobs_count_by_worker_size(self) -> JobsCountByWorkerSize:
@@ -37,9 +38,9 @@ def get_jobs_count_by_worker_size(self) -> JobsCountByWorkerSize:
     "old_metrics",
     [{}, NEW_METRIC, OTHER_JOB_TYPE, OTHER_STATUS, OTHER_COUNT],
 )
-def test_collect_jobs_metrics(old_metrics: JobsTotalByTypeAndStatus) -> None:
-    for (job_type, status), total in old_metrics.items():
-        JobTotalMetricDocument(job_type=job_type, status=status, total=total).save()
+def test_collect_jobs_metrics(old_metrics: JobsTotalByTypeStatusAndDatasetStatus) -> None:
+    for (job_type, status, dataset_status), total in old_metrics.items():
+        JobTotalMetricDocument(job_type=job_type, status=status, dataset_status=dataset_status, total=total).save()
 
     with patch(
         "cache_maintenance.queue_metrics.Queue",
@@ -48,7 +49,8 @@ def test_collect_jobs_metrics(old_metrics: JobsTotalByTypeAndStatus) -> None:
         collect_queue_metrics()
 
     assert {
-        (metric.job_type, metric.status): metric.total for metric in JobTotalMetricDocument.objects()
+        (metric.job_type, metric.status, metric.dataset_status): metric.total
+        for metric in JobTotalMetricDocument.objects()
     } == NEW_METRIC
 
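The test swaps the real `Queue` for `MockQueue` via `unittest.mock.patch`, so `collect_queue_metrics` always sees the canned totals. A self-contained sketch of the same substitution pattern (the class and function names here are invented for illustration):

```python
from unittest.mock import patch

class Queue:
    def totals(self) -> int:
        return 1

class MockQueue(Queue):
    def totals(self) -> int:
        return 2

def collect() -> int:
    return Queue().totals()  # resolves the name in this module at call time

with patch(f"{__name__}.Queue", MockQueue):
    assert collect() == 2  # the patched class is picked up
assert collect() == 1  # the original is restored on exit
```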
10 changes: 8 additions & 2 deletions jobs/mongodb_migration/src/mongodb_migration/collector.py
@@ -6,7 +6,7 @@
     QUEUE_COLLECTION_DATASET_BLOCKAGES,
     QUEUE_COLLECTION_PAST_JOBS,
     QUEUE_MONGOENGINE_ALIAS,
-    TYPE_AND_STATUS_JOB_COUNTS_COLLECTION,
+    TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION,
 )
 
 from mongodb_migration.deletion_migrations import (
@@ -93,6 +93,9 @@
 from mongodb_migration.migrations._20240703160300_cache_add_duration import (
     MigrationAddDurationToCacheResponse,
 )
+from mongodb_migration.migrations._20240731143600_queue_add_dataset_status_to_queue_metrics import (
+    MigrationAddDatasetStatusToQueueMetrics,
+)
 from mongodb_migration.renaming_migrations import (
     CacheRenamingMigration,
     QueueRenamingMigration,
@@ -303,7 +306,7 @@ def get_migrations(self) -> list[Migration]:
                 version="20230814121400",
                 description="drop queue metrics collection",
                 alias="metrics",
-                collection_name=TYPE_AND_STATUS_JOB_COUNTS_COLLECTION,
+                collection_name=TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION,
             ),
             MigrationAddHasFTSToSplitDuckdbIndexCacheResponse(
                 version="20230926095900",
@@ -412,4 +415,7 @@ def get_migrations(self) -> list[Migration]:
             MigrationAddDurationToCacheResponse(
                 version="20240703160300", description="add 'duration' field to cache records"
             ),
+            MigrationAddDatasetStatusToQueueMetrics(
+                version="20240731143600", description="add 'dataset_status' field to the jobs metrics"
+            ),
         ]
33 changes: 33 additions & 0 deletions jobs/mongodb_migration/src/mongodb_migration/migrations/_20240731143600_queue_add_dataset_status_to_queue_metrics.py
@@ -0,0 +1,33 @@
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2024 The HuggingFace Authors.
+
+import logging
+
+from libcommon.constants import QUEUE_MONGOENGINE_ALIAS, TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION
+from libcommon.queue.dataset_blockages import DATASET_STATUS_NORMAL
+from libcommon.queue.metrics import JobTotalMetricDocument
+from mongoengine.connection import get_db
+
+from mongodb_migration.check import check_documents
+from mongodb_migration.migration import Migration
+
+
+# connection already occurred in the main.py (caveat: we use globals)
+class MigrationAddDatasetStatusToQueueMetrics(Migration):
+    def up(self) -> None:
+        # See https://docs.mongoengine.org/guide/migration.html#example-1-addition-of-a-field
+        logging.info("If missing, add the 'dataset_status' field with the default value 'normal' to the jobs metrics")
+        db = get_db(QUEUE_MONGOENGINE_ALIAS)
+        db[TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION].update_many(
+            {"dataset_status": {"$exists": False}}, {"$set": {"dataset_status": DATASET_STATUS_NORMAL}}
+        )
+
+    def down(self) -> None:
+        logging.info("Remove the 'dataset_status' field from all the jobs metrics")
+        db = get_db(QUEUE_MONGOENGINE_ALIAS)
+        db[TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION].update_many({}, {"$unset": {"dataset_status": ""}})
+
+    def validate(self) -> None:
+        logging.info("Ensure that a random selection of jobs metrics have the 'dataset_status' field")
+
+        check_documents(DocCls=JobTotalMetricDocument, sample_size=10)
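For reference, the same backfill can be reproduced with pymongo alone; a sketch, reusing the `jobTotalMetric` collection name from `libcommon.constants` (the connection string and database name here are assumptions for illustration):

```python
from pymongo import MongoClient

# assumed local connection and database name, for illustration only
coll = MongoClient("mongodb://localhost:27017")["queue"]["jobTotalMetric"]

# up(): backfill only the documents that do not have the field yet
coll.update_many({"dataset_status": {"$exists": False}}, {"$set": {"dataset_status": "normal"}})

# down(): remove the field everywhere
coll.update_many({}, {"$unset": {"dataset_status": ""}})
```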
51 changes: 51 additions & 0 deletions jobs/mongodb_migration/tests/migrations/test_20240731143600_queue_add_dataset_status_to_queue_metrics.py
@@ -0,0 +1,51 @@
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2024 The HuggingFace Authors.
+
+
+from libcommon.constants import QUEUE_MONGOENGINE_ALIAS, TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION
+from libcommon.queue.dataset_blockages import DATASET_STATUS_BLOCKED, DATASET_STATUS_NORMAL
+from libcommon.resources import MongoResource
+from mongoengine.connection import get_db
+
+from mongodb_migration.migrations._20240731143600_queue_add_dataset_status_to_queue_metrics import (
+    MigrationAddDatasetStatusToQueueMetrics,
+)
+
+
+def test_queue_add_dataset_status_to_queue_metrics(mongo_host: str) -> None:
+    with MongoResource(
+        database="test_queue_add_dataset_status_to_queue_metrics",
+        host=mongo_host,
+        mongoengine_alias=QUEUE_MONGOENGINE_ALIAS,
+    ):
+        db = get_db(QUEUE_MONGOENGINE_ALIAS)
+        db[TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION].insert_many(
+            [
+                {
+                    "job_type": "job_type1",
+                    "status": "waiting",
+                },
+                {"job_type": "job_type2", "status": "waiting", "dataset_status": DATASET_STATUS_BLOCKED},
+            ]
+        )
+
+        migration = MigrationAddDatasetStatusToQueueMetrics(
+            version="20240731143600", description="add 'dataset_status' field to jobs metrics"
+        )
+        migration.up()
+
+        result = list(db[TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION].find({"job_type": "job_type1"}))
+        assert len(result) == 1
+        assert result[0]["dataset_status"] == DATASET_STATUS_NORMAL
+
+        result = list(db[TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION].find({"job_type": "job_type2"}))
+        assert len(result) == 1
+        assert result[0]["dataset_status"] == DATASET_STATUS_BLOCKED
+
+        migration.down()
+        result = list(db[TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION].find())
+        assert len(result) == 2
+        assert "dataset_status" not in result[0]
+        assert "dataset_status" not in result[1]
+
+        db[TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION].drop()
2 changes: 1 addition & 1 deletion libs/libcommon/src/libcommon/constants.py
@@ -9,7 +9,7 @@
 DUCKDB_INDEX_CACHE_APPNAME = "dataset_viewer_duckdb_index"
 DUCKDB_INDEX_JOB_RUNNER_SUBDIRECTORY = "job_runner"
 CACHE_METRICS_COLLECTION = "cacheTotalMetric"
-TYPE_AND_STATUS_JOB_COUNTS_COLLECTION = "jobTotalMetric"
+TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION = "jobTotalMetric"
 WORKER_TYPE_JOB_COUNTS_COLLECTION = "workerTypeJobCounts"
 QUEUE_COLLECTION_JOBS = "jobsBlue"
 QUEUE_COLLECTION_PAST_JOBS = "pastJobs"
6 changes: 4 additions & 2 deletions libs/libcommon/src/libcommon/prometheus.py
@@ -43,7 +43,7 @@ def getLatestContent(self) -> Any:
 QUEUE_JOBS_TOTAL = Gauge(
     name="queue_jobs_total",
     documentation="Number of jobs in the queue",
-    labelnames=["queue", "status"],
+    labelnames=["queue", "status", "dataset_status"],
     multiprocess_mode="liveall",
 )
 WORKER_SIZE_JOBS_COUNT = Gauge(
@@ -80,7 +80,9 @@ def getLatestContent(self) -> Any:
 
 def update_queue_jobs_total() -> None:
     for job_metric in JobTotalMetricDocument.objects():
-        QUEUE_JOBS_TOTAL.labels(queue=job_metric.job_type, status=job_metric.status).set(job_metric.total)
+        QUEUE_JOBS_TOTAL.labels(
+            queue=job_metric.job_type, status=job_metric.status, dataset_status=job_metric.dataset_status
+        ).set(job_metric.total)
 
 
 def update_worker_size_jobs_count() -> None:
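With the extra label, each (queue, status) series splits into a "normal" and a "blocked" series. A minimal sketch of what the exported samples look like, using a standalone registry rather than the multiprocess setup above (the job type and values are invented):

```python
from prometheus_client import CollectorRegistry, Gauge, generate_latest

registry = CollectorRegistry()
gauge = Gauge(
    name="queue_jobs_total",
    documentation="Number of jobs in the queue",
    labelnames=["queue", "status", "dataset_status"],
    registry=registry,
)
gauge.labels(queue="split-first-rows", status="waiting", dataset_status="normal").set(12)
gauge.labels(queue="split-first-rows", status="waiting", dataset_status="blocked").set(3)

# prints HELP/TYPE lines plus one sample per label combination, e.g.
# queue_jobs_total{queue="split-first-rows",status="waiting",dataset_status="normal"} 12.0
print(generate_latest(registry).decode())
```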
3 changes: 3 additions & 0 deletions libs/libcommon/src/libcommon/queue/dataset_blockages.py
@@ -38,6 +38,9 @@ def __get__(self, instance: object, cls: type[U]) -> QuerySet[U]:
     6 * 60 * 60
 )  # if we change this value, we have to ensure the TTL index is migrated accordingly
 
+DATASET_STATUS_NORMAL = "normal"
+DATASET_STATUS_BLOCKED = "blocked"
+
 
 class DatasetBlockageDocument(Document):
     """A decision to block (rate-limit) a dataset. The same dataset can be blocked several times.
31 changes: 23 additions & 8 deletions libs/libcommon/src/libcommon/queue/jobs.py
@@ -29,7 +29,7 @@
     QUEUE_MONGOENGINE_ALIAS,
 )
 from libcommon.dtos import FlatJobInfo, JobInfo, Priority, Status, WorkerSize
-from libcommon.queue.dataset_blockages import get_blocked_datasets
+from libcommon.queue.dataset_blockages import DATASET_STATUS_BLOCKED, DATASET_STATUS_NORMAL, get_blocked_datasets
 from libcommon.queue.lock import lock, release_lock, release_locks
 from libcommon.queue.metrics import (
     decrease_metric,
@@ -60,7 +60,7 @@ def __get__(self, instance: object, cls: type[U]) -> QuerySet[U]:
 # END monkey patching ### hack ###
 
 
-JobsTotalByTypeAndStatus = Mapping[tuple[str, str], int]
+JobsTotalByTypeStatusAndDatasetStatus = Mapping[tuple[str, str, str], int]
 
 JobsCountByWorkerSize = Mapping[str, int]
 
@@ -805,28 +805,43 @@ def has_pending_jobs(self, dataset: str, job_types: Optional[list[str]] = None)
         return JobDocument.objects(**filters, dataset=dataset).count() > 0
 
     # special reports
-    def get_jobs_total_by_type_and_status(self) -> JobsTotalByTypeAndStatus:
-        """Count the number of jobs by job type and status.
+    def get_jobs_total_by_type_status_and_dataset_status(self) -> JobsTotalByTypeStatusAndDatasetStatus:
+        """Count the number of jobs by job type, status and dataset status.
         Returns:
-            an object with the total of jobs by job type and status.
-            Keys are a tuple (job_type, status), and values are the total of jobs.
+            an object with the total of jobs by job type, status and dataset status.
+            Keys are a tuple (job_type, status, dataset_status), and values are the total of jobs.
         """
+        blocked_datasets = get_blocked_datasets()
+
         return {
-            (metric["job_type"], metric["status"]): metric["total"]
+            (metric["job_type"], metric["status"], metric["dataset_status"]): metric["total"]
             for metric in JobDocument.objects().aggregate(
                 [
                     {"$sort": {"type": 1, "status": 1}},
+                    {
+                        # TODO(SL): optimize this part?
+                        "$addFields": {
+                            "dataset_status": {
+                                "$cond": {
+                                    "if": {"$in": ["$dataset", blocked_datasets]},
+                                    "then": DATASET_STATUS_BLOCKED,
+                                    "else": DATASET_STATUS_NORMAL,
+                                }
+                            }
+                        }
+                    },
                     {
                         "$group": {
-                            "_id": {"type": "$type", "status": "$status"},
+                            "_id": {"type": "$type", "status": "$status", "dataset_status": "$dataset_status"},
                            "total": {"$sum": 1},
                        }
                    },
                    {
                        "$project": {
                            "job_type": "$_id.type",
                            "status": "$_id.status",
+                            "dataset_status": "$_id.dataset_status",
                            "total": "$total",
                        }
                    },
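The pipeline tags each job with a computed `dataset_status` (the `$addFields` stage, via `$cond`/`$in` against the blocked list) before grouping. A pure-Python sketch of the same computation, assuming jobs are dicts with `type`, `status` and `dataset` keys (for illustration only, not the project's API):

```python
from collections import Counter

def totals_by_type_status_and_dataset_status(
    jobs: list[dict], blocked_datasets: set[str]
) -> dict[tuple[str, str, str], int]:
    # mirrors $addFields + $group: tag each job, then count per key triple
    return dict(
        Counter(
            (
                job["type"],
                job["status"],
                "blocked" if job["dataset"] in blocked_datasets else "normal",
            )
            for job in jobs
        )
    )

jobs = [
    {"type": "split-first-rows", "status": "waiting", "dataset": "a"},
    {"type": "split-first-rows", "status": "waiting", "dataset": "b"},
]
assert totals_by_type_status_and_dataset_status(jobs, blocked_datasets={"b"}) == {
    ("split-first-rows", "waiting", "normal"): 1,
    ("split-first-rows", "waiting", "blocked"): 1,
}
```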
14 changes: 8 additions & 6 deletions libs/libcommon/src/libcommon/queue/metrics.py
@@ -11,11 +11,11 @@
 
 from libcommon.constants import (
     QUEUE_MONGOENGINE_ALIAS,
-    TYPE_AND_STATUS_JOB_COUNTS_COLLECTION,
+    TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION,
     WORKER_TYPE_JOB_COUNTS_COLLECTION,
 )
 from libcommon.dtos import Status, WorkerSize
-from libcommon.queue.dataset_blockages import is_blocked
+from libcommon.queue.dataset_blockages import DATASET_STATUS_NORMAL, is_blocked
 from libcommon.utils import get_datetime
 
 # START monkey patching ### hack ###
@@ -52,18 +52,20 @@ class JobTotalMetricDocument(Document):
     Args:
         job_type (`str`): job type
         status (`str`): job status see libcommon.queue.jobs.Status
+        dataset_status (`str`): whether the dataset is blocked ("normal", "blocked")
         total (`int`): total of jobs
         created_at (`datetime`): when the metric has been created.
     """
 
     id = ObjectIdField(db_field="_id", primary_key=True, default=ObjectId)
-    job_type = StringField(required=True, unique_with="status")
+    job_type = StringField(required=True, unique_with=["status", "dataset_status"])
     status = StringField(required=True)
+    dataset_status = StringField(required=True, default=DATASET_STATUS_NORMAL)
     total = IntField(required=True, default=0)
     created_at = DateTimeField(default=get_datetime)
 
     meta = {
-        "collection": TYPE_AND_STATUS_JOB_COUNTS_COLLECTION,
+        "collection": TYPE_STATUS_AND_DATASET_STATUS_JOB_COUNTS_COLLECTION,
         "db_alias": QUEUE_MONGOENGINE_ALIAS,
         "indexes": [("job_type", "status")],
     }
@@ -105,8 +107,8 @@ def get_worker_size(difficulty: int) -> WorkerSize:
     objects = QuerySetManager["WorkerSizeJobsCountDocument"]()
 
 
-def _update_metrics(job_type: str, status: str, increase_by: int) -> None:
-    JobTotalMetricDocument.objects(job_type=job_type, status=status).update(
+def _update_metrics(job_type: str, status: str, increase_by: int, dataset_status: str = DATASET_STATUS_NORMAL) -> None:
+    JobTotalMetricDocument.objects(job_type=job_type, status=status, dataset_status=dataset_status).update(
         upsert=True,
         write_concern={"w": "majority", "fsync": True},
        read_concern={"level": "majority"},
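Each distinct (job_type, status, dataset_status) triple now gets its own upserted counter document, with `dataset_status` defaulting to "normal". A runnable sketch of that keying, with a dict standing in for the collection and assuming `increase_by` feeds an increment of `total` in the truncated `update()` call above:

```python
# Hypothetical stand-in for the upserted counter documents (illustration only).
counters: dict[tuple[str, str, str], int] = {}

def update_metrics(job_type: str, status: str, increase_by: int, dataset_status: str = "normal") -> None:
    key = (job_type, status, dataset_status)
    counters[key] = counters.get(key, 0) + increase_by  # upsert + increment total

update_metrics("split-first-rows", "waiting", 1)
update_metrics("split-first-rows", "waiting", 1, dataset_status="blocked")
assert counters == {
    ("split-first-rows", "waiting", "normal"): 1,
    ("split-first-rows", "waiting", "blocked"): 1,
}
```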