From bcd0ccf4976ce257dacaceddab03bb73bd4fb659 Mon Sep 17 00:00:00 2001
From: Alex Boden
Date: Thu, 16 Jan 2025 19:59:46 -0500
Subject: [PATCH] init

---
 RunningJob.py |  38 +++++++--
 main.py       | 215 +++++++++++++++++++++++++++++++++++---------------
 2 files changed, 180 insertions(+), 73 deletions(-)

diff --git a/RunningJob.py b/RunningJob.py
index 719e638..23f46bf 100644
--- a/RunningJob.py
+++ b/RunningJob.py
@@ -1,18 +1,40 @@
-from typing import List
+from typing import List, Optional
+from datetime import datetime
 
 class RunningJob:
-    def __init__(self, job_id: int, slurm_job_id: int, workflow_name: str, job_name: str, labels: List[str]):
-        """Class to represent a running Github Actions Job on Slurm."""
+    def __init__(
+        self,
+        job_id: int,
+        slurm_job_id: Optional[int],
+        workflow_name: str,
+        job_name: str,
+        labels: List[str],
+        start_time: Optional[datetime] = None,
+        machine_name: Optional[str] = None
+    ):
+        """
+        Class to represent a running GitHub Actions Job on Slurm.
+
+        :param job_id: The GitHub Actions job ID.
+        :param slurm_job_id: The corresponding SLURM job ID (if allocated).
+        :param workflow_name: Name of the workflow that launched this job.
+        :param job_name: Name of this specific job.
+        :param labels: Labels associated with the job (e.g. 'slurm-runner-medium').
+        :param start_time: When the job began, if known. Defaults to None.
+        :param machine_name: Node the job runs on, filled in once sacct reports a NodeList.
+        """
         self.job_id = job_id
         self.slurm_job_id = slurm_job_id
         self.workflow_name = workflow_name
         self.job_name = job_name
         self.labels = labels
+        self.start_time = start_time
+        self.machine_name = machine_name
 
     def __str__(self) -> str:
-        return (f"RunningJob(job_id = {self.job_id}, slurm_job_id = {self.slurm_job_id}, "
-                f"workflow_name = {self.workflow_name}, "
-                f"job_name = {self.job_name}, labels = {self.labels})")
-
+        return (f"RunningJob(job_id={self.job_id}, slurm_job_id={self.slurm_job_id}, "
+                f"workflow_name={self.workflow_name}, job_name={self.job_name}, "
+                f"labels={self.labels}, start_time={self.start_time})")
+
     def __repr__(self) -> str:
-        return self.__str__()
\ No newline at end of file
+        return self.__str__()
diff --git a/main.py b/main.py
index 7f9c799..060cb6c 100644
--- a/main.py
+++ b/main.py
@@ -6,13 +6,17 @@
 import os
 import sys
 import threading
+
 from dotenv import load_dotenv
+from prometheus_client import start_http_server, Counter, Gauge, Summary
 
 from runner_size_config import get_runner_resources
 from config import GITHUB_API_BASE_URL, GITHUB_REPO_URL, ALLOCATE_RUNNER_SCRIPT_PATH
 from RunningJob import RunningJob
 from KubernetesLogFormatter import KubernetesLogFormatter
 
+
+# ------------------------------------------
 # Configure logging
 logger = logging.getLogger()
 log_formatter = KubernetesLogFormatter()
@@ -45,15 +49,38 @@
 queued_workflows_url = f'{GITHUB_API_BASE_URL}/actions/runs?status=queued'
 
 # Global tracking for allocated runners
-allocated_jobs = {} # Maps job_id -> RunningJob
-
+allocated_jobs = {}  # Maps job_id -> RunningJob
 POLLED_WITHOUT_ALLOCATING = False
 
+# =========== PROMETHEUS METRICS =============
+
+# Track how many jobs are allocated/running on each machine, labeled by 'machine' and 'job_size'.
+CURRENT_ALLOCATED_JOBS = Gauge(
+    'slurm_runner_allocated_jobs',
+    'Number of jobs currently allocated to SLURM Runner per machine',
+    labelnames=['machine', 'job_size']
+)
+
+# Count total completed jobs, labeled by 'machine' and 'job_size'
+COMPLETED_JOBS = Counter(
+    'slurm_runner_jobs_completed',
+    'Number of completed SLURM Runner jobs',
+    labelnames=['machine', 'job_size']
+)
+
+# Track job duration in seconds, labeled by 'machine' and 'job_size'
+JOB_DURATION_SUMMARY = Summary(
+    'slurm_runner_job_duration_seconds',
+    'Duration of SLURM runner job in seconds',
+    labelnames=['machine', 'job_size']
+)
+
 
 def get_gh_api(url, token, etag=None):
     """
     Sends a GET request to the GitHub API with the given URL and access token.
-    If the rate limit is exceeded, the function will wait until the rate limit is reset before returning.
+    If the rate limit is exceeded, the function will wait until the rate limit
+    is reset before returning.
     """
     try:
         headers = {'Authorization': f'token {token}', 'Accept': 'application/vnd.github.v3+json'}
@@ -65,14 +92,17 @@ def get_gh_api(url, token, etag=None):
         if int(response.headers.get('X-RateLimit-Remaining', '0')) % 100 == 0:
             logger.info(f"Rate Limit Remaining: {response.headers['X-RateLimit-Remaining']}")
 
+
         if response.status_code == 304:
             return None, etag
         elif response.status_code == 200:
             new_etag = response.headers.get('ETag')
             return response.json(), new_etag
-        elif response.status_code == 403 and 'X-RateLimit-Remaining' in response.headers and response.headers['X-RateLimit-Remaining'] == '0':
+        elif (response.status_code == 403
+              and 'X-RateLimit-Remaining' in response.headers
+              and response.headers['X-RateLimit-Remaining'] == '0'):
             reset_time = int(response.headers['X-RateLimit-Reset'])
-            sleep_time = reset_time - time.time() + 1 # Adding 1 second to ensure the reset has occurred
+            sleep_time = reset_time - time.time() + 1  # Add 1s for safety
             logger.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds.")
             time.sleep(sleep_time)
             return get_gh_api(url, token, etag) # Retry the request
@@ -106,11 +136,11 @@ def poll_github_actions_and_allocate_runners(url, token, sleep_time=5):
 
 def get_all_jobs(workflow_id, token):
     """
-    Get all CI jobs for a given workflow ID by iterating through the paginated API response. 
+    Get all CI jobs for a given workflow ID by iterating through the paginated API response.
""" all_jobs = [] page = 1 - per_page = 100 # Maximum number of jobs per page according to rate limits + per_page = 100 while True: try: @@ -119,15 +149,15 @@ def get_all_jobs(workflow_id, token): if job_data and 'jobs' in job_data: all_jobs.extend(job_data['jobs']) if len(job_data['jobs']) < per_page: - break # No more pages + break page += 1 logger.info(f"Getting jobs for workflow {workflow_id} page {page}") else: logger.error(f"Failed to get job data for workflow {workflow_id}") - break # No more data or error occurred + break except Exception as e: logger.error(f"Exception occurred in get_all_jobs for workflow_id {workflow_id}: {e}") - break # Decide whether to continue or break + break return all_jobs @@ -138,7 +168,6 @@ def allocate_runners_for_jobs(workflow_data, token): return number_of_queued_workflows = len(workflow_data["workflow_runs"]) - for i in range(number_of_queued_workflows): workflow_id = workflow_data["workflow_runs"][i]["id"] branch = workflow_data["workflow_runs"][i]["head_branch"] @@ -154,21 +183,23 @@ def allocate_runners_for_jobs(workflow_data, token): allocate_actions_runner(queued_job_id, token) except Exception as e: logger.error(f"Exception occurred in allocate_runners_for_jobs for workflow_id {workflow_id}: {e}") - continue def allocate_actions_runner(job_id, token): """ - Allocates a runner for the given job ID by sending a POST request to the GitHub API to get a registration token. - Proceeds to submit a SLURM job to allocate the runner with the corresponding resources. + Allocates a runner for the given job ID by sending a POST request to the GitHub API + to get a registration token, then submits a SLURM job. """ if job_id in allocated_jobs: logger.info(f"Runner already allocated for job {job_id}") return + logger.info(f"Allocating runner for job {job_id}") global POLLED_WITHOUT_ALLOCATING POLLED_WITHOUT_ALLOCATING = False - allocated_jobs[job_id] = None # mark as allocated to prevent double allocation + + # Put a placeholder entry so we don't double-allocate + allocated_jobs[job_id] = None try: # get the runner registration token @@ -202,28 +233,37 @@ def allocate_actions_runner(job_id, token): run_id = data['run_id'] - allocated_jobs[job_id] = RunningJob(job_id, None, data['workflow_name'], data['name'], labels) - if "slurm-runner" not in labels[0]: - logger.info(f"Skipping job because it is not for the correct runner. labels: {labels}, labels[0]: {labels[0]}") + logger.info(f"Skipping job because it is not for a slurm-runner. labels: {labels}, labels[0]: {labels[0]}") del allocated_jobs[job_id] return runner_size_label = labels[0] - - logger.info(f"Using runner size label: {runner_size_label}") runner_resources = get_runner_resources(runner_size_label) - # sbatch resource allocation command + # Create a RunningJob with start_time; machine_name unknown yet + allocated_jobs[job_id] = RunningJob( + job_id=job_id, + slurm_job_id=None, + workflow_name=data['workflow_name'], + job_name=data['name'], + labels=labels, + start_time=datetime.now(), + machine_name=None # We will fill it once we parse sacct + ) + + # We do NOT increment CURRENT_ALLOCATED_JOBS here because we do not yet know the node (machine). + # We wait until sacct tells us the actual 'NodeList'. 
+
+        # Submit the SLURM job
         command = [
             "sbatch",
-            # f"--nodelist=thor-slurm1",
             f"--job-name=slurm-{runner_size_label}-{job_id}",
             f"--mem-per-cpu={runner_resources['mem-per-cpu']}",
             f"--cpus-per-task={runner_resources['cpu']}",
             f"--gres=tmpdisk:{runner_resources['tmpdisk']}",
             f"--time={runner_resources['time']}",
-            ALLOCATE_RUNNER_SCRIPT_PATH, # allocate-ephemeral-runner-from-docker.sh
+            ALLOCATE_RUNNER_SCRIPT_PATH,
             GITHUB_REPO_URL,
             registration_token,
             removal_token,
@@ -232,104 +272,149 @@
         ]
 
         logger.info(f"Running command: {' '.join(command)}")
-
         result = subprocess.run(command, capture_output=True, text=True)
         output = result.stdout.strip()
         error_output = result.stderr.strip()
 
         logger.info(f"Command stdout: {output}")
         logger.error(f"Command stderr: {error_output}")
-        try:
-            slurm_job_id = int(output.split()[-1]) # output is of the form "Submitted batch job 3828"
-            allocated_jobs[job_id] = RunningJob(job_id, slurm_job_id, data['workflow_name'], data['name'], labels)
-            logger.info(f"Allocated runner for job {allocated_jobs[job_id]} with SLURM job ID {slurm_job_id}.")
-            if result.returncode != 0:
+
+        if result.returncode == 0:
+            # Typically: "Submitted batch job 3828"
+            try:
+                slurm_job_id = int(output.split()[-1])
+                allocated_jobs[job_id].slurm_job_id = slurm_job_id
+                logger.info(f"Allocated runner for job {allocated_jobs[job_id]} with SLURM job ID {slurm_job_id}.")
+            except (IndexError, ValueError) as e:
+                logger.error(f"Failed to parse SLURM job ID: {output}, error: {e}")
+                # Clean up
                 del allocated_jobs[job_id]
-                logger.error(f"Failed to allocate runner for job {job_id}.")
-                allocate_actions_runner(job_id, token)
-        except (IndexError, ValueError) as e:
-            logger.error(f"Failed to parse SLURM job ID from command output: {output}. Error: {e}")
+        else:
+            logger.error(f"Failed to allocate runner for job {job_id}. Retrying allocation...")
             del allocated_jobs[job_id]
-            # retry the job allocation
             allocate_actions_runner(job_id, token)
+
     except Exception as e:
-        logger.error(f"Exception occurred in allocate_actions_runner for job_id {job_id}: {e}")
+        logger.error(f"Exception in allocate_actions_runner for job_id {job_id}: {e}")
        if job_id in allocated_jobs:
            del allocated_jobs[job_id]
-        # Decide whether to retry or not
 
 
 def check_slurm_status():
     """
     Checks the status of SLURM jobs and updates the allocated_jobs dictionary.
+    We parse the NodeList from sacct to get the actual machine name.
+
+    If a job is RUNNING (or PENDING) on a node for the first time, we increment the
+    CURRENT_ALLOCATED_JOBS gauge labeled by that node.
+    If the job is completed/failed/cancelled, we decrement and update the relevant metrics.
""" if not allocated_jobs: return + # We'll store jobs to remove after they're completed/failed to_remove = [] + + # Copy the dict so we don't mutate while iterating frozen_jobs = allocated_jobs.copy() for job_id, runningjob in frozen_jobs.items(): + # If we have no slurm_job_id, skip if not runningjob or not runningjob.slurm_job_id: continue - # Use 'sacct' to get job status and start time for a single job - sacct_cmd = ['sacct', '-n', '-P', '-o', 'JobID,State,Start,End', '--jobs', str(runningjob.slurm_job_id)] + sacct_cmd = [ + 'sacct', + '-n', + '-P', + '-o', 'JobID,State,Start,End,NodeList', + '--jobs', str(runningjob.slurm_job_id) + ] + try: sacct_result = subprocess.run(sacct_cmd, capture_output=True, text=True) sacct_output = sacct_result.stdout.strip() if sacct_result.returncode != 0: - logger.error(f"sacct command failed with return code {sacct_result.returncode}") - if sacct_result.stderr: - logger.error(f"Error output: {sacct_result.stderr}") + logger.error(f"sacct command failed (rc={sacct_result.returncode}). Stderr: {sacct_result.stderr}") continue + # Example lines: "3828|RUNNING|2023-01-16T10:05:00|2023-01-16T10:10:05|node01" for line in sacct_output.split('\n'): parts = line.split('|') - if line == '' or len(parts) < 4: - continue # Sometimes it takes a while for the job to appear in the sacct output - - job_component = parts[0] # e.g., '3840.batch' - status = parts[1] + if len(parts) < 5: + continue + + job_component = parts[0] # e.g. '3828' + job_state = parts[1] start_time_str = parts[2] end_time_str = parts[3] + node_list = parts[4] # e.g. "node01" + # Skip .batch, .extern, etc. We only want the main job ID line if '.batch' in job_component or '.extern' in job_component: continue - # Convert time strings to datetime objects - - if status.startswith('COMPLETED') or status.startswith('FAILED') or status.startswith('CANCELLED') or status.startswith('TIMEOUT'): + # If we haven't recorded the machine_name, do it now + if runningjob.machine_name is None and node_list: + runningjob.machine_name = node_list + # Because the job is now placed on a node, increment the gauge + runner_size_label = runningjob.labels[0] if runningjob.labels else "unknown" + CURRENT_ALLOCATED_JOBS.labels(machine=node_list, job_size=runner_size_label).inc() + + # If job is completed/failed/canceled, we finalize + if job_state.startswith('COMPLETED') or job_state.startswith('FAILED') \ + or job_state.startswith('CANCELLED') or job_state.startswith('TIMEOUT'): + runner_size_label = runningjob.labels[0] if runningjob.labels else "unknown" + machine_name = runningjob.machine_name or "unknown" + + # Calculate duration from sacct Start->End if possible + duration_seconds = 0 try: - start_time = datetime.strptime(start_time_str, '%Y-%m-%dT%H:%M:%S') - end_time = datetime.strptime(end_time_str, '%Y-%m-%dT%H:%M:%S') - except Exception as e: - logger.error(f"Error parsing start/end time for job {job_component}: {e}") - start_time = None - end_time = None - duration = "[Unknown Duration]" - - if start_time and end_time: - duration = end_time - start_time - logger.info(f"Slurm job {job_component} {status} in {duration}. 
Running Job Info: {str(runningjob)}") + st_dt = datetime.strptime(start_time_str, '%Y-%m-%dT%H:%M:%S') + end_dt = datetime.strptime(end_time_str, '%Y-%m-%dT%H:%M:%S') + duration_seconds = (end_dt - st_dt).total_seconds() + except ValueError: + logger.warning(f"Cannot parse start/end time from sacct for job {job_id}") + + logger.info(f"Slurm job {job_id} is {job_state} on {machine_name}, took {duration_seconds}s.") + + # Prometheus metrics + JOB_DURATION_SUMMARY.labels(machine=machine_name, job_size=runner_size_label).observe(duration_seconds) + COMPLETED_JOBS.labels(machine=machine_name, job_size=runner_size_label).inc() + CURRENT_ALLOCATED_JOBS.labels(machine=machine_name, job_size=runner_size_label).dec() + + # Mark for removal to_remove.append(job_id) except Exception as e: - logger.error(f"Error querying SLURM job details for job ID {runningjob.slurm_job_id}: {e}") + logger.error(f"Error running sacct for job {job_id}: {e}") + # Remove completed/failed/cancelled jobs for job_id in to_remove: - del allocated_jobs[job_id] + if job_id in allocated_jobs: + del allocated_jobs[job_id] + def poll_slurm_statuses(sleep_time=5): """ - Wrapper function to poll check_slurm_status. + Periodically checks SLURM statuses and updates metrics. """ while True: check_slurm_status() time.sleep(sleep_time) + if __name__ == "__main__": - # Need to use threading to achieve simultaneous polling - github_thread = threading.Thread(target=poll_github_actions_and_allocate_runners, args=(queued_workflows_url, GITHUB_ACCESS_TOKEN, 2)) + # Expose Prometheus metrics on port 8000 + start_http_server(8000) + logger.info("Prometheus metrics server started on port 8000.") + + # Two threads: + # 1) Poll GitHub for queued jobs + github_thread = threading.Thread( + target=poll_github_actions_and_allocate_runners, + args=(queued_workflows_url, GITHUB_ACCESS_TOKEN, 2) + ) + # 2) Poll SLURM for job statuses slurm_thread = threading.Thread(target=poll_slurm_statuses) github_thread.start()
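
For reference, the field handling that check_slurm_status() applies to each `JobID|State|Start|End|NodeList` record from sacct can be sketched in isolation. The helper name parse_sacct_line and the sample values below are hypothetical, not code from the patch; this is only a rough illustration under those assumptions:

from datetime import datetime
from typing import Optional, Tuple

TERMINAL_PREFIXES = ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMEOUT')

def parse_sacct_line(line: str) -> Optional[Tuple[str, str, Optional[float], str]]:
    """Return (job_component, state, duration_seconds, node_list), or None for lines to skip."""
    parts = line.split('|')
    if len(parts) < 5:
        return None  # header noise, or the job has not appeared in sacct yet
    job_component, state, start_s, end_s, node_list = parts[:5]
    if '.batch' in job_component or '.extern' in job_component:
        return None  # only the main job record matters for the metrics
    duration = None
    if any(state.startswith(p) for p in TERMINAL_PREFIXES):
        try:
            start = datetime.strptime(start_s, '%Y-%m-%dT%H:%M:%S')
            end = datetime.strptime(end_s, '%Y-%m-%dT%H:%M:%S')
            duration = (end - start).total_seconds()
        except ValueError:
            pass  # sacct reports non-timestamp values for jobs that never started
    return job_component, state, duration, node_list

if __name__ == '__main__':
    print(parse_sacct_line('3828|COMPLETED|2025-01-16T10:05:00|2025-01-16T10:10:05|node01'))
    # ('3828', 'COMPLETED', 305.0, 'node01')

Feeding the returned state and node_list into the gauge, counter, and summary then stays a one-liner per metric, which keeps the sacct parsing testable without a SLURM cluster.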