add try except for handling github api
alexboden committed Nov 29, 2024
1 parent 4f8b55e commit be58f17
Showing 1 changed file with 161 additions and 122 deletions.
283 changes: 161 additions & 122 deletions main.py
@@ -49,90 +49,113 @@

POLLED_WITHOUT_ALLOCATING = False


def get_gh_api(url, token, etag=None):
    """
    Sends a GET request to the GitHub API with the given URL and access token.
    If the rate limit is exceeded, the function will wait until the rate limit is reset before returning.
    """
-
-    headers = {'Authorization': f'token {token}', 'Accept': 'application/vnd.github.v3+json'}
-    if (etag):
-        headers['If-None-Match'] = etag
-
-    response = requests.get(url, headers=headers)
-    if 'X-RateLimit-Remaining' in response.headers:
-        if int(response.headers['X-RateLimit-Remaining']) % 100 == 0:
-            logger.info(f"Rate Limit Remaining: {response.headers['X-RateLimit-Remaining']}")
-
-    if response.status_code == 304:
-        return None, etag
-    elif response.status_code == 200:
-        new_etag = response.headers.get('ETag')
-        return response.json(), new_etag
-    elif response.status_code == 403 and 'X-RateLimit-Remaining' in response.headers and response.headers['X-RateLimit-Remaining'] == '0':
-        reset_time = int(response.headers['X-RateLimit-Reset'])
-        sleep_time = reset_time - time.time() + 1 # Adding 1 second to ensure the reset has occurred
-        logger.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds.")
-        time.sleep(sleep_time)
-        return get_gh_api(url, token, etag) # Retry the request
-    else:
-        logger.error(f"Unexpected status code: {response.status_code}")
-        response.raise_for_status() # Handle HTTP errors
+    try:
+        headers = {'Authorization': f'token {token}', 'Accept': 'application/vnd.github.v3+json'}
+        if etag:
+            headers['If-None-Match'] = etag
+
+        response = requests.get(url, headers=headers)
+        response.raise_for_status()
+
+        if int(response.headers.get('X-RateLimit-Remaining', '0')) % 100 == 0:
+            logger.info(f"Rate Limit Remaining: {response.headers['X-RateLimit-Remaining']}")
+        if response.status_code == 304:
+            return None, etag
+        elif response.status_code == 200:
+            new_etag = response.headers.get('ETag')
+            return response.json(), new_etag
+        elif response.status_code == 403 and 'X-RateLimit-Remaining' in response.headers and response.headers['X-RateLimit-Remaining'] == '0':
+            reset_time = int(response.headers['X-RateLimit-Reset'])
+            sleep_time = reset_time - time.time() + 1 # Adding 1 second to ensure the reset has occurred
+            logger.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds.")
+            time.sleep(sleep_time)
+            return get_gh_api(url, token, etag) # Retry the request
+        else:
+            logger.error(f"Unexpected status code: {response.status_code}")
+            return None, etag
+    except requests.exceptions.RequestException as e:
+        logger.error(f"Exception occurred while calling GitHub API: {e}")
+        return None, etag
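
Note on usage: the ETag round-trip above is what makes fast polling affordable. Per GitHub's documentation, a conditional request answered with 304 Not Modified does not count against the primary rate limit. A minimal sketch, assuming this module's get_gh_api and config names (the endpoint path is illustrative):

    url = f"{GITHUB_API_BASE_URL}/actions/runs?status=queued"        # placeholder endpoint
    data, etag = get_gh_api(url, GITHUB_ACCESS_TOKEN)                # 200: fresh JSON plus an ETag
    data, etag = get_gh_api(url, GITHUB_ACCESS_TOKEN, etag)          # 304: (None, etag), no rate-limit cost
    if data is None:
        pass  # nothing changed since the last poll

One caveat worth flagging: because raise_for_status() now runs before the status checks, a 403 rate-limit response raises immediately and lands in the except block, so the sleep-until-reset branch below it appears unreachable.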


def poll_github_actions_and_allocate_runners(url, token, sleep_time=5):
    etag = None
    while True:
-        data, _ = get_gh_api(url, token, etag)
-        if data:
-            allocate_runners_for_jobs(data, token)
-            global POLLED_WITHOUT_ALLOCATING
-            if not POLLED_WITHOUT_ALLOCATING:
-                logger.info("Polling for queued workflows...")
-                POLLED_WITHOUT_ALLOCATING = True
-        time.sleep(sleep_time)
+        try:
+            data, etag = get_gh_api(url, token, etag)
+            if data:
+                allocate_runners_for_jobs(data, token)
+                global POLLED_WITHOUT_ALLOCATING
+                if not POLLED_WITHOUT_ALLOCATING:
+                    logger.info("Polling for queued workflows...")
+                    POLLED_WITHOUT_ALLOCATING = True
+            else:
+                logger.debug("No new data from GitHub API.")
+            time.sleep(sleep_time)
+        except Exception as e:
+            logger.error(f"Exception occurred in poll_github_actions_and_allocate_runners: {e}")
+            time.sleep(sleep_time)
+            continue
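
The quiet fix in this hunk, separate from the try/except, is `data, etag = get_gh_api(url, token, etag)` replacing `data, _ = ...`: the old loop never reassigned etag, so it stayed None, no If-None-Match header was ever sent, and every poll paid a full-rate request. Boiled down, the corrected loop shape is:

    etag = None
    while True:
        data, etag = get_gh_api(url, token, etag)  # the ETag now survives across iterations
        if data:
            allocate_runners_for_jobs(data, token)
        time.sleep(sleep_time)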


def get_all_jobs(workflow_id, token):
    """
    Get all CI jobs for a given workflow ID by iterating through the paginated API response.
    """
    all_jobs = []
    page = 1
    per_page = 100 # Maximum number of jobs per page according to rate limits

    while True:
-        url = f"{GITHUB_API_BASE_URL}/actions/runs/{workflow_id}/jobs?per_page={per_page}&page={page}"
-        job_data, _ = get_gh_api(url, token)
-        if job_data and 'jobs' in job_data:
-            all_jobs.extend(job_data['jobs'])
-            if len(job_data['jobs']) < per_page:
-                break # No more pages
-            page += 1
-            logger.info(f"Getting jobs for workflow {workflow_id} page {page}")
-        else:
-            break # No more data
+        try:
+            url = f"{GITHUB_API_BASE_URL}/actions/runs/{workflow_id}/jobs?per_page={per_page}&page={page}"
+            job_data, _ = get_gh_api(url, token)
+            if job_data and 'jobs' in job_data:
+                all_jobs.extend(job_data['jobs'])
+                if len(job_data['jobs']) < per_page:
+                    break # No more pages
+                page += 1
+                logger.info(f"Getting jobs for workflow {workflow_id} page {page}")
+            else:
+                logger.error(f"Failed to get job data for workflow {workflow_id}")
+                break # No more data or error occurred
+        except Exception as e:
+            logger.error(f"Exception occurred in get_all_jobs for workflow_id {workflow_id}: {e}")
+            break # Decide whether to continue or break

    return all_jobs
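
The pagination contract here: GitHub's jobs endpoint caps per_page at 100, and a short page signals the last one, so a run with 250 jobs costs three requests. The same traversal as a generator, in case accumulating the whole list ever matters (a hypothetical variant, not part of this commit):

    def iter_jobs(workflow_id, token, per_page=100):
        # Yield jobs page by page instead of building one big list (sketch).
        page = 1
        while True:
            url = f"{GITHUB_API_BASE_URL}/actions/runs/{workflow_id}/jobs?per_page={per_page}&page={page}"
            job_data, _ = get_gh_api(url, token)
            if not job_data or 'jobs' not in job_data:
                return
            yield from job_data['jobs']
            if len(job_data['jobs']) < per_page:
                return  # short page: nothing left
            page += 1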


def allocate_runners_for_jobs(workflow_data, token):
    if "workflow_runs" not in workflow_data:
        logger.error("No workflows found.")
        return

-    number_of_queued_workflows = workflow_data["total_count"]
+    number_of_queued_workflows = len(workflow_data["workflow_runs"])

    for i in range(number_of_queued_workflows):
        workflow_id = workflow_data["workflow_runs"][i]["id"]
        # logger.info(f"Evaluating workflow ID: {workflow_id}")
        branch = workflow_data["workflow_runs"][i]["head_branch"]
        if branch != "alexboden/test-slurm-gha-runner" and branch != "alexboden/test-ci-apptainer":
            continue
-        job_data = get_all_jobs(workflow_id, token)
-        for job in job_data:
-            if job["status"] == "queued":
-                queued_job_id = job["id"]
-                allocate_actions_runner(queued_job_id, token)
+        try:
+            job_data = get_all_jobs(workflow_id, token)
+            if not job_data:
+                logger.error(f"No job data retrieved for workflow {workflow_id}")
+                continue
+            for job in job_data:
+                if job["status"] == "queued":
+                    queued_job_id = job["id"]
+                    allocate_actions_runner(queued_job_id, token)
+        except Exception as e:
+            logger.error(f"Exception occurred in allocate_runners_for_jobs for workflow_id {workflow_id}: {e}")
+            continue
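
The switch from total_count to len(workflow_runs) matters because total_count counts every matching run while workflow_runs holds only the current page of results, so iterating range(total_count) could index past the end of the returned list. A trimmed sketch of the payload shape this function consumes (values hypothetical):

    workflow_data = {
        "total_count": 45,             # all queued runs, across pages
        "workflow_runs": [             # only this page's runs
            {
                "id": 123456789,       # hypothetical run id
                "head_branch": "alexboden/test-ci-apptainer",
                # ...other fields omitted...
            },
        ],
    }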


def allocate_actions_runner(job_id, token):
@@ -146,78 +169,94 @@ def allocate_actions_runner(job_id, token):
logger.info(f"Allocating runner for job {job_id}")
global POLLED_WITHOUT_ALLOCATING
POLLED_WITHOUT_ALLOCATING = False
allocated_jobs[job_id] = None # mark as allocated to prevent double allocation

# get the runner registration token
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3+json'
}

data = requests.post(f'{GITHUB_API_BASE_URL}/actions/runners/registration-token', headers=headers)
registration_token = data.json()["token"]

time.sleep(1) # https://docs.github.com/en/rest/using-the-rest-api/best-practices-for-using-the-rest-api?apiVersion=2022-11-28#pause-between-mutative-requests

data = requests.post(f'{GITHUB_API_BASE_URL}/actions/runners/remove-token', headers=headers)
removal_token = data.json()["token"]

data, _ = get_gh_api(f'{GITHUB_API_BASE_URL}/actions/jobs/{job_id}', token)
labels = data["labels"] # should only be one label in prod
logger.info(f"Job labels: {labels}")

run_id = data['run_id']

allocated_jobs[job_id] = RunningJob(job_id, None, data['workflow_name'], data['name'], labels)

if "slurm-runner" not in labels[0]:
logger.info(f"Skipping job because it is not for the correct runner. labels: {labels}, labels[0]: {labels[0]}")
del allocated_jobs[job_id]
return

runner_size_label = labels[0]

logger.info(f"Using runner size label: {runner_size_label}")
runner_resources = get_runner_resources(runner_size_label)

# sbatch resource allocation command
command = [
"sbatch",
# f"--nodelist=thor-slurm1",
f"--job-name=slurm-{runner_size_label}-{job_id}",
f"--mem-per-cpu={runner_resources['mem-per-cpu']}",
f"--cpus-per-task={runner_resources['cpu']}",
f"--gres=tmpdisk:{runner_resources['tmpdisk']}",
f"--time={runner_resources['time']}",
ALLOCATE_RUNNER_SCRIPT_PATH,
GITHUB_REPO_URL,
registration_token,
removal_token,
','.join(labels),
str(run_id)
]

logger.info(f"Running command: {' '.join(command)}")

result = subprocess.run(command, capture_output=True, text=True)
output = result.stdout.strip()
error_output = result.stderr.strip()
logger.info(f"Command stdout: {output}")
logger.error(f"Command stderr: {error_output}")
allocated_jobs[job_id] = None # mark as allocated to prevent double allocation

try:
slurm_job_id = int(output.split()[-1]) # output is of the form "Submitted batch job 3828"
allocated_jobs[job_id] = RunningJob(job_id, slurm_job_id, data['workflow_name'], data['name'], labels)
logger.info(f"Allocated runner for job {allocated_jobs[job_id]} with SLURM job ID {slurm_job_id}.")
if result.returncode != 0:
# get the runner registration token
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3+json'
}

response = requests.post(f'{GITHUB_API_BASE_URL}/actions/runners/registration-token', headers=headers)
response.raise_for_status()
data = response.json()
registration_token = data["token"]

time.sleep(1) # https://docs.github.com/en/rest/using-the-rest-api/best-practices-for-using-the-rest-api?apiVersion=2022-11-28#pause-between-mutative-requests
response = requests.post(f'{GITHUB_API_BASE_URL}/actions/runners/remove-token', headers=headers)
response.raise_for_status()
data = response.json()
removal_token = data["token"]

data, _ = get_gh_api(f'{GITHUB_API_BASE_URL}/actions/jobs/{job_id}', token)
if not data:
logger.error(f"Failed to retrieve job data for job_id {job_id}")
del allocated_jobs[job_id]
logger.error(f"Failed to allocate runner for job {job_id}.")
allocate_actions_runner(job_id, token)
except (IndexError, ValueError) as e:
logger.error(f"Failed to parse SLURM job ID from command output: {output}. Error: {e}")
del allocated_jobs[job_id]
# retry the job allocation
allocate_actions_runner(job_id, token)
return
labels = data.get("labels", [])
if not labels:
logger.error(f"No labels found for job_id {job_id}")
del allocated_jobs[job_id]
return
logger.info(f"Job labels: {labels}")

run_id = data['run_id']

allocated_jobs[job_id] = RunningJob(job_id, None, data['workflow_name'], data['name'], labels)

if "slurm-runner" not in labels[0]:
logger.info(f"Skipping job because it is not for the correct runner. labels: {labels}, labels[0]: {labels[0]}")
del allocated_jobs[job_id]
return

runner_size_label = labels[0]

logger.info(f"Using runner size label: {runner_size_label}")
runner_resources = get_runner_resources(runner_size_label)

# sbatch resource allocation command
command = [
"sbatch",
# f"--nodelist=thor-slurm1",
f"--job-name=slurm-{runner_size_label}-{job_id}",
f"--mem-per-cpu={runner_resources['mem-per-cpu']}",
f"--cpus-per-task={runner_resources['cpu']}",
f"--gres=tmpdisk:{runner_resources['tmpdisk']}",
f"--time={runner_resources['time']}",
ALLOCATE_RUNNER_SCRIPT_PATH, # allocate-ephemeral-runner-from-docker.sh
GITHUB_REPO_URL,
registration_token,
removal_token,
','.join(labels),
str(run_id)
]

logger.info(f"Running command: {' '.join(command)}")

result = subprocess.run(command, capture_output=True, text=True)
output = result.stdout.strip()
error_output = result.stderr.strip()
logger.info(f"Command stdout: {output}")
logger.error(f"Command stderr: {error_output}")
try:
slurm_job_id = int(output.split()[-1]) # output is of the form "Submitted batch job 3828"
allocated_jobs[job_id] = RunningJob(job_id, slurm_job_id, data['workflow_name'], data['name'], labels)
logger.info(f"Allocated runner for job {allocated_jobs[job_id]} with SLURM job ID {slurm_job_id}.")
if result.returncode != 0:
del allocated_jobs[job_id]
logger.error(f"Failed to allocate runner for job {job_id}.")
allocate_actions_runner(job_id, token)
except (IndexError, ValueError) as e:
logger.error(f"Failed to parse SLURM job ID from command output: {output}. Error: {e}")
del allocated_jobs[job_id]
# retry the job allocation
allocate_actions_runner(job_id, token)
except Exception as e:
logger.error(f"Exception occurred in allocate_actions_runner for job_id {job_id}: {e}")
if job_id in allocated_jobs:
del allocated_jobs[job_id]
# Decide whether to retry or not
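
A possible hardening for the int(output.split()[-1]) parse: sbatch has a --parsable flag that prints only the job id (optionally followed by ";cluster"), which sidesteps the "Submitted batch job 3828" banner format entirely. A sketch under the assumption the cluster's Slurm supports it; this is not used in the commit:

    def submit_sbatch(command):
        # command is the list built above, with command[0] == "sbatch"
        result = subprocess.run(
            ["sbatch", "--parsable"] + command[1:],
            capture_output=True, text=True, check=True,
        )
        return int(result.stdout.strip().split(";")[0])  # "jobid" or "jobid;cluster"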


def check_slurm_status():
@@ -289,7 +328,7 @@ def poll_slurm_statuses(sleep_time=5):
        time.sleep(sleep_time)

if __name__ == "__main__":
-    # need to use threading to achieve simultaneous polling
+    # Need to use threading to achieve simultaneous polling
    github_thread = threading.Thread(target=poll_github_actions_and_allocate_runners, args=(queued_workflows_url, GITHUB_ACCESS_TOKEN, 2))
    slurm_thread = threading.Thread(target=poll_slurm_statuses)
