From 81ce3eca4106ed01f778f334b8a307c0961c4723 Mon Sep 17 00:00:00 2001 From: Amirkeivan Mohtashami Date: Tue, 21 Feb 2023 20:22:51 +0100 Subject: [PATCH 01/10] Split ES and QueueService --- cms/__init__.py | 6 +- cms/server/admin/rpc_authorization.py | 8 +- cms/server/admin/server.py | 5 +- cms/server/admin/templates/overview.html | 12 +- cms/server/contest/handlers/tasksubmission.py | 8 +- cms/server/contest/handlers/taskusertest.py | 7 +- cms/server/contest/server.py | 8 +- cms/service/EvaluationService.py | 566 +++++------------ cms/service/QueueService.py | 586 ++++++++++++++++++ cms/service/esoperations.py | 3 + cms/util.py | 13 + cmstestsuite/testrunner.py | 8 +- config/cms.conf.sample | 4 +- scripts/cmsQueueService | 56 ++ 14 files changed, 860 insertions(+), 430 deletions(-) create mode 100644 cms/service/QueueService.py create mode 100755 scripts/cmsQueueService diff --git a/cms/__init__.py b/cms/__init__.py index 8f17c113a9..1fd8ed062c 100644 --- a/cms/__init__.py +++ b/cms/__init__.py @@ -39,7 +39,7 @@ "Address", "ServiceCoord", "ConfigError", "async_config", "config", # util "mkdir", "rmtree", "utf8_decoder", "get_safe_shard", "get_service_address", - "get_service_shards", "contest_id_from_args", "default_argument_parser", + "random_service", "get_service_shards", "contest_id_from_args", "default_argument_parser", # plugin "plugin_list", ] @@ -60,6 +60,7 @@ # Acts as a "none of the above". TOKEN_MODE_MIXED = "mixed" + # Feedback level. # Full information (killing signals, time and memory, status for all @@ -73,5 +74,6 @@ from .conf import Address, ServiceCoord, ConfigError, async_config, config from .util import mkdir, rmtree, utf8_decoder, get_safe_shard, \ get_service_address, get_service_shards, contest_id_from_args, \ - default_argument_parser + random_service, default_argument_parser from .plugin import plugin_list + diff --git a/cms/server/admin/rpc_authorization.py b/cms/server/admin/rpc_authorization.py index 2f4287c6e8..e000e0d26f 100644 --- a/cms/server/admin/rpc_authorization.py +++ b/cms/server/admin/rpc_authorization.py @@ -27,8 +27,8 @@ RPCS_ALLOWED_FOR_AUTHENTICATED = [ ("AdminWebServer", "submissions_status"), ("ResourceService", "get_resources"), - ("EvaluationService", "workers_status"), - ("EvaluationService", "queue_status"), + ("QueueService", "workers_status"), + ("QueueService", "queue_status"), ("LogService", "last_messages"), ] @@ -39,8 +39,8 @@ RPCS_ALLOWED_FOR_ALL = RPCS_ALLOWED_FOR_MESSAGING + [ ("ResourceService", "kill_service"), ("ResourceService", "toggle_autorestart"), - ("EvaluationService", "enable_worker"), - ("EvaluationService", "disable_worker"), + ("QueueService", "enable_worker"), + ("QueueService", "disable_worker"), ("EvaluationService", "invalidate_submission"), ("ScoringService", "invalidate_submission"), ] diff --git a/cms/server/admin/server.py b/cms/server/admin/server.py index 796ab84698..3c63338232 100644 --- a/cms/server/admin/server.py +++ b/cms/server/admin/server.py @@ -73,7 +73,10 @@ def __init__(self, shard): self.admin_web_server = self.connect_to( ServiceCoord("AdminWebServer", 0)) - self.evaluation_service = self.connect_to( + self.queue_service = self.connect_to( + ServiceCoord("QueueService", 0)) + # TODO: does it make sense to use a random one? 
+ self.evaluation_services = self.connect_to( ServiceCoord("EvaluationService", 0)) self.scoring_service = self.connect_to( ServiceCoord("ScoringService", 0)) diff --git a/cms/server/admin/templates/overview.html b/cms/server/admin/templates/overview.html index 8d59f24c9a..6355b74764 100644 --- a/cms/server/admin/templates/overview.html +++ b/cms/server/admin/templates/overview.html @@ -63,11 +63,11 @@ function enable_worker(shard) { if (confirm("Do you really want to enable worker " + shard + "?")) { - cmsrpc_request("EvaluationService", 0, + cmsrpc_request("QueueService", 0, "enable_worker", {"shard":shard}, function() { - cmsrpc_request("EvaluationService", 0, + cmsrpc_request("QueueService", 0, "workers_status", {}, update_workers_status); @@ -77,11 +77,11 @@ function disable_worker(shard) { if (confirm("Do you really want to disable worker " + shard + "?")) { - cmsrpc_request("EvaluationService", 0, + cmsrpc_request("QueueService", 0, "disable_worker", {"shard":shard}, function() { - cmsrpc_request("EvaluationService", 0, + cmsrpc_request("QueueService", 0, "workers_status", {}, update_workers_status); @@ -214,12 +214,12 @@ if (!update_statuses.queue_request || update_statuses.queue_request.state() != "pending") { update_statuses.queue_request = - cmsrpc_request("EvaluationService", 0, + cmsrpc_request("QueueService", 0, "queue_status", {}, update_queue_status); } - cmsrpc_request("EvaluationService", 0, + cmsrpc_request("QueueService", 0, "workers_status", {}, update_workers_status); diff --git a/cms/server/contest/handlers/tasksubmission.py b/cms/server/contest/handlers/tasksubmission.py index 9199c0bcd7..ba24b9939a 100644 --- a/cms/server/contest/handlers/tasksubmission.py +++ b/cms/server/contest/handlers/tasksubmission.py @@ -38,7 +38,8 @@ import tornado.web as tornado_web from sqlalchemy.orm import joinedload -from cms import config, FEEDBACK_LEVEL_FULL + +from cms import config, random_service, FEEDBACK_LEVEL_FULL from cms.db import Submission, SubmissionResult from cms.grading.languagemanager import get_language from cms.grading.scoring import task_score @@ -90,8 +91,8 @@ def post(self, task_name): logger.info("Sent error: `%s' - `%s'", e.subject, e.formatted_text) self.notify_error(e.subject, e.text, e.text_params) else: - self.service.evaluation_service.new_submission( - submission_id=submission.id) + random_service(self.service.evaluation_services)\ + .new_submission(submission_id=submission.id) self.notify_success(N_("Submission received"), N_("Your submission has been received " "and is currently being evaluated.")) @@ -104,7 +105,6 @@ def post(self, task_name): self.redirect(self.contest_url("tasks", task.name, "submissions", **query_args)) - class TaskSubmissionsHandler(ContestHandler): """Shows the data of a task in the contest. 
diff --git a/cms/server/contest/handlers/taskusertest.py b/cms/server/contest/handlers/taskusertest.py index c78c87c1c8..167d8f513a 100644 --- a/cms/server/contest/handlers/taskusertest.py +++ b/cms/server/contest/handlers/taskusertest.py @@ -36,7 +36,8 @@ except ImportError: import tornado.web as tornado_web -from cms import config + +from cms import config, random_service from cms.db import UserTest, UserTestResult from cms.grading.languagemanager import get_language from cms.server import multi_contest @@ -143,8 +144,8 @@ def post(self, task_name): logger.info("Sent error: `%s' - `%s'", e.subject, e.formatted_text) self.notify_error(e.subject, e.text, e.text_params) else: - self.service.evaluation_service.new_user_test( - user_test_id=user_test.id) + random_service(self.service.evaluation_services) \ + .new_user_test(user_test_id=user_test.id) self.notify_success(N_("Test received"), N_("Your test has been received " "and is currently being executed.")) diff --git a/cms/server/contest/server.py b/cms/server/contest/server.py index a299af4aa6..10cbf5bc89 100644 --- a/cms/server/contest/server.py +++ b/cms/server/contest/server.py @@ -41,7 +41,7 @@ from werkzeug.middleware.shared_data import SharedDataMiddleware -from cms import ConfigError, ServiceCoord, config +from cms import ConfigError, ServiceCoord, config, get_service_shards from cms.io import WebService from cms.locale import get_translations from cms.server.contest.jinja2_toolbox import CWS_ENVIRONMENT @@ -116,8 +116,10 @@ def __init__(self, shard, contest_id=None): # Retrieve the available translations. self.translations = get_translations() - self.evaluation_service = self.connect_to( - ServiceCoord("EvaluationService", 0)) + self.evaluation_services = [ + self.connect_to(ServiceCoord("EvaluationService", i)) + for i in range(get_service_shards("EvaluationService"))] + self.scoring_service = self.connect_to( ServiceCoord("ScoringService", 0)) diff --git a/cms/service/EvaluationService.py b/cms/service/EvaluationService.py index 7a1b67d517..16255f4408 100644 --- a/cms/service/EvaluationService.py +++ b/cms/service/EvaluationService.py @@ -32,172 +32,26 @@ import logging from collections import defaultdict -from datetime import timedelta from functools import wraps import gevent.lock from sqlalchemy import func from sqlalchemy.exc import IntegrityError -from cms import ServiceCoord, get_service_shards +from cms import ServiceCoord from cmscommon.datetime import make_timestamp from cms.db import SessionGen, Digest, Dataset, Evaluation, Submission, \ SubmissionResult, Testcase, UserTest, UserTestResult, get_submissions, \ get_submission_results, get_datasets_to_judge -from cms.grading.Job import JobGroup -from cms.io import Executor, TriggeredService, rpc_method +from cms.grading.Job import Job +from cms.io import Service, rpc_method from .esoperations import ESOperation, get_relevant_operations, \ - get_submissions_operations, get_user_tests_operations, \ submission_get_operations, submission_to_evaluate, \ user_test_get_operations -from .flushingdict import FlushingDict -from .workerpool import WorkerPool logger = logging.getLogger(__name__) - -class EvaluationExecutor(Executor): - - # Real maximum number of operations to be sent to a worker. - MAX_OPERATIONS_PER_BATCH = 25 - - def __init__(self, evaluation_service): - """Create the single executor for ES. - - The executor just delegates work to the worker pool. 
- - """ - super().__init__(True) - - self.evaluation_service = evaluation_service - self.pool = WorkerPool(self.evaluation_service) - - # List of QueueItem (ESOperation) we have extracted from the - # queue, but not yet finished to execute. - self._currently_executing = [] - - # Lock used to guard the currently executing operations - self._current_execution_lock = gevent.lock.RLock() - - # As evaluate operations are split by testcases, there are too - # many entries in the queue to display, so we just take only one - # operation of each (type, object_id, dataset_id, priority) tuple. - # This dictionary maps any such tuple to a "queue entry" (lacking - # the testcase codename) and keeps track of multiplicity. - self.queue_status_cumulative = dict() - - for i in range(get_service_shards("Worker")): - worker = ServiceCoord("Worker", i) - self.pool.add_worker(worker) - - def __contains__(self, item): - """Return whether the item is in execution. - - item (QueueItem): an item to search. - - return (bool): True if item is in the queue, or if it is the - item already extracted but not given to the workers yet, - or if it is being executed by a worker. - - """ - return (super().__contains__(item) - or item in self._currently_executing - or item in self.pool) - - def max_operations_per_batch(self): - """Return the maximum number of operations per batch. - - We derive the number from the length of the queue divided by - the number of workers, with a cap at MAX_OPERATIONS_PER_BATCH. - - """ - # TODO: len(self.pool) is the total number of workers, - # included those that are disabled. - ratio = len(self._operation_queue) // len(self.pool) + 1 - ret = min(max(ratio, 1), EvaluationExecutor.MAX_OPERATIONS_PER_BATCH) - logger.info("Ratio is %d, executing %d operations together.", - ratio, ret) - return ret - - def execute(self, entries): - """Execute a batch of operations in the queue. - - The operations might not be executed immediately because of - lack of workers. - - entries ([QueueEntry]): entries containing the operations to - perform. - - """ - with self._current_execution_lock: - self._currently_executing = [] - for entry in entries: - operation = entry.item - # Side data is attached to the operation sent to the - # worker pool. In case the operation is lost, the pool - # will return it to us, and we will use it to - # re-enqueue it. - operation.side_data = (entry.priority, entry.timestamp) - self._currently_executing.append(operation) - while len(self._currently_executing) > 0: - self.pool.wait_for_workers() - with self._current_execution_lock: - if len(self._currently_executing) == 0: - break - res = self.pool.acquire_worker(self._currently_executing) - if res is not None: - self._currently_executing = [] - break - - def enqueue(self, item, priority, timestamp): - success = super().enqueue(item, priority, timestamp) - if success: - # Add the item to the cumulative status dictionary. - key = item.short_key() + (priority,) - if key in self.queue_status_cumulative: - self.queue_status_cumulative[key]["item"]["multiplicity"] += 1 - else: - item_entry = item.to_dict() - del item_entry["testcase_codename"] - item_entry["multiplicity"] = 1 - entry = {"item": item_entry, "priority": priority, "timestamp": make_timestamp(timestamp)} - self.queue_status_cumulative[key] = entry - return success - - def dequeue(self, operation): - """Remove an item from the queue. - - We need to override dequeue because the operation to dequeue - might have already been extracted, but not yet executed. 
-
-        operation (ESOperation)
-
-        """
-        try:
-            queue_entry = super().dequeue(operation)
-            self._remove_from_cumulative_status(queue_entry)
-        except KeyError:
-            with self._current_execution_lock:
-                for i in range(len(self._currently_executing)):
-                    if self._currently_executing[i] == operation:
-                        del self._currently_executing[i]
-                        return
-            raise
-
-    def _pop(self, wait=False):
-        queue_entry = super()._pop(wait=wait)
-        self._remove_from_cumulative_status(queue_entry)
-        return queue_entry
-
-    def _remove_from_cumulative_status(self, queue_entry):
-        # Remove the item from the cumulative status dictionary.
-        key = queue_entry.item.short_key() + (queue_entry.priority,)
-        self.queue_status_cumulative[key]["item"]["multiplicity"] -= 1
-        if self.queue_status_cumulative[key]["item"]["multiplicity"] == 0:
-            del self.queue_status_cumulative[key]
-
-
 def with_post_finish_lock(func):
     """Decorator for locking on self.post_finish_lock.
 
@@ -212,18 +66,8 @@ def wrapped(self, *args, **kwargs):
     return wrapped
 
 
-class Result:
-    """An object grouping the results obtained from a worker for an
-    operation.
-
-    """
-
-    def __init__(self, job, job_success):
-        self.job = job
-        self.job_success = job_success
-
 
-class EvaluationService(TriggeredService):
+class EvaluationService(Service):
     """Evaluation service.
 
     """
@@ -237,29 +81,11 @@ class EvaluationService(TriggeredService):
     INVALIDATE_COMPILATION = 0
     INVALIDATE_EVALUATION = 1
 
-    # How often we check for stale workers.
-    WORKER_TIMEOUT_CHECK_TIME = timedelta(seconds=300)
-
-    # How often we check if a worker is connected.
-    WORKER_CONNECTION_CHECK_TIME = timedelta(seconds=10)
-
-    # How many worker results we accumulate before processing them.
-    RESULT_CACHE_SIZE = 100
-    # The maximum time since the last result before processing.
-    MAX_FLUSHING_TIME_SECONDS = 2
-
     def __init__(self, shard, contest_id=None):
         super().__init__(shard)
 
         self.contest_id = contest_id
 
-        # Cache holding the results from the worker until they are
-        # written to the DB.
-        self.result_cache = FlushingDict(
-            EvaluationService.RESULT_CACHE_SIZE,
-            EvaluationService.MAX_FLUSHING_TIME_SECONDS,
-            self.write_results)
-
         # This lock is used to avoid inserting in the queue (which
         # itself is already thread-safe) an operation which is already
         # being processed. Such operation might be in one of the
@@ -283,38 +109,28 @@ def __init__(self, shard, contest_id=None):
         # operations in state 4.
         self.post_finish_lock = gevent.lock.RLock()
 
+        self.queue_service = self.connect_to(
+            ServiceCoord("QueueService", 0))
         self.scoring_service = self.connect_to(
             ServiceCoord("ScoringService", 0))
 
-        self.add_executor(EvaluationExecutor(self))
-        self.start_sweeper(117.0)
-
-        self.add_timeout(self.check_workers_timeout, None,
-                         EvaluationService.WORKER_TIMEOUT_CHECK_TIME
-                         .total_seconds(),
-                         immediately=False)
-        self.add_timeout(self.check_workers_connection, None,
-                         EvaluationService.WORKER_CONNECTION_CHECK_TIME
-                         .total_seconds(),
-                         immediately=False)
-
-    def submission_enqueue_operations(self, submission):
-        """Push in queue the operations required by a submission.
+    def get_submission_operations(self, submission):
+        """Return the operations required by a submission.
 
         submission (Submission): a submission.
 
-        return (int): the number of actually enqueued operations.
+        return ([ESOperation, int, datetime]): operations to enqueue, together
+            with priority and timestamp.
""" - new_operations = 0 + operations = [] for dataset in get_datasets_to_judge(submission.task): submission_result = submission.get_result(dataset) number_of_operations = 0 for operation, priority, timestamp in submission_get_operations( submission_result, submission, dataset): number_of_operations += 1 - if self.enqueue(operation, priority, timestamp): - new_operations += 1 + operations.append([operation, priority, timestamp]) # If we got 0 operations, but the submission result is to # evaluate, it means that we just need to finalize the @@ -327,83 +143,35 @@ def submission_enqueue_operations(self, submission): submission_result.sa_session.commit() self.evaluation_ended(submission_result) - return new_operations + return operations - def user_test_enqueue_operations(self, user_test): + def get_user_test_operations(self, user_test): """Push in queue the operations required by a user test. user_test (UserTest): a user test. - return (int): the number of actually enqueued operations. + return ([ESOperation, int, datetime]): operations to enqueue, together + with priority and timestamp. """ - new_operations = 0 + operations = [] for dataset in get_datasets_to_judge(user_test.task): for operation, priority, timestamp in user_test_get_operations( user_test, dataset): - if self.enqueue(operation, priority, timestamp): - new_operations += 1 + operations.append([operation, priority, timestamp]) - return new_operations + return operations @with_post_finish_lock - def _missing_operations(self): - """Look in the database for submissions that have not been compiled or - evaluated for no good reasons. Put the missing operation in - the queue. - - """ - counter = 0 - with SessionGen() as session: - - for operation, timestamp, priority in \ - get_submissions_operations(session, self.contest_id): - if self.enqueue(operation, timestamp, priority): - counter += 1 - - for operation, timestamp, priority in \ - get_user_tests_operations(session, self.contest_id): - if self.enqueue(operation, timestamp, priority): - counter += 1 + def enqueue_all(self, operations): + """Enqueue all the operations - return counter - - @rpc_method - def workers_status(self): - """Returns a dictionary (indexed by shard number) whose values - are the information about the corresponding worker. See - WorkerPool.get_status for more details. - - returns (dict): the dict with the workers information. + operations ([ESOperation, int, datetime]): operations, priorities, + timestamps """ - return self.get_executor().pool.get_status() - - def check_workers_timeout(self): - """We ask WorkerPool for the unresponsive workers, and we put - again their operations in the queue. - - """ - lost_operations = self.get_executor().pool.check_timeouts() - for operation in lost_operations: - logger.info("Operation %s put again in the queue because of " - "worker timeout.", operation) - priority, timestamp = operation.side_data + for operation, priority, timestamp in operations: self.enqueue(operation, priority, timestamp) - return True - - def check_workers_connection(self): - """We ask WorkerPool for the unconnected workers, and we put - again their operations in the queue. 
- - """ - lost_operations = self.get_executor().pool.check_connections() - for operation in lost_operations: - logger.info("Operation %s put again in the queue because of " - "disconnected worker.", operation) - priority, timestamp = operation.side_data - self.enqueue(operation, priority, timestamp) - return True @with_post_finish_lock def enqueue(self, operation, priority, timestamp): @@ -419,64 +187,114 @@ def enqueue(self, operation, priority, timestamp): return (bool): True if pushed, False if not. """ - if operation in self.get_executor() or operation in self.result_cache: - return False - - # enqueue() returns the number of successful pushes. - return super().enqueue(operation, priority, timestamp) > 0 + return self.queue_service.enqueue( + operation=operation.to_dict(), + priority=priority, + timestamp=make_timestamp(timestamp)) @with_post_finish_lock - def action_finished(self, data, shard, error=None): - """Callback from a worker, to signal that is finished some - action (compilation or evaluation). + @rpc_method + def write_single_result(self, operation, job): + """Receive worker results from QS and writes them to the DB. - data (dict): the JobGroup, exported to dict. - shard (int): the shard finishing the action. + operation (ESOperation|dict): operation performed, exported as dict + job (Job|dict): job containing the result, exported as dict """ - # We notify the pool that the worker is available again for - # further work (no matter how the current request turned out, - # even if the worker encountered an error). If the pool - # informs us that the data produced by the worker has to be - # ignored (by returning True) we interrupt the execution of - # this method and do nothing because in that case we know the - # operation has returned to the queue and perhaps already been - # reassigned to another worker. - to_ignore = self.get_executor().pool.release_worker(shard) - if to_ignore is True: - logger.info("Ignored result from worker %s as requested.", shard) - return - - job_group = None - job_group_success = True - if error is not None: - logger.error( - "Received error from Worker (see above), job group lost.") - job_group_success = False - else: + logger.debug("Starting commit process...") + if isinstance(operation, dict): + operation = ESOperation.from_dict(operation) + if isinstance(job, dict): + job = Job.import_from_dict_with_type(job) + + with SessionGen() as session: + type_ = operation.type_ + object_id = operation.object_id + dataset_id = operation.dataset_id + + dataset = Dataset.get_from_id(dataset_id, session) + if dataset is None: + logger.error("Could not find dataset %d in the database.", + dataset_id) + return False, [] + + # Get submission or user test, and their results. + if type_ in [ESOperation.COMPILATION, ESOperation.EVALUATION]: + object_ = Submission.get_from_id(object_id, session) + if object_ is None: + logger.error("Could not find submission %d " + "in the database.", object_id) + return False, [] + object_result = object_.get_result_or_create(dataset) + else: + object_ = UserTest.get_from_id(object_id, session) + object_result = object_.get_result_or_create(dataset) + + + logger.info("Writing result to db for %s", operation) try: - job_group = JobGroup.import_from_dict(data) + new_operations = self.write_results_one_row( + session, object_result, operation, job) + except IntegrityError: + logger.warning( + "Integrity error while inserting worker result.", + exc_info=True) + # This is not an error condition, as the result is already + # in the DB. 
+ return True, [] except Exception: - logger.error("Couldn't build JobGroup for data %s.", data, - exc_info=True) - job_group_success = False - - if job_group_success: - for job in job_group.jobs: - operation = job.operation - if job.success: - logger.info("`%s' succeeded.", operation) - else: - logger.error("`%s' failed, see worker logs and (possibly) " - "sandboxes at '%s'.", - operation, " ".join(job.sandboxes)) - if isinstance(to_ignore, list) and operation in to_ignore: - logger.info("`%s' result ignored as requested", operation) - else: - self.result_cache.add(operation, Result(job, job.success)) + # Defend against any exception. A poisonous results that fails + # here is attempted again without limits, thus can enter in + # all batches to write. Without the catch-all, it will prevent + # the whole batch to be written over and over. See issue #888. + logger.error( + "Unexpected exception while inserting worker result.", + exc_info=True) + return False, [] + + logger.debug("Committing evaluations...") + session.commit() + + # If we collected some new operations to do while writing + # the results, it means we had to invalidate the submission. + # We return immediately since we already have all the operations + # we need to do next. + if new_operations: + return True, [ + [op.to_dict(), + priority, + (timestamp - EvaluationService.EPOCH).total_seconds()] + for op, priority, timestamp in new_operations] + + if type_ == ESOperation.EVALUATION: + if len(object_result.evaluations) == len(dataset.testcases): + object_result.set_evaluation_outcome() + + logger.debug("Committing evaluation outcomes...") + session.commit() + + logger.info("Ending operations...") + if type_ == ESOperation.COMPILATION: + new_operations = self.compilation_ended(object_result) + elif type_ == ESOperation.EVALUATION: + if object_result.evaluated(): + new_operations = self.evaluation_ended(object_result) + elif type_ == ESOperation.USER_TEST_COMPILATION: + new_operations = \ + self.user_test_compilation_ended(object_result) + elif type_ == ESOperation.USER_TEST_EVALUATION: + new_operations = self.user_test_evaluation_ended(object_result) + + logger.debug("Done") + return True, [ + [op.to_dict(), + priority, + (timestamp - EvaluationService.EPOCH).total_seconds()] + for op, priority, timestamp in new_operations] @with_post_finish_lock + @rpc_method def write_results(self, items): """Receive worker results from the cache and writes them to the DB. @@ -486,7 +304,7 @@ def write_results(self, items): retrieving datasets and submission results only once instead of once for every result. - items ([(operation, Result)]): the results received by ES but + items ([(operation: dict, job: dict)]): the results received by ES but not yet written to the db. """ @@ -496,9 +314,12 @@ def write_results(self, items): # operation type (i.e., group together the testcase # evaluations for the same submission and dataset). 
by_object_and_type = defaultdict(list) - for operation, result in items: + new_operations = [] + for operation, job in items: + operation = ESOperation.from_dict(operation) + job = Job.import_from_dict_with_type(job) t = (operation.type_, operation.object_id, operation.dataset_id) - by_object_and_type[t].append((operation, result)) + by_object_and_type[t].append((operation, job)) with SessionGen() as session: for key, operation_results in by_object_and_type.items(): @@ -526,7 +347,7 @@ def write_results(self, items): continue object_result = object_.get_result_or_create(dataset) - self.write_results_one_object_and_type( + new_operations += self.write_results_one_object_and_type( session, object_result, operation_results) logger.info("Committing evaluations...") @@ -536,10 +357,10 @@ def write_results(self, items): for type_, object_id, dataset_id in by_object_and_type.keys(): if type_ == ESOperation.EVALUATION: if dataset_id not in num_testcases_per_dataset: - num_testcases_per_dataset[dataset_id] = session\ - .query(func.count(Testcase.id))\ + num_testcases_per_dataset[dataset_id] = session \ + .query(func.count(Testcase.id)) \ .filter(Testcase.dataset_id == dataset_id).scalar() - num_evaluations = session\ + num_evaluations = session \ .query(func.count(Evaluation.id)) \ .filter(Evaluation.dataset_id == dataset_id) \ .filter(Evaluation.submission_id == object_id).scalar() @@ -573,9 +394,14 @@ def write_results(self, items): self.user_test_evaluation_ended(user_test_result) logger.info("Done") + return True, [ + [op.to_dict(), + priority, + (timestamp - EvaluationService.EPOCH).total_seconds()] + for op, priority, timestamp in new_operations] def write_results_one_object_and_type( - self, session, object_result, operation_results): + self, session, object_result, operation_results): """Write to the DB the results for one object and type. session (Session): the DB session to use. @@ -586,47 +412,41 @@ def write_results_one_object_and_type( received for the given object_result """ + new_operations = [] for operation, result in operation_results: logger.info("Writing result to db for %s", operation) try: with session.begin_nested(): - self.write_results_one_row( + new_operations += self.write_results_one_row( session, object_result, operation, result) except IntegrityError: logger.warning( "Integrity error while inserting worker result.", exc_info=True) - except Exception: - # Defend against any exception. A poisonous results that fails - # here is attempted again without limits, thus can enter in - # all batches to write. Without the catch-all, it will prevent - # the whole batch to be written over and over. See issue #888. - logger.error( - "Unexpected exception while inserting worker result.", - exc_info=True) + return new_operations - def write_results_one_row(self, session, object_result, operation, result): + def write_results_one_row(self, session, object_result, operation, job): """Write to the DB a single result. session (Session): the DB session to use. object_result (SubmissionResult|UserTestResult): the DB object for the operation (and for the result). operation (ESOperation): the operation for which we have the result. - result (WorkerResult): the result from the worker. + job (Job): the result from the worker. 
""" if operation.type_ == ESOperation.COMPILATION: - if result.job_success: - result.job.to_submission(object_result) + if job.success: + job.to_submission(object_result) else: object_result.compilation_tries += 1 elif operation.type_ == ESOperation.EVALUATION: - if result.job_success: - result.job.to_submission(object_result) + if job.success: + job.to_submission(object_result) else: - if result.job.plus is not None and \ - result.job.plus.get("tombstone") is True: + if job.plus is not None and \ + job.plus.get("tombstone") is True: executable_digests = [ e.digest for e in object_result.executables.values()] @@ -638,26 +458,28 @@ def write_results_one_row(self, session, object_result, operation, result): object_result.dataset_id) with session.begin_nested(): object_result.invalidate_compilation() - self.submission_enqueue_operations( + return self.get_submission_operations( object_result.submission) else: object_result.evaluation_tries += 1 elif operation.type_ == ESOperation.USER_TEST_COMPILATION: - if result.job_success: - result.job.to_user_test(object_result) + if job.success: + job.to_user_test(object_result) else: object_result.compilation_tries += 1 elif operation.type_ == ESOperation.USER_TEST_EVALUATION: - if result.job_success: - result.job.to_user_test(object_result) + if job.success: + job.to_user_test(object_result) else: object_result.evaluation_tries += 1 else: logger.error("Invalid operation type %r.", operation.type_) + return [] + def compilation_ended(self, submission_result): """Actions to be performed when we have a submission that has ended compilation. In particular: we queue evaluation if @@ -706,7 +528,7 @@ def compilation_ended(self, submission_result): submission_result.compilation_outcome) # Enqueue next steps to be done - self.submission_enqueue_operations(submission) + return self.get_submission_operations(submission) def evaluation_ended(self, submission_result): """Actions to be performed when we have a submission that has @@ -744,7 +566,7 @@ def evaluation_ended(self, submission_result): submission_result.dataset_id) # Enqueue next steps to be done (e.g., if evaluation failed). - self.submission_enqueue_operations(submission) + return self.get_submission_operations(submission) def user_test_compilation_ended(self, user_test_result): """Actions to be performed when we have a user test that has @@ -787,7 +609,7 @@ def user_test_compilation_ended(self, user_test_result): user_test_result.compilation_outcome) # Enqueue next steps to be done - self.user_test_enqueue_operations(user_test) + return self.get_user_test_operations(user_test) def user_test_evaluation_ended(self, user_test_result): """Actions to be performed when we have a user test that has @@ -819,7 +641,7 @@ def user_test_evaluation_ended(self, user_test_result): user_test_result.dataset_id) # Enqueue next steps to be done (e.g., if evaluation failed). 
- self.user_test_enqueue_operations(user_test) + return self.get_user_test_operations(user_test) @rpc_method def new_submission(self, submission_id): @@ -837,7 +659,7 @@ def new_submission(self, submission_id): "%d in the database.", submission_id) return - self.submission_enqueue_operations(submission) + self.enqueue_all(self.get_submission_operations(submission)) session.commit() @@ -859,7 +681,7 @@ def new_user_test(self, user_test_id): "in the database.", user_test_id) return - self.user_test_enqueue_operations(user_test) + self.enqueue_all(self.get_user_test_operations(user_test)) session.commit() @@ -970,72 +792,8 @@ def invalidate_submission(self, # Finally, we re-enqueue the operations for the # submissions. for submission in submissions: - self.submission_enqueue_operations(submission) + self.enqueue_all(self.get_submission_operations(submission)) session.commit() logger.info("Invalidate successfully completed.") - @rpc_method - def disable_worker(self, shard): - """Disable a specific worker (recovering its assigned operations). - - shard (int): the shard of the worker. - - returns (bool): True if everything went well. - - """ - logger.info("Received request to disable worker %s.", shard) - - lost_operations = [] - try: - lost_operations = self.get_executor().pool.disable_worker(shard) - except ValueError: - return False - - for operation in lost_operations: - logger.info("Operation %s put again in the queue because " - "the worker was disabled.", operation) - priority, timestamp = operation.side_data - self.enqueue(operation, priority, timestamp) - return True - - @rpc_method - def enable_worker(self, shard): - """Enable a specific worker. - - shard (int): the shard of the worker. - - returns (bool): True if everything went well. - - """ - logger.info("Received request to enable worker %s.", shard) - try: - self.get_executor().pool.enable_worker(shard) - except ValueError: - return False - - return True - - @rpc_method - def queue_status(self): - """Return the status of the queue. - - Parent method returns list of queues of each executor, but in - EvaluationService we have only one executor, so we can just take - the first queue. - - As evaluate operations are split by testcases, there are too - many entries in the queue to display, so we collect entries with the - same (type, object_id, dataset_id, priority) tuple. - Generally, we will see only one evaluate operation for each submission - in the queue status. - - The entries are then ordered by priority and timestamp (the - same criteria used to look at what to complete next). - - return ([QueueEntry]): the list with the queued elements. 
-
-        """
-        return sorted(
-            self.get_executor().queue_status_cumulative.values(),
-            key=lambda x: (x["priority"], x["timestamp"]))
diff --git a/cms/service/QueueService.py b/cms/service/QueueService.py
new file mode 100644
index 0000000000..07c85ca975
--- /dev/null
+++ b/cms/service/QueueService.py
@@ -0,0 +1,586 @@
+#!/usr/bin/env python3
+
+# Contest Management System - http://cms-dev.github.io/
+# Copyright © 2010-2014 Giovanni Mascellani
+# Copyright © 2010-2018 Stefano Maggiolo
+# Copyright © 2010-2012 Matteo Boscariol
+# Copyright © 2013-2015 Luca Wehrstedt
+# Copyright © 2013 Bernard Blackham
+# Copyright © 2014 Artem Iglikov
+# Copyright © 2016 Luca Versari
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+"""Queue service. It takes care of transforming the submissions
+received from the contestants into operations (compilation, execution,
+...), queuing them with the right priority, and dispatching them to
+the workers. It also collects the results from the workers and hands
+them to the EvaluationService instances for writing.
+
+"""
+
+import logging
+
+from datetime import datetime, timedelta
+from functools import wraps
+
+import gevent.event
+import gevent.lock
+
+from cms import ServiceCoord, get_service_shards, random_service
+from cmscommon.datetime import make_timestamp
+from cms.io import Executor, TriggeredService, rpc_method
+from cms.db import SessionGen
+from cms.grading.Job import JobGroup
+
+from .esoperations import ESOperation, \
+    get_submissions_operations, get_user_tests_operations
+from .workerpool import WorkerPool
+
+logger = logging.getLogger(__name__)
+
+
+class PendingResults(object):
+    """Class to hold pending results.
+
+    A result is pending when it arrives from a worker and has not yet
+    been written. This includes results not yet sent to ES, and
+    results sent but for which an acknowledgment of the write has not
+    yet been received.
+
+    """
+
+    def __init__(self):
+        # Lock for pending results and writes data structures.
+        self._lock = gevent.lock.RLock()
+        # Event for waking up the result dispatcher greenlet when
+        # there is a new result to send.
+        self._event = gevent.event.Event()
+        # A map from operation to job, containing the operations for
+        # which we have the result from a Worker, but which haven't
+        # been sent to EvaluationService for writing.
+        self._results = dict()
+        # A set containing the operations sent to EvaluationService
+        # for writing, but not yet written.
+        self._writes = set()
+
+    def __contains__(self, operation):
+        """Return whether the operation is pending.
+
+        The operation is pending either if it is still waiting to be
+        sent to ES, or if it was sent and is waiting for confirmation
+        that it was written.
+
+        """
+        with self._lock:
+            return operation in self._results or operation in self._writes
+
+    def wait(self):
+        """Wait until there is a result available."""
+        self._event.wait()
+
+    def add_result(self, operation, job):
+        """Add one result to the pending operations.
+ + operation (ESOperation): the operation performed. + job (Job): job containing the result. + + """ + with self._lock: + self._results[operation] = job + self._event.set() + + def pop(self): + """Extract one of the pending result for writing. + + return ((ESOperation, Job)): operation and results (contained in the + job). + + raise (IndexError): if no results are available + + """ + with self._lock: + if len(self._results) == 0: + raise IndexError("No results available.") + operation, job = self._results.popitem() + if len(self._results) == 0: + self._event.clear() + self._writes.add(operation) + return operation, job + + def finalize(self, operation): + """Mark the operation as fully completed and written.""" + with self._lock: + self._writes.remove(operation) + + +class EvaluationExecutor(Executor): + + # Real maximum number of operations to be sent to a worker. + MAX_OPERATIONS_PER_BATCH = 25 + + def __init__(self, evaluation_service): + """Create the single executor for ES. + + The executor just delegates work to the worker pool. + + """ + super().__init__(True) + + self.evaluation_service = evaluation_service + self.pool = WorkerPool(self.evaluation_service) + + # List of QueueItem (ESOperation) we have extracted from the + # queue, but not yet finished to execute. + self._currently_executing = [] + + # Lock used to guard the currently executing operations + self._current_execution_lock = gevent.lock.RLock() + + # As evaluate operations are split by testcases, there are too + # many entries in the queue to display, so we just take only one + # operation of each (type, object_id, dataset_id, priority) tuple. + # This dictionary maps any such tuple to a "queue entry" (lacking + # the testcase codename) and keeps track of multiplicity. + self.queue_status_cumulative = dict() + + for i in range(get_service_shards("Worker")): + worker = ServiceCoord("Worker", i) + self.pool.add_worker(worker) + + def __contains__(self, item): + """Return whether the item is in execution. + + item (QueueItem): an item to search. + + return (bool): True if item is in the queue, or if it is the + item already extracted but not given to the workers yet, + or if it is being executed by a worker. + + """ + return (super().__contains__(item) + or item in self._currently_executing + or item in self.pool) + + def max_operations_per_batch(self): + """Return the maximum number of operations per batch. + + We derive the number from the length of the queue divided by + the number of workers, with a cap at MAX_OPERATIONS_PER_BATCH. + + """ + # TODO: len(self.pool) is the total number of workers, + # included those that are disabled. + ratio = len(self._operation_queue) // len(self.pool) + 1 + ret = min(max(ratio, 1), EvaluationExecutor.MAX_OPERATIONS_PER_BATCH) + logger.info("Ratio is %d, executing %d operations together.", + ratio, ret) + return ret + + def execute(self, entries): + """Execute a batch of operations in the queue. + + The operations might not be executed immediately because of + lack of workers. + + entries ([QueueEntry]): entries containing the operations to + perform. + + """ + with self._current_execution_lock: + self._currently_executing = [] + for entry in entries: + operation = entry.item + + # Side data is attached to the operation sent to the + # worker pool. In case the operation is lost, the pool + # will return it to us, and we will use it to + # re-enqueue it. 
+ operation.side_data = (entry.priority, entry.timestamp) + self._currently_executing.append(operation) + while len(self._currently_executing) > 0: + self.pool.wait_for_workers() + with self._current_execution_lock: + if len(self._currently_executing) == 0: + break + res = self.pool.acquire_worker(self._currently_executing) + if res is not None: + self._currently_executing = [] + break + + def enqueue(self, item, priority=None, timestamp=None): + success = super().enqueue(item, priority, timestamp) + if success: + # Add the item to the cumulative status dictionary. + key = item.short_key() + (priority,) + if key in self.queue_status_cumulative: + self.queue_status_cumulative[key]["item"]["multiplicity"] += 1 + else: + item_entry = item.to_dict() + del item_entry["testcase_codename"] + item_entry["multiplicity"] = 1 + entry = {"item": item_entry, "priority": priority, "timestamp": make_timestamp(timestamp)} + self.queue_status_cumulative[key] = entry + return success + + def dequeue(self, operation): + """Remove an item from the queue. + + We need to override dequeue because the operation to dequeue + might have already been extracted, but not yet executed. + + operation (ESOperation) + + """ + try: + queue_entry = super().dequeue(operation) + self._remove_from_cumulative_status(queue_entry) + except KeyError: + with self._current_execution_lock: + for i in range(len(self._currently_executing)): + if self._currently_executing[i] == operation: + del self._currently_executing[i] + return + raise + + def _pop(self, wait=False): + queue_entry = super()._pop(wait=wait) + self._remove_from_cumulative_status(queue_entry) + return queue_entry + + def _remove_from_cumulative_status(self, queue_entry): + # Remove the item from the cumulative status dictionary. + key = queue_entry.item.short_key() + (queue_entry.priority,) + self.queue_status_cumulative[key]["item"]["multiplicity"] -= 1 + if self.queue_status_cumulative[key]["item"]["multiplicity"] == 0: + del self.queue_status_cumulative[key] + + +def with_post_finish_lock(func): + """Decorator for locking on self.post_finish_lock. + + Ensures that no more than one decorated function is executing at + the same time. + + """ + @wraps(func) + def wrapped(self, *args, **kwargs): + with self.post_finish_lock: + return func(self, *args, **kwargs) + return wrapped + + +class QueueService(TriggeredService): + """Queue service. + + """ + + # How often we check for stale workers. + WORKER_TIMEOUT_CHECK_TIME = timedelta(seconds=300) + + # How often we check if a worker is connected. + WORKER_CONNECTION_CHECK_TIME = timedelta(seconds=10) + + def __init__(self, shard, contest_id=None): + super().__init__(shard) + + self.contest_id = contest_id + + # This lock is used to avoid inserting in the queue (which + # itself is already thread-safe) an operation which is already + # being processed. Such operation might be in one of the + # following state: + # 1. in the queue; + # 2. extracted from the queue by the executor, but not yet + # dispatched to a worker; + # 3. being processed by a worker ("in the worker pool"); + # 4. being processed by action_finished, but with the results + # not yet written to the database. + # 5. with results written in the database. + # + # The methods enqueuing operations already check that the + # operation is not in state 5, and enqueue() checks that it is + # not in the first three states. 
+        #
+        # Therefore, the lock guarantees that the methods adding
+        # operations to the queue (_missing_operations,
+        # invalidate_submission, enqueue) are not executed
+        # concurrently with action_finished to avoid picking
+        # operations in state 4.
+        self.post_finish_lock = gevent.lock.RLock()
+
+        # Data structure holding pending results.
+        self.pending = PendingResults()
+        # Never-ending greenlet consuming results by sending them to ES.
+        gevent.spawn(self.process_results)
+
+        self.evaluation_services = [
+            self.connect_to(ServiceCoord("EvaluationService", i))
+            for i in range(get_service_shards("EvaluationService"))]
+
+        self.add_executor(EvaluationExecutor(self))
+        self.start_sweeper(117.0)
+
+        self.add_timeout(self.check_workers_timeout, None,
+                         QueueService.WORKER_TIMEOUT_CHECK_TIME
+                         .total_seconds(),
+                         immediately=False)
+        self.add_timeout(self.check_workers_connection, None,
+                         QueueService.WORKER_CONNECTION_CHECK_TIME
+                         .total_seconds(),
+                         immediately=False)
+
+    @with_post_finish_lock
+    def _missing_operations(self):
+        """Look in the database for submissions that have not been compiled or
+        evaluated for no good reason. Put the missing operations in
+        the queue.
+
+        """
+        counter = 0
+        with SessionGen() as session:
+
+            for operation, timestamp, priority in \
+                    get_submissions_operations(session, self.contest_id):
+                if self.enqueue(operation, timestamp, priority):
+                    counter += 1
+
+            for operation, timestamp, priority in \
+                    get_user_tests_operations(session, self.contest_id):
+                if self.enqueue(operation, timestamp, priority):
+                    counter += 1
+
+        return counter
+
+    @rpc_method
+    def workers_status(self):
+        """Returns a dictionary (indexed by shard number) whose values
+        are the information about the corresponding worker. See
+        WorkerPool.get_status for more details.
+
+        returns (dict): the dict with the workers information.
+
+        """
+        return self.get_executor().pool.get_status()
+
+    def check_workers_timeout(self):
+        """We ask WorkerPool for the unresponsive workers, and we put
+        their operations back in the queue.
+
+        """
+        lost_operations = self.get_executor().pool.check_timeouts()
+        for operation in lost_operations:
+            logger.info("Operation %s put again in the queue because of "
+                        "worker timeout.", operation)
+            priority, timestamp = operation.side_data
+            self.enqueue(operation, priority, timestamp)
+        return True
+
+    def check_workers_connection(self):
+        """We ask WorkerPool for the unconnected workers, and we put
+        their operations back in the queue.
+
+        """
+        lost_operations = self.get_executor().pool.check_connections()
+        for operation in lost_operations:
+            logger.info("Operation %s put again in the queue because of "
+                        "disconnected worker.", operation)
+            priority, timestamp = operation.side_data
+            self.enqueue(operation, priority, timestamp)
+        return True
+
+    @with_post_finish_lock
+    @rpc_method
+    def enqueue(self, operation, priority, timestamp):
+        """Push an operation in the queue.
+
+        Push an operation in the operation queue if the submission is
+        not already in the queue or assigned to a worker.
+
+        operation (ESOperation|dict): the operation to put in the queue.
+        priority (int): the priority of the operation.
+        timestamp (datetime|float): the time of the submission.
+
+        return (bool): True if pushed, False if not.
+ + """ + if not isinstance(timestamp, datetime): + timestamp = datetime.utcfromtimestamp(timestamp) + if isinstance(operation, dict): + operation = ESOperation.from_dict(operation) + + if operation in self.get_executor() or operation in self.pending: + return False + + # enqueue() returns the number of successful pushes. + return super().enqueue(operation, priority, timestamp) > 0 + + @with_post_finish_lock + def action_finished(self, data, shard, error=None): + """Callback from a worker, to signal that is finished some + action (compilation or evaluation). + + data (dict): the JobGroup, exported to dict. + shard (int): the shard finishing the action. + + """ + # We notify the pool that the worker is available again for + # further work (no matter how the current request turned out, + # even if the worker encountered an error). If the pool + # informs us that the data produced by the worker has to be + # ignored (by returning True) we interrupt the execution of + # this method and do nothing because in that case we know the + # operation has returned to the queue and perhaps already been + # reassigned to another worker. + to_ignore = self.get_executor().pool.release_worker(shard) + if to_ignore is True: + logger.info("Ignored result from worker %s as requested.", shard) + return + + job_group = None + job_group_success = True + if error is not None: + logger.error( + "Received error from Worker (see above), job group lost.") + job_group_success = False + + else: + try: + job_group = JobGroup.import_from_dict(data) + except Exception: + logger.error("Couldn't build JobGroup for data %s.", data, + exc_info=True) + job_group_success = False + + if job_group_success: + for job in job_group.jobs: + operation = job.operation + if job.success: + logger.info("`%s' succeeded.", operation) + else: + logger.error("`%s' failed, see worker logs and (possibly) " + "sandboxes at '%s'.", + operation, " ".join(job.sandboxes)) + if isinstance(to_ignore, list) and operation in to_ignore: + logger.info("`%s' result ignored as requested", operation) + else: + self.pending.add_result(operation, job) + + def process_results(self): + """A background greenlet continuously sending results to ES.""" + while True: + self.pending.wait() + try: + operation, job = self.pending.pop() + except IndexError: + continue + + + logger.info("Sending results for operation %s to ES.", operation) + try: + random_service(self.evaluation_services).write_single_result( + operation=operation.to_dict(), + job=job.export_to_dict(), + callback=self.result_written, + plus=operation) + except IndexError: + logger.error("No EvaluationServices are connected, " + "result will be discarded") + + def result_written(self, success_new_operations, operation, error=None): + success, new_operations = None, [] + if success_new_operations is not None: + success, new_operations = success_new_operations + + logger.info("Result for operation %s written, success: %s", + operation, success) + try: + self.pending.finalize(operation) + except KeyError: + logger.warning("Operation written %s was not pending, ignoring.", + operation) + if error is not None: + logger.warning("Operation %s writing error (%s); re-enqueuing.", + operation, error) + priority, timestamp = operation.side_data + self.enqueue(operation, priority, timestamp) + else: + for new_operation, priority, timestamp in new_operations: + self.enqueue( + ESOperation.from_dict(new_operation), priority, timestamp) + + @rpc_method + def disable_worker(self, shard): + """Disable a specific worker 
(recovering its assigned operations).
+
+        shard (int): the shard of the worker.
+
+        returns (bool): True if everything went well.
+
+        """
+        logger.info("Received request to disable worker %s.", shard)
+
+        lost_operations = []
+        try:
+            lost_operations = self.get_executor().pool.disable_worker(shard)
+        except ValueError:
+            return False
+
+        for operation in lost_operations:
+            logger.info("Operation %s put again in the queue because "
+                        "the worker was disabled.", operation)
+            priority, timestamp = operation.side_data
+            self.enqueue(operation, priority, timestamp)
+        return True
+
+    @rpc_method
+    def enable_worker(self, shard):
+        """Enable a specific worker.
+
+        shard (int): the shard of the worker.
+
+        returns (bool): True if everything went well.
+
+        """
+        logger.info("Received request to enable worker %s.", shard)
+        try:
+            self.get_executor().pool.enable_worker(shard)
+        except ValueError:
+            return False
+
+        return True
+
+    @rpc_method
+    def queue_status(self):
+        """Return the status of the queue.
+
+        Parent method returns the list of queues of each executor, but in
+        QueueService we have only one executor, so we can just take
+        the first queue.
+
+        As evaluate operations are split by testcases, there are too
+        many entries in the queue to display, so we collect entries with the
+        same (type, object_id, dataset_id, priority) tuple.
+        Generally, we will see only one evaluate operation for each submission
+        in the queue status.
+
+        The entries are then ordered by priority and timestamp (the
+        same criteria used to look at what to complete next).
+
+        return ([QueueEntry]): the list with the queued elements.
+
+        """
+        return sorted(
+            self.get_executor().queue_status_cumulative.values(),
+            key=lambda x: (x["priority"], x["timestamp"]))
diff --git a/cms/service/esoperations.py b/cms/service/esoperations.py
index 76aaa78223..08a6699371 100644
--- a/cms/service/esoperations.py
+++ b/cms/service/esoperations.py
@@ -510,6 +510,7 @@ def from_dict(d):
                            d["dataset_id"],
                            d["testcase_codename"])
 
+
     def __eq__(self, other):
         # We may receive a non-ESOperation other when comparing with
         # operations in the worker pool (as these may also be unicode or
@@ -566,3 +567,5 @@ def short_key(self):
         return (str(self.type_),
                 str(self.object_id),
                 str(self.dataset_id))
+
+
diff --git a/cms/util.py b/cms/util.py
index 5c5aab78a5..8392e0fd84 100644
--- a/cms/util.py
+++ b/cms/util.py
@@ -27,6 +27,7 @@
 import netifaces
 import os
 import pwd
+import random
 import stat
 import sys
 
@@ -186,6 +187,18 @@ def get_service_shards(service):
     return i
 
 
+def random_service(services):
+    """Return a random connected service.
+
+    services ([Service]): a list of services.
+    returns (Service): a random, connected service.
+
+    raise (IndexError): if there are no connected services.
+
+    """
+    return random.choice([s for s in services if s.connected])
+
+
 def default_argument_parser(description, cls, ask_contest=None):
     """Default argument parser for services.
diff --git a/cmstestsuite/testrunner.py b/cmstestsuite/testrunner.py
index 33e128cfef..aa5a602733 100644
--- a/cmstestsuite/testrunner.py
+++ b/cmstestsuite/testrunner.py
@@ -41,7 +41,8 @@ class TestRunner:
     # Tell pytest not to collect this class as test
     __test__ = False
 
-    def __init__(self, test_list, contest_id=None, workers=1, cpu_limits=None):
+    def __init__(self, test_list, contest_id=None, workers=1, cpu_limits=None,
+                 evaluations=4):
         self.start_time = datetime.datetime.now()
         self.last_end_time = self.start_time
 
@@ -61,6 +62,7 @@ def __init__(self, test_list, contest_id=None, workers=1, cpu_limits=None):
         self.num_users = 0
 
         self.workers = workers
+        self.evaluations = evaluations
 
         if CONFIG["TEST_DIR"] is not None:
             # Set up our expected environment.
@@ -306,7 +308,9 @@ def submit_tests(self, concurrent_submit_and_eval=True):
         # send the notification for all submissions.
         self.ps.start("ContestWebServer", contest=self.contest_id)
         if concurrent_submit_and_eval:
-            self.ps.start("EvaluationService", contest=self.contest_id)
+            self.ps.start("QueueService", contest=self.contest_id)
+            for shard in range(self.evaluations):
+                self.ps.start("EvaluationService", shard, contest=self.contest_id)
         self.ps.wait()
 
         self.ps.start("ProxyService", contest=self.contest_id)
diff --git a/config/cms.conf.sample b/config/cms.conf.sample
index 8bb0d730b8..ac40a159cf 100644
--- a/config/cms.conf.sample
+++ b/config/cms.conf.sample
@@ -23,7 +23,9 @@
         "ResourceService": [["localhost", 28000]],
         "ScoringService": [["localhost", 28500]],
         "Checker": [["localhost", 22000]],
-        "EvaluationService": [["localhost", 25000]],
+        "EvaluationService": [["localhost", 25010],
+                              ["localhost", 25011]],
+        "QueueService": [["localhost", 25000]],
         "Worker": [["localhost", 26000],
                    ["localhost", 26001],
                    ["localhost", 26002],
diff --git a/scripts/cmsQueueService b/scripts/cmsQueueService
new file mode 100755
index 0000000000..1fc0ba39c7
--- /dev/null
+++ b/scripts/cmsQueueService
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Contest Management System - http://cms-dev.github.io/
+# Copyright © 2017 Stefano Maggiolo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from __future__ import absolute_import
+from __future__ import print_function
+from __future__ import unicode_literals
+
+# We enable monkey patching to make many libraries gevent-friendly
+# (for instance, urllib3, used by requests)
+import gevent.monkey
+gevent.monkey.patch_all()
+
+import logging
+import sys
+
+from cms import ConfigError, default_argument_parser
+from cms.db import ask_for_contest, test_db_connection
+from cms.service.QueueService import QueueService
+
+
+logger = logging.getLogger(__name__)
+
+
+def main():
+    """Parse arguments and launch service.
+
+    """
+    test_db_connection()
+    success = default_argument_parser(
+        "Queue service and operation dispatcher for CMS.",
+        QueueService).run()
+    return 0 if success is True else 1
+
+
+if __name__ == "__main__":
+    try:
+        sys.exit(main())
+    except ConfigError as error:
+        logger.critical(error.message)
+        sys.exit(1)

From 790a75efe35805c30e83a265b7ca2aa90d4c0504 Mon Sep 17 00:00:00 2001
From: Amir Keivan Mohtashami
Date: Sat, 20 May 2017 11:36:10 +0430
Subject: [PATCH 02/10] Temporary fix for submission invalidation

---
 cms/service/EvaluationService.py |  6 ++++--
 cms/service/QueueService.py      | 10 ++++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/cms/service/EvaluationService.py b/cms/service/EvaluationService.py
index 16255f4408..5c3d3b7f49 100644
--- a/cms/service/EvaluationService.py
+++ b/cms/service/EvaluationService.py
@@ -755,11 +755,13 @@ def invalidate_submission(self,
                 level, submissions, dataset_id)
             for operation in operations:
                 try:
-                    self.dequeue(operation)
+                    self.queue_service.dequeue(operation=operation.to_dict())
                 except KeyError:
                     pass  # Ok, the operation wasn't in the queue.
                 try:
-                    self.get_executor().pool.ignore_operation(operation)
+                    self.queue_service.ignore_operation(
+                        operation=operation.to_dict()
+                    )
                 except LookupError:
                     pass  # Ok, the operation wasn't in the pool.
 
diff --git a/cms/service/QueueService.py b/cms/service/QueueService.py
index 07c85ca975..dc9bd214bc 100644
--- a/cms/service/QueueService.py
+++ b/cms/service/QueueService.py
@@ -561,6 +561,16 @@ def enable_worker(self, shard):
 
         return True
 
+    @rpc_method
+    def dequeue(self, operation):
+        self.get_executor().dequeue(ESOperation.from_dict(operation))
+
+    @rpc_method
+    def ignore_operation(self, operation):
+        self.get_executor().pool.ignore_operation(
+            ESOperation.from_dict(operation)
+        )
+
     @rpc_method
     def queue_status(self):
         """Return the status of the queue.

From 34a25a84aa854ae7da419cdba550de06c77d1c7d Mon Sep 17 00:00:00 2001
From: Amir Keivan Mohtashami
Date: Sun, 21 May 2017 21:33:18 +0430
Subject: [PATCH 03/10] Decreased to single operation per batch

---
 cms/service/QueueService.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cms/service/QueueService.py b/cms/service/QueueService.py
index dc9bd214bc..d8732eab13 100644
--- a/cms/service/QueueService.py
+++ b/cms/service/QueueService.py
@@ -128,7 +128,7 @@ class EvaluationExecutor(Executor):
 
     # Real maximum number of operations to be sent to a worker.
-    MAX_OPERATIONS_PER_BATCH = 25
+    MAX_OPERATIONS_PER_BATCH = 1
 
     def __init__(self, evaluation_service):
        """Create the single executor for ES.
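
The round trip introduced by the patches above can be summarized as follows: QueueService holds each worker result in PendingResults, hands it to one EvaluationService shard through the write_single_result RPC, and re-enqueues whatever follow-up operations the write produces. Below is a minimal sketch of that loop; the two stub classes are hypothetical stand-ins for the real RPC proxies, and the calls are made synchronous for brevity:

import random

class EvaluationServiceStub:
    """Hypothetical in-process stand-in for an ES shard."""
    def write_single_result(self, operation, job):
        # A successful write may produce follow-up operations, e.g. the
        # evaluation steps that become possible after a compilation.
        follow_ups = []  # [(operation_dict, priority, timestamp), ...]
        return True, follow_ups

class QueueServiceStub:
    def __init__(self, evaluation_services):
        self.evaluation_services = evaluation_services
        self.queue = []  # stand-in for the executor's priority queue

    def enqueue(self, operation, priority, timestamp):
        self.queue.append((priority, timestamp, operation))

    def result_arrived(self, operation, job):
        # Mirrors process_results/result_written above: pick a random
        # shard, ask it to write, then re-enqueue what it sends back.
        es = random.choice(self.evaluation_services)
        success, new_operations = es.write_single_result(operation, job)
        if success:
            for op, priority, timestamp in new_operations:
                self.enqueue(op, priority, timestamp)

qs = QueueServiceStub([EvaluationServiceStub(), EvaluationServiceStub()])
qs.result_arrived({"type": "compile", "object_id": 1}, job={"success": True})
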
From 9b9dde13fce33bda5af16b8d4ded2acc1be6ac4d Mon Sep 17 00:00:00 2001 From: Amirkeivan Mohtashami Date: Tue, 21 Feb 2023 20:41:45 +0100 Subject: [PATCH 04/10] fix invalidation --- cms/server/admin/rpc_authorization.py | 2 +- .../templates/macro/reevaluation_buttons.html | 4 +- cms/service/EvaluationService.py | 120 +----------------- cms/service/QueueService.py | 115 ++++++++++++++++- 4 files changed, 119 insertions(+), 122 deletions(-) diff --git a/cms/server/admin/rpc_authorization.py b/cms/server/admin/rpc_authorization.py index e000e0d26f..739c22a0b7 100644 --- a/cms/server/admin/rpc_authorization.py +++ b/cms/server/admin/rpc_authorization.py @@ -41,7 +41,7 @@ ("ResourceService", "toggle_autorestart"), ("QueueService", "enable_worker"), ("QueueService", "disable_worker"), - ("EvaluationService", "invalidate_submission"), + ("QueueService", "invalidate_submission"), ("ScoringService", "invalidate_submission"), ] diff --git a/cms/server/admin/templates/macro/reevaluation_buttons.html b/cms/server/admin/templates/macro/reevaluation_buttons.html index 7e769d0cd9..d37cf49c74 100644 --- a/cms/server/admin/templates/macro/reevaluation_buttons.html +++ b/cms/server/admin/templates/macro/reevaluation_buttons.html @@ -36,7 +36,7 @@ {% set invalidate_arguments = {"contest_id": contest_id} %} {% endif %}
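
The random_service helper added in cms/util.py above is the piece that lets CWS and QueueService spread their calls over the EvaluationService shards. A minimal usage sketch, with a hypothetical Stub class standing in for a RemoteServiceClient:

import random

class Stub:
    """Hypothetical stand-in for a RemoteServiceClient."""
    def __init__(self, connected):
        self.connected = connected

def random_service(services):
    # Same body as the helper in cms/util.py: choose among the
    # *connected* services only; random.choice raises IndexError on an
    # empty list, i.e. when no service is connected.
    return random.choice([s for s in services if s.connected])

services = [Stub(False), Stub(True), Stub(True)]
assert random_service(services).connected  # picks one of the two live stubs

try:
    random_service([Stub(False)])
except IndexError:
    # This is the case QueueService.process_results catches, logging
    # that the result will be discarded.
    pass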