From f48cb40a4fc645bf072dba0782c074c1dcd5a566 Mon Sep 17 00:00:00 2001 From: Zach Wolfenbarger Date: Mon, 10 Jun 2024 13:03:33 -0500 Subject: [PATCH 1/3] Batch Aggregation (#785) * Add Celery and a test route * Add new dependencies * Test task tests * Docker updates * Scripts folder * Setup deploy to test env * Link redis container via docker * Modify test task * Add redis service to test workflow * Hook up services * Fix test arguments * flake8 * newline * rename and refactor * Taking a swing at extraction * oops * update .gitignore * Remove deploy files * Update .gitignore * Clean up test tests * Add router tests * Extremely placeholder BA lib tests * Only override local import * First few batch agg specs * Updates to BatchAggregation & tests * less flake8y * Add final POST message to Panoptes * Flake * flake * Pull etag before atempting update * Remove unnecessary mocks * Assert result set, not method called * clean up spec mocks * Add permissions checking, fix some specs, refactor Panoptes update * Flake * Use os.path for platform independence * Undeleting deploy template --- .github/workflows/python-versions.yml | 15 +- .gitignore | 2 + Dockerfile | 6 +- docker-compose.yml | 51 +++++- kubernetes/deployment-production.tmpl | 2 +- panoptes_aggregation/batch_aggregation.py | 173 ++++++++++++++++++ panoptes_aggregation/routes.py | 20 ++ .../tests/batch_aggregation/__init__.py | 0 .../tests/batch_aggregation/cls_export.csv | 8 + .../test_batch_aggregation.py | 155 ++++++++++++++++ .../tests/batch_aggregation/wf_export.csv | 7 + .../tests/router_tests/test_routes.py | 24 +++ pyproject.toml | 7 +- make_docs.sh => scripts/make_docs.sh | 0 scripts/start-celery.sh | 3 + start-flask.sh => scripts/start-flask.sh | 0 scripts/start-flower.sh | 4 + 17 files changed, 469 insertions(+), 8 deletions(-) create mode 100644 panoptes_aggregation/batch_aggregation.py create mode 100644 panoptes_aggregation/tests/batch_aggregation/__init__.py create mode 100644 panoptes_aggregation/tests/batch_aggregation/cls_export.csv create mode 100644 panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py create mode 100644 panoptes_aggregation/tests/batch_aggregation/wf_export.csv rename make_docs.sh => scripts/make_docs.sh (100%) create mode 100755 scripts/start-celery.sh rename start-flask.sh => scripts/start-flask.sh (100%) create mode 100755 scripts/start-flower.sh diff --git a/.github/workflows/python-versions.yml b/.github/workflows/python-versions.yml index 74a3890e..f0a78b17 100644 --- a/.github/workflows/python-versions.yml +++ b/.github/workflows/python-versions.yml @@ -9,6 +9,16 @@ on: jobs: build: runs-on: ubuntu-latest + services: + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 strategy: matrix: python-version: [3.8, 3.9, "3.10", "3.11"] @@ -30,11 +40,14 @@ jobs: - name: Run tests env: TRAVIS: true # one test is skipped on CI and looks for this env value + REDIS_HOST: redis + CELERY_BROKER_URL: redis://localhost:6379/0 + CELERY_RESULT_BACKEND: redis://localhost:6379/0 run: | coverage run coverage report - name: Coveralls if: ${{ matrix.python-version == 3.10 }} - env: + env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: coveralls --service=github diff --git a/.gitignore b/.gitignore index f35986e6..040a57ca 100644 --- a/.gitignore +++ b/.gitignore @@ -113,3 +113,5 @@ endpoints.yml .vscode/ .noseids +tmp/* +.DS_Store \ No newline at end of file diff --git a/Dockerfile b/Dockerfile 
index 5740f767..587ac9e6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,10 +21,12 @@ COPY . . RUN pip install -U .[online,test,doc] # make documentation -RUN /bin/bash -lc ./make_docs.sh +RUN /bin/bash -lc ./scripts/make_docs.sh + +ADD ./ /usr/src/aggregation ARG REVISION='' ENV REVISION=$REVISION # load configs and start flask app -CMD ["bash", "./start-flask.sh"] +CMD ["bash", "./scripts/start-flask.sh"] diff --git a/docker-compose.yml b/docker-compose.yml index aaf8ae2b..b1c2acd5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,7 +6,7 @@ services: args: REVISION: fake-git-sha-id volumes: - - ./panoptes_aggregation:/usr/src/aggregation/panoptes_aggregation + - ./:/usr/src/aggregation - ~/.aws:/root/.aws environment: - AWS_REGION=${AWS_REGION} @@ -14,6 +14,51 @@ services: - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} - AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN} - AWS_SECURITY_TOKEN=${AWS_SECURITY_TOKEN} - - LISTEN_PORT=5000 + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + - FLASK_DEBUG=1 + - FLASK_ENV=development + - LISTEN_PORT=4000 ports: - - "5000:5000" + - "4000:4000" + links: + - redis:redis + + worker: + build: + context: ./ + args: + REVISION: fake-git-sha-id + command: celery --app panoptes_aggregation.tasks.celery worker --loglevel=info + volumes: + - ./:/usr/src/aggregation + environment: + - FLASK_DEBUG=1 + - APP_SETTINGS=project.server.config.DevelopmentConfig + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + - FLASK_ENV=development + links: + - redis:redis + depends_on: + - redis + + dashboard: + build: . + command: celery --app panoptes_aggregation.tasks.celery flower --port=5555 --broker=redis://redis:6379/0 + ports: + - 5556:5555 + environment: + - FLASK_DEBUG=1 + - APP_SETTINGS=project.server.config.DevelopmentConfig + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + links: + - redis:redis + depends_on: + - redis + - worker + + redis: + image: redis + command: redis-server --appendonly yes \ No newline at end of file diff --git a/kubernetes/deployment-production.tmpl b/kubernetes/deployment-production.tmpl index e97d4b88..c54084fe 100644 --- a/kubernetes/deployment-production.tmpl +++ b/kubernetes/deployment-production.tmpl @@ -117,4 +117,4 @@ spec: ports: - protocol: TCP port: 80 - targetPort: 80 + targetPort: 80 \ No newline at end of file diff --git a/panoptes_aggregation/batch_aggregation.py b/panoptes_aggregation/batch_aggregation.py new file mode 100644 index 00000000..330b11f2 --- /dev/null +++ b/panoptes_aggregation/batch_aggregation.py @@ -0,0 +1,173 @@ +from celery import Celery +import json +import pandas as pd +import os +import sys +import urllib3 +from shutil import make_archive +import uuid + +from azure.storage.blob import BlobServiceClient + +from panoptes_client import Panoptes, Project, Workflow +from panoptes_aggregation.workflow_config import workflow_extractor_config +from panoptes_aggregation.scripts import batch_utils + +celery = Celery(__name__) +celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379") +celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379") + + +@celery.task(name="run_aggregation") +def run_aggregation(project_id, workflow_id, user_id): + ba = BatchAggregator(project_id, workflow_id, user_id) + + if not ba.check_permission(): + print(f'Batch Aggregation: Unauthorized attempt by user {user_id} to aggregate workflow 
{workflow_id}') + # Exit the task gracefully without retrying or erroring + sys.exit() + + ba.save_exports() + + ba.process_wf_export(ba.wf_csv) + cls_df = ba.process_cls_export(ba.cls_csv) + + extractor_config = workflow_extractor_config(ba.tasks) + extracted_data = batch_utils.batch_extract(cls_df, extractor_config) + + batch_standard_reducers = { + 'question_extractor': ['question_reducer', 'question_consensus_reducer'], + 'survey_extractor': ['survey_reducer'] + } + + for task_type, extract_df in extracted_data.items(): + extract_df.to_csv(f'{ba.output_path}/{ba.workflow_id}_{task_type}.csv') + reducer_list = batch_standard_reducers[task_type] + reduced_data = {} + + for reducer in reducer_list: + # This is an override. The workflow_reducer_config method returns a config object + # that is incompatible with the batch_utils batch_reduce method + reducer_config = {'reducer_config': {reducer: {}}} + reduced_data[reducer] = batch_utils.batch_reduce(extract_df, reducer_config) + # filename = f'{ba.output_path}/{ba.workflow_id}_reductions.csv' + filename = os.path.join(ba.output_path, ba.workflow_id, '_reductions.csv') + reduced_data[reducer].to_csv(filename, mode='a') + + # Upload zip & reduction files to blob storage + ba.upload_files() + + # This could catch PanoptesAPIException, but what to do if it fails? + success_attrs = {'uuid': ba.id, 'status': 'completed'} + ba.update_panoptes(success_attrs) + + # STDOUT messages get printed to kubernetes logs + print(f'Batch Aggregation: Run successful for workflow {workflow_id} by user {user_id}') + + +class BatchAggregator: + """ + Bunch of stuff to manage a batch aggregation run + """ + + def __init__(self, project_id, workflow_id, user_id): + self.project_id = project_id + self.workflow_id = workflow_id + self.user_id = user_id + self._generate_uuid() + self._connect_api_client() + + def save_exports(self): + self.output_path = os.path.join('tmp', str(self.workflow_id)) + os.mkdir(self.output_path) + + cls_export = Workflow(self.workflow_id).describe_export('classifications') + full_cls_url = cls_export['media'][0]['src'] + cls_file = os.path.join(self.output_path, f'{self.workflow_id}_cls_export.csv') + + self._download_export(full_cls_url, cls_file) + + wf_export = Project(self.project_id).describe_export('workflows') + full_wf_url = wf_export['media'][0]['src'] + wf_file = os.path.join(self.output_path, f'{self.workflow_id}_workflow_export.csv') + self._download_export(full_wf_url, wf_file) + + self.cls_csv = cls_file + self.wf_csv = wf_file + return {'classifications': cls_file, 'workflows': wf_file} + + def process_wf_export(self, wf_csv): + self.wf_df = pd.read_csv(wf_csv) + self.wf_maj_version = self.wf_df.query(f'workflow_id == {self.workflow_id}')['version'].max() + self.wf_min_version = self.wf_df.query(f'workflow_id == {self.workflow_id} & version == {self.wf_maj_version}')['minor_version'].max() + self.workflow_version = f'{self.wf_maj_version}.{self.wf_min_version}' + self.workflow_row = self.wf_df.query(f'workflow_id == {self.workflow_id} & minor_version == {self.wf_min_version}') + self.tasks = json.loads(self.workflow_row.iloc[0]['tasks']) + return self.wf_df + + def process_cls_export(self, cls_csv): + cls_df = pd.read_csv(cls_csv) + self.cls_df = cls_df.query(f'workflow_version == {self.workflow_version}') + return self.cls_df + + def connect_blob_storage(self): + connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING') + self.blob_service_client = BlobServiceClient.from_connection_string(connect_str) + 
self.blob_service_client.create_container(name=self.id) + + def upload_file_to_storage(self, container_name, filepath): + blob = filepath.split('/')[-1] + blob_client = self.blob_service_client.get_blob_client(container=container_name, blob=blob) + with open(file=filepath, mode="rb") as data: + blob_client.upload_blob(data, overwrite=True) + + def upload_files(self): + self.connect_blob_storage() + reductions_file = os.path.join(self.output_path, f'{self.workflow_id}_reductions.csv') + self.upload_file_to_storage(self.id, reductions_file) + zippath = os.path.join('tmp', self.id) + zipfile = make_archive(zippath, 'zip', self.output_path) + self.upload_file_to_storage(self.id, zipfile) + + def update_panoptes(self, body_attributes): + # An Aggregation class can be added to the python client to avoid doing this manually + params = {'workflow_id': self.workflow_id} + response = Panoptes.client().get('/aggregations', params=params) + agg_id = response[0]['aggregations'][0]['id'] + fresh_etag = response[1] + + Panoptes.client().put( + f'/aggregations/{agg_id}', + etag=fresh_etag, + json={'aggregations': body_attributes} + ) + + def check_permission(self): + project = Project.find(self.project_id) + permission = False + for user in project.collaborators(): + if user.id == self.user_id: + permission = True + return permission + + def _generate_uuid(self): + self.id = uuid.uuid4().hex + + def _download_export(self, url, filepath): + http = urllib3.PoolManager() + r = http.request('GET', url, preload_content=False) + with open(filepath, 'wb') as out: + while True: + data = r.read(65536) + if not data: + break + out.write(data) + r.release_conn() + + def _connect_api_client(self): + # connect to the API only once for this function request + Panoptes.connect( + endpoint=os.getenv('PANOPTES_URL', 'https://panoptes.zooniverse.org/'), + client_id=os.getenv('PANOPTES_CLIENT_ID'), + client_secret=os.getenv('PANOPTES_CLIENT_SECRET') + ) diff --git a/panoptes_aggregation/routes.py b/panoptes_aggregation/routes.py index 857e99ce..4a8e9daf 100644 --- a/panoptes_aggregation/routes.py +++ b/panoptes_aggregation/routes.py @@ -15,8 +15,10 @@ from panoptes_aggregation import reducers from panoptes_aggregation import extractors from panoptes_aggregation import running_reducers +from panoptes_aggregation import batch_aggregation from panoptes_aggregation import __version__ import numpy as np +from celery.result import AsyncResult # see https://stackoverflow.com/a/75666126 @@ -115,6 +117,24 @@ def index(): for route, route_function in panoptes.panoptes.items(): application.route('/panoptes/{0}'.format(route), methods=['POST', 'PUT'])(lambda: route_function(request.args.to_dict(), request.get_json())) + @application.route('/run_aggregation', methods=['POST']) + def run_aggregation(): + content = request.json + project_id = content['project_id'] + workflow_id = content['workflow_id'] + user_id = content['user_id'] + task = batch_aggregation.run_aggregation.delay(project_id, workflow_id, user_id) + return json.dumps({"task_id": task.id}), 202 + + @application.route('/tasks/', methods=['GET']) + def get_status(task_id): + task_result = AsyncResult(task_id) + result = { + 'task_id': task_id, + 'task_status': task_result.status + } + return jsonify(result), 200 + @application.route('/docs') def web_docs(): return application.send_static_file('index.html') diff --git a/panoptes_aggregation/tests/batch_aggregation/__init__.py b/panoptes_aggregation/tests/batch_aggregation/__init__.py new file mode 100644 index 
00000000..e69de29b diff --git a/panoptes_aggregation/tests/batch_aggregation/cls_export.csv b/panoptes_aggregation/tests/batch_aggregation/cls_export.csv new file mode 100644 index 00000000..dd2ed4e4 --- /dev/null +++ b/panoptes_aggregation/tests/batch_aggregation/cls_export.csv @@ -0,0 +1,8 @@ +classification_id,user_name,user_id,user_ip,workflow_id,workflow_name,workflow_version,created_at,gold_standard,expert,metadata,annotations,subject_data,subject_ids +543695319,not-logged-in-b644753d0e3948f81dc2,,b644753d0e3948f81dc2,10,Superluminous Supernovae,16.55,2024-02-23 16:17:42 UTC,,,"{""source"":""api"",""session"":""7a1f4a17d190291faa1824be3b3febf1d8b77a4f2d25dd6f191f76ef335684bf"",""viewport"":{""width"":1710,""height"":948},""started_at"":""2024-02-23T16:16:35.085Z"",""user_agent"":""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"",""utc_offset"":""18000"",""finished_at"":""2024-02-23T16:17:42.334Z"",""live_project"":true,""interventions"":{""opt_in"":false,""messageShown"":false},""user_language"":""en"",""subject_dimensions"":[{""clientWidth"":558,""clientHeight"":419,""naturalWidth"":1200,""naturalHeight"":900},{""clientWidth"":150,""clientHeight"":150,""naturalWidth"":300,""naturalHeight"":300}],""subject_selection_state"":{""retired"":false,""selected_at"":""2024-02-23T16:16:35.003Z"",""already_seen"":false,""selection_state"":""normal"",""finished_workflow"":false,""user_has_finished_workflow"":false},""workflow_translation_id"":""28176""}","[{""task"":""T0"",""task_label"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. In astronomy smaller magnitudes are brighter!"",""value"":""Yes""},{""task"":""T1"",""task_label"":""Is the cross-hair in the image close to a faint, fuzzy galaxy?"",""value"":""Yes""}]","{""96588114"":{""retired"":{""id"":125510348,""workflow_id"":10,""classifications_count"":10,""created_at"":""2024-02-21T09:44:28.745Z"",""updated_at"":""2024-02-23T16:17:42.488Z"",""retired_at"":""2024-02-23T16:17:42.479Z"",""subject_id"":96588114,""retirement_reason"":""classification_count""},""ramean"":123.7681641625,""ZTF_URL"":""https://lasair-ztf.lsst.ac.uk/objects/ZTF23abjrdem"",""decmean"":57.182124325000004,""objectId"":""ZTF23abjrdem""}}",96588114 +543695340,not-logged-in-b644753d0e3948f81dc2,,b644753d0e3948f81dc2,10,Superluminous Supernovae,16.55,2024-02-23 16:17:48 UTC,,,"{""source"":""api"",""session"":""7a1f4a17d190291faa1824be3b3febf1d8b77a4f2d25dd6f191f76ef335684bf"",""viewport"":{""width"":1710,""height"":948},""started_at"":""2024-02-23T16:17:42.351Z"",""user_agent"":""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"",""utc_offset"":""18000"",""finished_at"":""2024-02-23T16:17:48.539Z"",""live_project"":true,""interventions"":{""opt_in"":false,""messageShown"":false},""user_language"":""en"",""subject_dimensions"":[{""clientWidth"":558,""clientHeight"":419,""naturalWidth"":1200,""naturalHeight"":900},{""clientWidth"":300,""clientHeight"":300,""naturalWidth"":300,""naturalHeight"":300}],""subject_selection_state"":{""retired"":false,""selected_at"":""2024-02-23T16:16:35.003Z"",""already_seen"":false,""selection_state"":""normal"",""finished_workflow"":false,""user_has_finished_workflow"":false},""workflow_translation_id"":""28176""}","[{""task"":""T0"",""task_label"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The 
y-axis shows the magnitude (brightness) of the supernova. In astronomy smaller magnitudes are brighter!"",""value"":""No""}]","{""96588105"":{""retired"":{""id"":125510263,""workflow_id"":10,""classifications_count"":10,""created_at"":""2024-02-21T09:43:24.074Z"",""updated_at"":""2024-02-23T16:17:48.696Z"",""retired_at"":""2024-02-23T16:17:48.686Z"",""subject_id"":96588105,""retirement_reason"":""classification_count""},""ramean"":157.50762885625,""ZTF_URL"":""https://lasair-ztf.lsst.ac.uk/objects/ZTF22abycniv"",""decmean"":8.21724599375,""objectId"":""ZTF22abycniv""}}",96588105 +543695374,not-logged-in-b644753d0e3948f81dc2,,b644753d0e3948f81dc2,10,Superluminous Supernovae,16.55,2024-02-23 16:18:02 UTC,,,"{""source"":""api"",""session"":""7a1f4a17d190291faa1824be3b3febf1d8b77a4f2d25dd6f191f76ef335684bf"",""viewport"":{""width"":1710,""height"":948},""started_at"":""2024-02-23T16:17:48.559Z"",""user_agent"":""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"",""utc_offset"":""18000"",""finished_at"":""2024-02-23T16:18:02.264Z"",""live_project"":true,""interventions"":{""opt_in"":false,""messageShown"":false},""user_language"":""en"",""subject_dimensions"":[{""clientWidth"":558,""clientHeight"":419,""naturalWidth"":1200,""naturalHeight"":900},{""clientWidth"":300,""clientHeight"":300,""naturalWidth"":300,""naturalHeight"":300}],""subject_selection_state"":{""retired"":false,""selected_at"":""2024-02-23T16:16:35.003Z"",""already_seen"":false,""selection_state"":""normal"",""finished_workflow"":false,""user_has_finished_workflow"":false},""workflow_translation_id"":""28176""}","[{""task"":""T0"",""task_label"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. 
In astronomy smaller magnitudes are brighter!"",""value"":""Yes""},{""task"":""T1"",""task_label"":""Is the cross-hair in the image close to a faint, fuzzy galaxy?"",""value"":""No""}]","{""96588126"":{""retired"":{""id"":125510270,""workflow_id"":10,""classifications_count"":10,""created_at"":""2024-02-21T09:43:25.834Z"",""updated_at"":""2024-02-23T16:18:02.396Z"",""retired_at"":""2024-02-23T16:18:02.389Z"",""subject_id"":96588126,""retirement_reason"":""classification_count""},""ramean"":98.49884808888889,""ZTF_URL"":""https://lasair-ztf.lsst.ac.uk/objects/ZTF22abfnkve"",""decmean"":58.67660070000001,""objectId"":""ZTF22abfnkve""}}",96588126 +543695390,not-logged-in-b644753d0e3948f81dc2,,b644753d0e3948f81dc2,10,Superluminous Supernovae,16.55,2024-02-23 16:18:09 UTC,,,"{""source"":""api"",""session"":""7a1f4a17d190291faa1824be3b3febf1d8b77a4f2d25dd6f191f76ef335684bf"",""viewport"":{""width"":1710,""height"":948},""started_at"":""2024-02-23T16:18:02.283Z"",""user_agent"":""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"",""utc_offset"":""18000"",""finished_at"":""2024-02-23T16:18:09.532Z"",""live_project"":true,""interventions"":{""opt_in"":false,""messageShown"":false},""user_language"":""en"",""subject_dimensions"":[{""clientWidth"":558,""clientHeight"":419,""naturalWidth"":1200,""naturalHeight"":900},{""clientWidth"":300,""clientHeight"":300,""naturalWidth"":300,""naturalHeight"":300}],""subject_selection_state"":{""retired"":false,""selected_at"":""2024-02-23T16:16:35.003Z"",""already_seen"":false,""selection_state"":""normal"",""finished_workflow"":false,""user_has_finished_workflow"":false},""workflow_translation_id"":""28176""}","[{""task"":""T0"",""task_label"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. 
In astronomy smaller magnitudes are brighter!"",""value"":""No""}]","{""96588128"":{""retired"":{""id"":125510290,""workflow_id"":10,""classifications_count"":10,""created_at"":""2024-02-21T09:43:36.461Z"",""updated_at"":""2024-02-23T16:18:09.674Z"",""retired_at"":""2024-02-23T16:18:09.667Z"",""subject_id"":96588128,""retirement_reason"":""classification_count""},""ramean"":41.573462775,""ZTF_URL"":""https://lasair-ztf.lsst.ac.uk/objects/ZTF23aavvcjd"",""decmean"":-5.001660237499999,""objectId"":""ZTF23aavvcjd""}}",96588128 +543695425,not-logged-in-b644753d0e3948f81dc2,,b644753d0e3948f81dc2,10,Superluminous Supernovae,16.55,2024-02-23 16:18:24 UTC,,,"{""source"":""api"",""session"":""7a1f4a17d190291faa1824be3b3febf1d8b77a4f2d25dd6f191f76ef335684bf"",""viewport"":{""width"":1710,""height"":948},""started_at"":""2024-02-23T16:18:09.551Z"",""user_agent"":""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"",""utc_offset"":""18000"",""finished_at"":""2024-02-23T16:18:24.225Z"",""live_project"":true,""interventions"":{""opt_in"":false,""messageShown"":false},""user_language"":""en"",""subject_dimensions"":[{""clientWidth"":558,""clientHeight"":419,""naturalWidth"":1200,""naturalHeight"":900},{""clientWidth"":300,""clientHeight"":300,""naturalWidth"":300,""naturalHeight"":300}],""subject_selection_state"":{""retired"":false,""selected_at"":""2024-02-23T16:16:35.003Z"",""already_seen"":false,""selection_state"":""normal"",""finished_workflow"":false,""user_has_finished_workflow"":false},""workflow_translation_id"":""28176""}","[{""task"":""T0"",""task_label"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. In astronomy smaller magnitudes are brighter!"",""value"":""Yes""},{""task"":""T1"",""task_label"":""Is the cross-hair in the image close to a faint, fuzzy galaxy?"",""value"":""No""}]","{""96588109"":{""retired"":{""id"":125510335,""workflow_id"":10,""classifications_count"":10,""created_at"":""2024-02-21T09:44:14.501Z"",""updated_at"":""2024-02-23T16:18:24.390Z"",""retired_at"":""2024-02-23T16:18:24.378Z"",""subject_id"":96588109,""retirement_reason"":""classification_count""},""ramean"":11.719328585714285,""ZTF_URL"":""https://lasair-ztf.lsst.ac.uk/objects/ZTF23aatzhso"",""decmean"":42.02810038571429,""objectId"":""ZTF23aatzhso""}}",96588109 +543695436,not-logged-in-b644753d0e3948f81dc2,,b644753d0e3948f81dc2,10,Superluminous Supernovae,16.55,2024-02-23 16:18:27 UTC,,,"{""source"":""api"",""session"":""7a1f4a17d190291faa1824be3b3febf1d8b77a4f2d25dd6f191f76ef335684bf"",""viewport"":{""width"":1710,""height"":948},""started_at"":""2024-02-23T16:18:24.243Z"",""user_agent"":""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 
Safari/537.36"",""utc_offset"":""18000"",""finished_at"":""2024-02-23T16:18:27.892Z"",""live_project"":true,""interventions"":{""opt_in"":false,""messageShown"":false},""user_language"":""en"",""subject_dimensions"":[{""clientWidth"":558,""clientHeight"":419,""naturalWidth"":1200,""naturalHeight"":900},{""clientWidth"":300,""clientHeight"":300,""naturalWidth"":300,""naturalHeight"":300}],""subject_selection_state"":{""retired"":false,""selected_at"":""2024-02-23T16:16:35.003Z"",""already_seen"":false,""selection_state"":""normal"",""finished_workflow"":false,""user_has_finished_workflow"":false},""workflow_translation_id"":""28176""}","[{""task"":""T0"",""task_label"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. In astronomy smaller magnitudes are brighter!"",""value"":""No""}]","{""96588106"":{""retired"":{""id"":125510296,""workflow_id"":10,""classifications_count"":10,""created_at"":""2024-02-21T09:43:44.966Z"",""updated_at"":""2024-02-23T17:54:11.466Z"",""retired_at"":""2024-02-23T17:54:11.458Z"",""subject_id"":96588106,""retirement_reason"":""classification_count""},""ramean"":47.78652812,""ZTF_URL"":""https://lasair-ztf.lsst.ac.uk/objects/ZTF23aauyuay"",""decmean"":73.76492526000001,""objectId"":""ZTF23aauyuay""}}",96588106 +543695453,not-logged-in-b644753d0e3948f81dc2,,b644753d0e3948f81dc2,10,Superluminous Supernovae,16.55,2024-02-23 16:18:35 UTC,,,"{""source"":""api"",""session"":""7a1f4a17d190291faa1824be3b3febf1d8b77a4f2d25dd6f191f76ef335684bf"",""viewport"":{""width"":1710,""height"":948},""started_at"":""2024-02-23T16:18:27.902Z"",""user_agent"":""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"",""utc_offset"":""18000"",""finished_at"":""2024-02-23T16:18:35.478Z"",""live_project"":true,""interventions"":{""opt_in"":false,""messageShown"":false},""user_language"":""en"",""subject_dimensions"":[{""clientWidth"":558,""clientHeight"":419,""naturalWidth"":1200,""naturalHeight"":900},{""clientWidth"":300,""clientHeight"":300,""naturalWidth"":300,""naturalHeight"":300}],""subject_selection_state"":{""retired"":false,""selected_at"":""2024-02-23T16:16:35.003Z"",""already_seen"":false,""selection_state"":""normal"",""finished_workflow"":false,""user_has_finished_workflow"":false},""workflow_translation_id"":""28176""}","[{""task"":""T0"",""task_label"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. 
In astronomy smaller magnitudes are brighter!"",""value"":""No""}]","{""96588131"":{""retired"":{""id"":125510305,""workflow_id"":10,""classifications_count"":10,""created_at"":""2024-02-21T09:43:48.500Z"",""updated_at"":""2024-02-23T18:31:31.686Z"",""retired_at"":""2024-02-23T18:31:31.677Z"",""subject_id"":96588131,""retirement_reason"":""classification_count""},""ramean"":158.6458489125,""ZTF_URL"":""https://lasair-ztf.lsst.ac.uk/objects/ZTF23absjgik"",""decmean"":-27.650916137499998,""objectId"":""ZTF23absjgik""}}",96588131 diff --git a/panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py b/panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py new file mode 100644 index 00000000..d68ca476 --- /dev/null +++ b/panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py @@ -0,0 +1,155 @@ +import unittest +import os +from unittest.mock import patch, MagicMock, call +from panoptes_aggregation.scripts import batch_utils +from panoptes_aggregation.batch_aggregation import run_aggregation +from panoptes_aggregation import batch_aggregation as batch_agg + +wf_export = 'panoptes_aggregation/tests/batch_aggregation/wf_export.csv' +cls_export = 'panoptes_aggregation/tests/batch_aggregation/cls_export.csv' + + +@patch("panoptes_aggregation.batch_aggregation.BatchAggregator._connect_api_client", new=MagicMock()) +class TestBatchAggregation(unittest.TestCase): + @patch("panoptes_aggregation.batch_aggregation.BatchAggregator") + def test_run_aggregation_permission_failure(self, mock_aggregator): + mock_aggregator_instance = mock_aggregator.return_value + mock_aggregator_instance.check_permission.return_value = False + + with self.assertRaises(SystemExit): + run_aggregation(1, 10, 100) + mock_aggregator_instance.update_panoptes.assert_not_called() + + @patch("panoptes_aggregation.batch_aggregation.workflow_extractor_config") + @patch("panoptes_aggregation.batch_aggregation.BatchAggregator") + def test_run_aggregation_success(self, mock_aggregator, mock_wf_ext_conf): + mock_aggregator_instance = mock_aggregator.return_value + mock_aggregator_instance.check_permission.return_value = True + + mock_df = MagicMock() + test_extracts = {'question_extractor': mock_df} + batch_utils.batch_extract = MagicMock(return_value=test_extracts) + mock_reducer = MagicMock() + batch_utils.batch_reduce = mock_reducer + + run_aggregation(1, 10, 100) + mock_aggregator_instance.check_permission.assert_called_once() + mock_aggregator.assert_called_once_with(1, 10, 100) + mock_wf_ext_conf.assert_called_once() + batch_utils.batch_extract.assert_called_once() + mock_df.to_csv.assert_called() + batch_utils.batch_reduce.assert_called() + self.assertEqual(mock_reducer.call_count, 2) + mock_aggregator_instance.upload_files.assert_called_once() + mock_aggregator_instance.update_panoptes.assert_called_once() + + @patch("panoptes_aggregation.batch_aggregation.os.mkdir") + @patch("panoptes_aggregation.batch_aggregation.Workflow") + @patch("panoptes_aggregation.batch_aggregation.Project") + def test_save_exports(self, mock_project, mock_workflow, mock_mkdir): + # Test that Panoptes calls are made and files are saved + csv_dict = {'media': [{'src': 'http://zooniverse.org/123.csv'}]} + mock_project.return_value.describe_export.return_value = csv_dict + mock_workflow.return_value.describe_export.return_value = csv_dict + ba = batch_agg.BatchAggregator(1, 10, 100) + batch_agg.BatchAggregator._download_export = MagicMock(side_effect=['./cls_export.csv', './wf_export.csv']) + expected_response = 
{'classifications': 'tmp/10/10_cls_export.csv', 'workflows': 'tmp/10/10_workflow_export.csv'} + + response = ba.save_exports() + + assert ba.id is not None + self.assertEqual(response, expected_response) + mock_mkdir.assert_called_once() + mock_project.assert_called_once_with(1) + mock_workflow.assert_called_once_with(10) + mock_project.return_value.describe_export.assert_called_once_with('workflows') + mock_workflow.return_value.describe_export.assert_called_once_with('classifications') + + def test_process_wf_export(self): + ba = batch_agg.BatchAggregator(1, 10, 100) + result = ba.process_wf_export(wf_export) + self.assertEqual(ba.wf_maj_version, 16) + self.assertEqual(ba.wf_min_version, 55) + self.assertEqual(ba.workflow_version, '16.55') + self.assertEqual(result.__class__.__name__, 'DataFrame') + + def test_process_cls_export(self): + ba = batch_agg.BatchAggregator(1, 10, 100) + ba.workflow_version = '16.55' + result = ba.process_cls_export(cls_export) + self.assertEqual(result.__class__.__name__, 'DataFrame') + + @patch("panoptes_aggregation.batch_aggregation.BatchAggregator.connect_blob_storage") + @patch("panoptes_aggregation.batch_aggregation.make_archive") + def test_upload_files(self, archive_mock, client_mock): + zipped_mock = MagicMock() + archive_mock.return_value = zipped_mock + ba = batch_agg.BatchAggregator(1, 10, 100) + ba.upload_file_to_storage = MagicMock() + ba.output_path = os.path.join('tmp', '10') + reductions_file = os.path.join('tmp', '10', '10_reductions.csv') + ba.upload_files() + client_mock.assert_called_once() + archive_mock.assert_called_once() + ba.upload_file_to_storage.assert_has_calls([call(ba.id, reductions_file), call(ba.id, zipped_mock)]) + + def test_upload_file_to_storage(self): + ba = batch_agg.BatchAggregator(1, 10, 100) + mock_client = MagicMock() + ba.blob_service_client = MagicMock(return_value=mock_client) + ba.upload_file_to_storage('container', cls_export) + mock_client.upload_blob.assert_called_once + + @patch("panoptes_aggregation.batch_aggregation.Project") + def test_check_permission_success(self, mock_project): + mock_user = MagicMock() + mock_user.id = 100 + mock_project.find().collaborators.return_value = [mock_user] + + ba = batch_agg.BatchAggregator(1, 10, 100) + ba.check_permission() + mock_project.find.assert_called_with(1) + mock_project.find().collaborators.assert_called() + self.assertEqual(ba.check_permission(), True) + + @patch("panoptes_aggregation.batch_aggregation.Project") + def test_check_permission_failure(self, mock_project): + mock_user = MagicMock() + + # List of collaborators does not include initiating user + mock_user.id = 999 + mock_project.find().collaborators.return_value = [mock_user] + + ba = batch_agg.BatchAggregator(1, 10, 100) + ba.update_panoptes = MagicMock() + ba.check_permission() + mock_project.find.assert_called_with(1) + mock_project.find().collaborators.assert_called() + self.assertEqual(ba.check_permission(), False) + ba.update_panoptes.assert_not_called() + + @patch("panoptes_aggregation.batch_aggregation.Panoptes.put") + @patch("panoptes_aggregation.batch_aggregation.Panoptes.get") + def test_update_panoptes_success(self, mock_get, mock_put): + ba = batch_agg.BatchAggregator(1, 10, 100) + mock_get.return_value = ({'aggregations': [{'id': 5555}]}, 'thisisanetag') + body = {'uuid': ba.id, 'status': 'completed'} + ba.update_panoptes(body) + mock_get.assert_called_with('/aggregations', params={'workflow_id': 10}) + mock_put.assert_called_with('/aggregations/5555', etag='thisisanetag', 
json={'aggregations': body}) + + @patch("panoptes_aggregation.batch_aggregation.Panoptes.put") + @patch("panoptes_aggregation.batch_aggregation.Panoptes.get") + def test_update_panoptes_failure(self, mock_get, mock_put): + ba = batch_agg.BatchAggregator(1, 10, 100) + mock_get.return_value = ({'aggregations': [{'id': 5555}]}, 'thisisanetag') + body = {'status': 'failure'} + ba.update_panoptes(body) + mock_get.assert_called_with('/aggregations', params={'workflow_id': 10}) + mock_put.assert_called_with('/aggregations/5555', etag='thisisanetag', json={'aggregations': body}) + + @patch("panoptes_aggregation.batch_aggregation.BlobServiceClient") + def test_connect_blob_storage(self, mock_client): + ba = batch_agg.BatchAggregator(1, 10, 100) + ba.connect_blob_storage() + ba.blob_service_client.create_container.assert_called_once_with(name=ba.id) diff --git a/panoptes_aggregation/tests/batch_aggregation/wf_export.csv b/panoptes_aggregation/tests/batch_aggregation/wf_export.csv new file mode 100644 index 00000000..14e7fe45 --- /dev/null +++ b/panoptes_aggregation/tests/batch_aggregation/wf_export.csv @@ -0,0 +1,7 @@ +workflow_id,display_name,version,active,classifications_count,pairwise,grouped,prioritized,primary_language,first_task,tutorial_subject_id,retired_set_member_subjects_count,tasks,retirement,aggregation,strings,minor_version +10,Superluminous Supernovae,14,true,105171,false,false,false,en,T0,,9056,"{""T0"":{""help"":""T0.help"",""type"":""single"",""answers"":[{""next"":""T1"",""label"":""T0.answers.0.label""},{""label"":""T0.answers.1.label""}],""question"":""T0.question""},""T1"":{""help"":""T1.help"",""type"":""single"",""answers"":[{""label"":""T1.answers.0.label""},{""label"":""T1.answers.1.label""}],""question"":""T1.question""}}","{""options"":{""count"":10},""criteria"":""classification_count""}",{},"{""T0.help"":""The lightcurve is the plot showing brightness over time. The numbers on the x-axis are in days. Look to see if the brightness has been increasing over a period of more than 20 days.\n\nThe blue diamonds represent how bright the supernova is in blue light (at around 450 nanometres)\nand red circles show how bright it is in red light (about 610 nanometres). Often supernovae start blue and\nbecome red as they cool.\n\nHere are some examples of lightcurves that have been rising for over 20 days:\n\n![ZTF20abobpcb_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/d04087a4-ea8d-48f2-b297-401a58a091dd.jpeg)\n\n![ZTF20aadcbvz_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/a8dde630-fcc6-4577-afbc-6c1b7f451857.jpeg)\n\nEven though we observed this object well into its decline, the start of it still has a rise more than 20 days long, so it is a good candidate.\n\nRemember that the \n"",""T1.help"":""Here are some examples of what a faint, fuzzy galaxy hosting a superluminous supernova looks like (Please note that the supernova is not visible in these images. The cross-hair marks where the candidate supernova was detected):\n\n![ZTF19abxekxi-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/928be57f-844b-4947-b30f-28e70dfeb365.png)\n\n![ZTF19abpbopt-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/1f125022-f2f8-411b-a1c4-b1d825d06d9f.png)"",""T0.question"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. 
In astronomy smaller magnitudes are brighter!"",""T1.question"":""Is the cross-hair in the image close to a faint, fuzzy galaxy?"",""T0.answers.0.label"":""Yes"",""T0.answers.1.label"":""No"",""T1.answers.0.label"":""Yes"",""T1.answers.1.label"":""No""}",52 +10,Superluminous Supernovae,14,true,105171,false,false,false,en,T0,,9056,"{""T0"":{""help"":""T0.help"",""type"":""single"",""answers"":[{""next"":""T1"",""label"":""T0.answers.0.label""},{""label"":""T0.answers.1.label""}],""question"":""T0.question""},""T1"":{""help"":""T1.help"",""type"":""single"",""answers"":[{""label"":""T1.answers.0.label""},{""label"":""T1.answers.1.label""}],""question"":""T1.question""}}","{""options"":{""count"":10},""criteria"":""classification_count""}",{},"{""T0.help"":""The lightcurve is the plot showing brightness over time. The numbers on the x-axis are in days. Look to see if the brightness has been increasing over a period of more than 20 days.\n\nThe blue diamonds represent how bright the supernova is in blue light (at around 450 nanometres)\nand red circles show how bright it is in red light (about 610 nanometres). Often supernovae start blue and\nbecome red as they cool.\n\nHere are some examples of lightcurves that have been rising for over 20 days:\n\n![ZTF20abobpcb_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/d04087a4-ea8d-48f2-b297-401a58a091dd.jpeg)\n\n![ZTF20aadcbvz_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/a8dde630-fcc6-4577-afbc-6c1b7f451857.jpeg)\n\nEven though we observed this object well into its decline, the start of it still has a rise more than 20 days long, so it is a good candidate.\n\nRemember that the size of a 20-day period will change depending on the number of observations, so you need to look at the axis. \n"",""T1.help"":""Here are some examples of what a faint, fuzzy galaxy hosting a superluminous supernova looks like (Please note that the supernova is not visible in these images. The cross-hair marks where the candidate supernova was detected):\n\n![ZTF19abxekxi-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/928be57f-844b-4947-b30f-28e70dfeb365.png)\n\n![ZTF19abpbopt-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/1f125022-f2f8-411b-a1c4-b1d825d06d9f.png)"",""T0.question"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. In astronomy smaller magnitudes are brighter!"",""T1.question"":""Is the cross-hair in the image close to a faint, fuzzy galaxy?"",""T0.answers.0.label"":""Yes"",""T0.answers.1.label"":""No"",""T1.answers.0.label"":""Yes"",""T1.answers.1.label"":""No""}",53 +10,Superluminous Supernovae,14,true,105171,false,false,false,en,T0,,9056,"{""T0"":{""help"":""T0.help"",""type"":""single"",""answers"":[{""next"":""T1"",""label"":""T0.answers.0.label""},{""label"":""T0.answers.1.label""}],""question"":""T0.question""},""T1"":{""help"":""T1.help"",""type"":""single"",""answers"":[{""label"":""T1.answers.0.label""},{""label"":""T1.answers.1.label""}],""question"":""T1.question""}}","{""options"":{""count"":10},""criteria"":""classification_count""}",{},"{""T0.help"":""The lightcurve is the plot showing brightness over time. The numbers on the x-axis are in days. 
Look to see if the brightness has been increasing over a period of more than 20 days.\n\nThe blue diamonds represent how bright the supernova is in blue light (at around 450 nanometres)and red circles show how bright it is in red light (about 610 nanometres). Often supernovae start blue and\nbecome red as they cool.\n\nHere are some examples of lightcurves that have been rising for over 20 days:\n\n![ZTF20abobpcb_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/d04087a4-ea8d-48f2-b297-401a58a091dd.jpeg)\n\n![ZTF20aadcbvz_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/a8dde630-fcc6-4577-afbc-6c1b7f451857.jpeg)\n\nEven though we observed this object well into its decline, the start of it still has a rise more than 20 days long, so it is a good candidate.\n\nRemember that the size of a 20-day period will change depending on the number of observations, so you need to look at the axis. \n"",""T1.help"":""Here are some examples of what a faint, fuzzy galaxy hosting a superluminous supernova looks like (Please note that the supernova is not visible in these images. The cross-hair marks where the candidate supernova was detected):\n\n![ZTF19abxekxi-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/928be57f-844b-4947-b30f-28e70dfeb365.png)\n\n![ZTF19abpbopt-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/1f125022-f2f8-411b-a1c4-b1d825d06d9f.png)"",""T0.question"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. In astronomy smaller magnitudes are brighter!"",""T1.question"":""Is the cross-hair in the image close to a faint, fuzzy galaxy?"",""T0.answers.0.label"":""Yes"",""T0.answers.1.label"":""No"",""T1.answers.0.label"":""Yes"",""T1.answers.1.label"":""No""}",54 +10,Superluminous Supernovae,14,true,105171,false,false,false,en,T0,,9056,"{""T0"":{""help"":""T0.help"",""type"":""single"",""answers"":[{""next"":""T1"",""label"":""T0.answers.0.label""},{""label"":""T0.answers.1.label""}],""question"":""T0.question""},""T1"":{""help"":""T1.help"",""type"":""single"",""answers"":[{""label"":""T1.answers.0.label""},{""label"":""T1.answers.1.label""}],""question"":""T1.question""}}","{""options"":{""count"":10},""criteria"":""classification_count""}",{},"{""T0.help"":""The lightcurve is the plot showing brightness over time. The numbers on the x-axis are in days. Look to see if the brightness has been increasing over a period of more than 20 days.\n\nThe blue diamonds represent how bright the supernova is in blue light (at around 450 nanometres)and red circles show how bright it is in red light (about 610 nanometres). Often supernovae start blue and become red as they cool.\n\nHere are some examples of lightcurves that have been rising for over 20 days:\n\n![ZTF20abobpcb_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/d04087a4-ea8d-48f2-b297-401a58a091dd.jpeg)\n\n![ZTF20aadcbvz_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/a8dde630-fcc6-4577-afbc-6c1b7f451857.jpeg)\n\nEven though we observed this object well into its decline, the start of it still has a rise more than 20 days long, so it is a good candidate.\n\nRemember that the size of a 20-day period will change depending on the number of observations, so you need to look at the axis. 
\n"",""T1.help"":""Here are some examples of what a faint, fuzzy galaxy hosting a superluminous supernova looks like (Please note that the supernova is not visible in these images. The cross-hair marks where the candidate supernova was detected):\n\n![ZTF19abxekxi-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/928be57f-844b-4947-b30f-28e70dfeb365.png)\n\n![ZTF19abpbopt-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/1f125022-f2f8-411b-a1c4-b1d825d06d9f.png)"",""T0.question"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. In astronomy smaller magnitudes are brighter!"",""T1.question"":""Is the cross-hair in the image close to a faint, fuzzy galaxy?"",""T0.answers.0.label"":""Yes"",""T0.answers.1.label"":""No"",""T1.answers.0.label"":""Yes"",""T1.answers.1.label"":""No""}",55 +10,Superluminous Supernovae,15,true,105171,false,false,false,en,T0,,9056,"{""T0"":{""help"":""T0.help"",""type"":""single"",""answers"":[{""next"":""T1"",""label"":""T0.answers.0.label""},{""label"":""T0.answers.1.label""}],""question"":""T0.question"",""required"":true},""T1"":{""help"":""T1.help"",""type"":""single"",""answers"":[{""label"":""T1.answers.0.label""},{""label"":""T1.answers.1.label""}],""question"":""T1.question""}}","{""options"":{""count"":10},""criteria"":""classification_count""}",{},"{""T0.help"":""The lightcurve is the plot showing brightness over time. The numbers on the x-axis are in days. Look to see if the brightness has been increasing over a period of more than 20 days.\n\nThe blue diamonds represent how bright the supernova is in blue light (at around 450 nanometres)and red circles show how bright it is in red light (about 610 nanometres). Often supernovae start blue and become red as they cool.\n\nHere are some examples of lightcurves that have been rising for over 20 days:\n\n![ZTF20abobpcb_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/d04087a4-ea8d-48f2-b297-401a58a091dd.jpeg)\n\n![ZTF20aadcbvz_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/a8dde630-fcc6-4577-afbc-6c1b7f451857.jpeg)\n\nEven though we observed this object well into its decline, the start of it still has a rise more than 20 days long, so it is a good candidate.\n\nRemember that the size of a 20-day period will change depending on the number of observations, so you need to look at the axis. \n"",""T1.help"":""Here are some examples of what a faint, fuzzy galaxy hosting a superluminous supernova looks like (Please note that the supernova is not visible in these images. The cross-hair marks where the candidate supernova was detected):\n\n![ZTF19abxekxi-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/928be57f-844b-4947-b30f-28e70dfeb365.png)\n\n![ZTF19abpbopt-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/1f125022-f2f8-411b-a1c4-b1d825d06d9f.png)"",""T0.question"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. 
In astronomy smaller magnitudes are brighter!"",""T1.question"":""Is the cross-hair in the image close to a faint, fuzzy galaxy?"",""T0.answers.0.label"":""Yes"",""T0.answers.1.label"":""No"",""T1.answers.0.label"":""Yes"",""T1.answers.1.label"":""No""}",55 +10,Superluminous Supernovae,16,true,105171,false,false,false,en,T0,,9056,"{""T0"":{""help"":""T0.help"",""type"":""single"",""answers"":[{""next"":""T1"",""label"":""T0.answers.0.label""},{""label"":""T0.answers.1.label""}],""question"":""T0.question"",""required"":true},""T1"":{""help"":""T1.help"",""type"":""single"",""answers"":[{""label"":""T1.answers.0.label""},{""label"":""T1.answers.1.label""}],""question"":""T1.question"",""required"":true}}","{""options"":{""count"":10},""criteria"":""classification_count""}",{},"{""T0.help"":""The lightcurve is the plot showing brightness over time. The numbers on the x-axis are in days. Look to see if the brightness has been increasing over a period of more than 20 days.\n\nThe blue diamonds represent how bright the supernova is in blue light (at around 450 nanometres)and red circles show how bright it is in red light (about 610 nanometres). Often supernovae start blue and become red as they cool.\n\nHere are some examples of lightcurves that have been rising for over 20 days:\n\n![ZTF20abobpcb_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/d04087a4-ea8d-48f2-b297-401a58a091dd.jpeg)\n\n![ZTF20aadcbvz_light_curve.jpeg](https://panoptes-uploads.zooniverse.org/production/project_attached_image/a8dde630-fcc6-4577-afbc-6c1b7f451857.jpeg)\n\nEven though we observed this object well into its decline, the start of it still has a rise more than 20 days long, so it is a good candidate.\n\nRemember that the size of a 20-day period will change depending on the number of observations, so you need to look at the axis. \n"",""T1.help"":""Here are some examples of what a faint, fuzzy galaxy hosting a superluminous supernova looks like (Please note that the supernova is not visible in these images. The cross-hair marks where the candidate supernova was detected):\n\n![ZTF19abxekxi-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/928be57f-844b-4947-b30f-28e70dfeb365.png)\n\n![ZTF19abpbopt-image.png](https://panoptes-uploads.zooniverse.org/production/project_attached_image/1f125022-f2f8-411b-a1c4-b1d825d06d9f.png)"",""T0.question"":""**Has the lightcurve been rising for more than 20 days?**\n\nNOTE: The y-axis shows the magnitude (brightness) of the supernova. 
In astronomy smaller magnitudes are brighter!"",""T1.question"":""Is the cross-hair in the image close to a faint, fuzzy galaxy?"",""T0.answers.0.label"":""Yes"",""T0.answers.1.label"":""No"",""T1.answers.0.label"":""Yes"",""T1.answers.1.label"":""No""}",55 diff --git a/panoptes_aggregation/tests/router_tests/test_routes.py b/panoptes_aggregation/tests/router_tests/test_routes.py index 0e3ff633..70db0aa5 100644 --- a/panoptes_aggregation/tests/router_tests/test_routes.py +++ b/panoptes_aggregation/tests/router_tests/test_routes.py @@ -68,6 +68,30 @@ def test_one_running_reducer_route(self): running_reducer_name ) + # Override json.dumps() for this test so it doesn't try to jsonify the mock + import json + + @patch("panoptes_aggregation.batch_aggregation.json.dumps", return_value=json.dumps({'project_id': 1, 'workflow_id': 10, 'user_id': 100, 'task_id': 'asdf'})) + @patch("panoptes_aggregation.batch_aggregation.run_aggregation.delay") + def test_run_aggregation_route(self, mocked_task, mocked_json): + '''Test that the bg task gets called on batch aggregation route''' + with routes.make_application().test_client() as client: + mocked_task.id = 'asdf' + response = client.post('/run_aggregation', json={'project_id': 1, 'workflow_id': 10, 'user_id': 100}) + mocked_task.assert_called_once_with(1, 10, 100) + self.assertEqual(response.status_code, 202) + self.assertIn('"task_id": "asdf"', response.text) + + @patch("celery.result.AsyncResult") + def test_get_status(self, asyncresult): + '''Test task status works''' + with self.application.test_client() as client: + result = '"task_id": "asdf", "task_status": "PENDING"' + asyncresult.get = result + response = client.get('/tasks/asdf') + self.assertEqual(response.status_code, 200) + self.assertIn(result, response.text) + @unittest.skipIf("TRAVIS" in os.environ and os.environ["TRAVIS"] == "true", "Skipping this test on Travis CI.") def test_docs_route(self): '''Test docs route works''' diff --git a/pyproject.toml b/pyproject.toml index f37e5f19..28567368 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,11 @@ dependencies = [ [project.optional-dependencies] online = [ + "azure-identity>=1,<2", + "azure-storage-blob>=12,<13", + "celery>=5.3,<5.4", + "redis>=5,<6", + "flower>2,<3", "flask>=2.3,<3.1", "flask-cors>=3.0,<4.1", "panoptes-client>=1.6,<1.7", @@ -83,7 +88,7 @@ include = [ exclude = [ "docs/", "kubernetes", - "make_docs.sh" + "scripts/make_docs.sh" ] [tool.coverage.run] diff --git a/make_docs.sh b/scripts/make_docs.sh similarity index 100% rename from make_docs.sh rename to scripts/make_docs.sh diff --git a/scripts/start-celery.sh b/scripts/start-celery.sh new file mode 100755 index 00000000..082f6d72 --- /dev/null +++ b/scripts/start-celery.sh @@ -0,0 +1,3 @@ +#!/bin/bash -e + +exec celery --app panoptes_aggregation.tasks.celery worker --loglevel=info diff --git a/start-flask.sh b/scripts/start-flask.sh similarity index 100% rename from start-flask.sh rename to scripts/start-flask.sh diff --git a/scripts/start-flower.sh b/scripts/start-flower.sh new file mode 100755 index 00000000..68789359 --- /dev/null +++ b/scripts/start-flower.sh @@ -0,0 +1,4 @@ +#!/bin/bash -e + +BROKER=${CELERY_BROKER_URL:='redis://redis:6379/0'} +exec celery --app panoptes_aggregation.tasks.celery flower --port=5555 --broker=$BROKER From 0bfb066e8e690cfa9806c9fa9e3be99f7725d39b Mon Sep 17 00:00:00 2001 From: Zach Wolfenbarger Date: Mon, 1 Jul 2024 17:52:10 -0500 Subject: [PATCH 2/3] Batch aggregation staging deploy (#786) * Add logging statements * Update 
celery task namespace * Add staging deployment template * Clean up new resource names * Build to a single docker image * Rename deployment & use Panoptes staging in staging deploy * Fix secret name * Sringify ID in comparison to value returned from Panoptes * Update test * Fix mock data type * Use client's admin mode * Fix a couple filepaths * Use UUID as tmpdir path * Finish run if Panoptes is unupdateable * When the update panoptes resource doesn't exist but the call is successful * Use jsonify to set mimetype * cast inputs to ints just in case * Enable public access to new containers * Deploy staging with action * hound? * test fixes * new hound * Use correct k8s secret --- .github/workflows/deploy_staging.yml | 44 +++ docker-compose.yml | 14 +- kubernetes/deployment-staging.yaml | 329 ++++++++++++++++++ panoptes_aggregation/batch_aggregation.py | 37 +- panoptes_aggregation/routes.py | 2 +- .../test_batch_aggregation.py | 30 +- scripts/start-celery.sh | 2 +- scripts/start-flower.sh | 2 +- 8 files changed, 428 insertions(+), 32 deletions(-) create mode 100644 .github/workflows/deploy_staging.yml create mode 100644 kubernetes/deployment-staging.yaml diff --git a/.github/workflows/deploy_staging.yml b/.github/workflows/deploy_staging.yml new file mode 100644 index 00000000..5caf7666 --- /dev/null +++ b/.github/workflows/deploy_staging.yml @@ -0,0 +1,44 @@ +name: Deploy to Staging + +on: + push: + branches: + - master + workflow_dispatch: + +jobs: + build_and_push_image: + name: Build and Push Image + uses: zooniverse/ci-cd/.github/workflows/build_and_push_image.yaml@main + with: + repo_name: aggregation-for-caesar + commit_id: ${{ github.sha }} + latest: true + build_args: "REVISION=${{ github.sha }}" + + deploy_staging: + name: Deploy to Staging + uses: zooniverse/ci-cd/.github/workflows/deploy_app.yaml@main + needs: build_and_push_image + with: + app_name: aggregation + repo_name: aggregation-for-caesar + commit_id: ${{ github.sha }} + environment: staging + deploy_check: false + secrets: + creds: ${{ secrets.AZURE_AKS }} + + slack_notification: + name: Slack notification + uses: zooniverse/ci-cd/.github/workflows/slack_notification.yaml@main + needs: deploy_staging + if: always() + with: + commit_id: ${{ github.sha }} + job_name: Deploy to Staging / deploy_app + status: ${{ needs.deploy_staging.result }} + title: "Aggregation Staging deploy complete" + title_link: "https://aggregation-staging.zooniverse.org" + secrets: + slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/docker-compose.yml b/docker-compose.yml index b1c2acd5..414ad5ba 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,10 +1,11 @@ -version: '2' +version: '3' services: aggregation: build: context: ./ args: REVISION: fake-git-sha-id + image: aggregation-for-caesar:local volumes: - ./:/usr/src/aggregation - ~/.aws:/root/.aws @@ -25,11 +26,8 @@ services: - redis:redis worker: - build: - context: ./ - args: - REVISION: fake-git-sha-id - command: celery --app panoptes_aggregation.tasks.celery worker --loglevel=info + image: aggregation-for-caesar:local + command: celery --app panoptes_aggregation.batch_aggregation.celery worker --loglevel=info volumes: - ./:/usr/src/aggregation environment: @@ -44,8 +42,8 @@ services: - redis dashboard: - build: . 
- command: celery --app panoptes_aggregation.tasks.celery flower --port=5555 --broker=redis://redis:6379/0 + image: aggregation-for-caesar:local + command: celery --app panoptes_aggregation.batch_aggregation.celery flower --port=5555 --broker=redis://redis:6379/0 ports: - 5556:5555 environment: diff --git a/kubernetes/deployment-staging.yaml b/kubernetes/deployment-staging.yaml new file mode 100644 index 00000000..659c14b1 --- /dev/null +++ b/kubernetes/deployment-staging.yaml @@ -0,0 +1,329 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: aggregation-staging-app + labels: + app: aggregation-staging-app +spec: + selector: + matchLabels: + app: aggregation-staging-app + template: + metadata: + labels: + app: aggregation-staging-app + spec: + containers: + - name: aggregation-staging-app + image: ghcr.io/zooniverse/aggregation-for-caesar:batch-aggregation-staging + ports: + - containerPort: 80 + resources: + requests: + memory: "500Mi" + cpu: "500m" + limits: + memory: "1000Mi" + cpu: "1000m" + startupProbe: + httpGet: + path: / + port: 80 + # wait 6 * 10 seconds(default periodSeconds) for the container to start + # after this succeeds once the liveness probe takes over + failureThreshold: 6 + livenessProbe: + httpGet: + path: / + port: 80 + # allow a longer response time than 1s + timeoutSeconds: 10 + readinessProbe: + httpGet: + path: / + port: 80 + # start checking for readiness after 20s (to serve traffic) + initialDelaySeconds: 20 + # allow a longer response time than 1s + timeoutSeconds: 10 + env: + - name: FLASK_ENV + value: production + - name: CELERY_BROKER_URL + value: redis://aggregation-staging-redis:6379/0 + - name: CELERY_RESULT_BACKEND + value: redis://aggregation-staging-redis:6379/0 + - name: PANOPTES_URL + value: https://panoptes-staging.zooniverse.org/ + - name: PANOPTES_CLIENT_ID + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: PANOPTES_CLIENT_ID + - name: PANOPTES_CLIENT_SECRET + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: PANOPTES_CLIENT_SECRET + - name: MAST_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: MAST_AUTH_TOKEN + - name: MAST_PROD_TOKEN + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: MAST_PROD_TOKEN + - name: SENTRY_DSN + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: SENTRY_DSN + - name: NEW_RELIC_LICENSE_KEY + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: NEW_RELIC_LICENSE_KEY + - name: NEW_RELIC_APP_NAME + value: 'Aggregation Caesar (Staging)' + volumeMounts: + - name: aggregation-staging-volume + mountPath: /usr/src/aggregation/tmp + volumes: + - name: aggregation-staging-volume + persistentVolumeClaim: + claimName: aggregation-staging-data-storage +--- +apiVersion: v1 +kind: Service +metadata: + name: aggregation-staging-app +spec: + selector: + app: aggregation-staging-app + ports: + - protocol: TCP + port: 80 + targetPort: 80 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: aggregation-staging-celery + labels: + app: aggregation-staging-celery +spec: + selector: + matchLabels: + app: aggregation-staging-celery + template: + metadata: + labels: + app: aggregation-staging-celery + spec: + containers: + - name: aggregation-staging-celery + image: ghcr.io/zooniverse/aggregation-for-caesar:batch-aggregation-staging + resources: + requests: + memory: "500Mi" + cpu: "500m" + limits: + memory: "1000Mi" + cpu: "1000m" + livenessProbe: + exec: + command: + - sh + - -c + - celery inspect ping -d 
celery@$(hostname) | grep -q OK + initialDelaySeconds: 30 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 3 + readinessProbe: + exec: + command: + - sh + - -c + - celery inspect ping -d celery@$(hostname) | grep -q OK + initialDelaySeconds: 60 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 3 + args: ["/usr/src/aggregation/scripts/start-celery.sh"] + env: + - name: FLASK_ENV + value: production + - name: CELERY_BROKER_URL + value: redis://aggregation-staging-redis:6379/0 + - name: CELERY_RESULT_BACKEND + value: redis://aggregation-staging-redis:6379/0 + - name: PANOPTES_URL + value: https://panoptes-staging.zooniverse.org/ + - name: PANOPTES_CLIENT_ID + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: PANOPTES_CLIENT_ID + - name: PANOPTES_CLIENT_SECRET + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: PANOPTES_CLIENT_SECRET + - name: MAST_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: MAST_AUTH_TOKEN + - name: MAST_PROD_TOKEN + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: MAST_PROD_TOKEN + - name: SENTRY_DSN + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: SENTRY_DSN + - name: NEW_RELIC_LICENSE_KEY + valueFrom: + secretKeyRef: + name: aggregation-staging-env + key: NEW_RELIC_LICENSE_KEY + - name: NEW_RELIC_APP_NAME + value: 'Aggregation Caesar (Staging)' + volumeMounts: + - name: aggregation-staging-volume + mountPath: /usr/src/aggregation/tmp + volumes: + - name: aggregation-staging-volume + persistentVolumeClaim: + claimName: aggregation-staging-data-storage +--- +apiVersion: v1 +kind: Service +metadata: + name: aggregation-staging-celery +spec: + selector: + app: aggregation-staging-celery + ports: + - protocol: TCP + port: 80 + targetPort: 80 +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: aggregation-staging-redis +spec: + accessModes: + - ReadWriteOnce + storageClassName: azurefile + resources: + requests: + storage: 1Gi +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: aggregation-staging-data-storage +spec: + accessModes: + - ReadWriteOnce + storageClassName: azurefile + resources: + requests: + storage: 20Gi +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: aggregation-staging-redis + labels: + app: aggregation-staging-redis +spec: + replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + app: aggregation-staging-redis + template: + metadata: + labels: + app: aggregation-staging-redis + spec: + tolerations: + - key: "servicelife" + operator: "Equal" + value: "longlife" + effect: "NoSchedule" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: servicelife + operator: In + values: + - longlife + containers: + - name: aggregation-staging-redis + image: redis + resources: + requests: + memory: "100Mi" + cpu: "10m" + limits: + memory: "100Mi" + cpu: "500m" + volumeMounts: + - name: aggregation-staging-redis-data + mountPath: "/data" + volumes: + - name: aggregation-staging-redis-data + persistentVolumeClaim: + claimName: aggregation-staging-redis +--- +apiVersion: v1 +kind: Service +metadata: + name: aggregation-staging-redis +spec: + selector: + app: aggregation-staging-redis + ports: + - protocol: TCP + port: 6379 + targetPort: 6379 + type: NodePort +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: aggregation-staging-ingress + annotations: + kubernetes.io/ingress.class: nginx + 
cert-manager.io/cluster-issuer: letsencrypt-prod + nginx.ingress.kubernetes.io/proxy-buffer-size: "128k" + nginx.ingress.kubernetes.io/proxy-body-size: 20m + nginx.ingress.kubernetes.io/set-real-ip-from: "10.0.0.0/8" +spec: + tls: + - hosts: + - aggregation-staging.zooniverse.org + secretName: aggregation-staging-tls-secret + rules: + - host: aggregation-staging.zooniverse.org + http: + paths: + - pathType: Prefix + path: / + backend: + service: + name: aggregation-staging-app + port: + number: 80 diff --git a/panoptes_aggregation/batch_aggregation.py b/panoptes_aggregation/batch_aggregation.py index 330b11f2..e7e3788b 100644 --- a/panoptes_aggregation/batch_aggregation.py +++ b/panoptes_aggregation/batch_aggregation.py @@ -23,15 +23,20 @@ def run_aggregation(project_id, workflow_id, user_id): ba = BatchAggregator(project_id, workflow_id, user_id) if not ba.check_permission(): - print(f'Batch Aggregation: Unauthorized attempt by user {user_id} to aggregate workflow {workflow_id}') + print(f'[Batch Aggregation] Unauthorized attempt by user {user_id} to aggregate workflow {workflow_id}') # Exit the task gracefully without retrying or erroring sys.exit() + print(f'[Batch Aggregation] Run beginning for workflow {workflow_id} by user {user_id}') + + print(f'[Batch Aggregation] Saving exports for workflow {workflow_id}') ba.save_exports() + print(f'[Batch Aggregation] Processing exports for workflow {workflow_id}') ba.process_wf_export(ba.wf_csv) cls_df = ba.process_cls_export(ba.cls_csv) + print(f'[Batch Aggregation] Extracting workflow {workflow_id}') extractor_config = workflow_extractor_config(ba.tasks) extracted_data = batch_utils.batch_extract(cls_df, extractor_config) @@ -40,8 +45,10 @@ def run_aggregation(project_id, workflow_id, user_id): 'survey_extractor': ['survey_reducer'] } + print(f'[Batch Aggregation] Reducing workflow {workflow_id}') for task_type, extract_df in extracted_data.items(): - extract_df.to_csv(f'{ba.output_path}/{ba.workflow_id}_{task_type}.csv') + csv_filepath = os.path.join(ba.output_path, f'{ba.workflow_id}_{task_type}.csv') + extract_df.to_csv(csv_filepath) reducer_list = batch_standard_reducers[task_type] reduced_data = {} @@ -51,18 +58,20 @@ def run_aggregation(project_id, workflow_id, user_id): reducer_config = {'reducer_config': {reducer: {}}} reduced_data[reducer] = batch_utils.batch_reduce(extract_df, reducer_config) # filename = f'{ba.output_path}/{ba.workflow_id}_reductions.csv' - filename = os.path.join(ba.output_path, ba.workflow_id, '_reductions.csv') + filename = os.path.join(ba.output_path, f'{ba.workflow_id}_reductions.csv') reduced_data[reducer].to_csv(filename, mode='a') # Upload zip & reduction files to blob storage + print(f'[Batch Aggregation] Uploading results for {workflow_id}') ba.upload_files() # This could catch PanoptesAPIException, but what to do if it fails?
+ print(f'[Batch Aggregation] Updating Panoptes for {workflow_id})') success_attrs = {'uuid': ba.id, 'status': 'completed'} ba.update_panoptes(success_attrs) # STDOUT messages get printed to kubernetes logs - print(f'Batch Aggregation: Run successful for workflow {workflow_id} by user {user_id}') + print(f'[Batch Aggregation] Run successful for workflow {workflow_id} by user {user_id}') class BatchAggregator: @@ -71,15 +80,15 @@ class BatchAggregator: """ def __init__(self, project_id, workflow_id, user_id): - self.project_id = project_id - self.workflow_id = workflow_id - self.user_id = user_id + self.project_id = int(project_id) + self.workflow_id = int(workflow_id) + self.user_id = int(user_id) self._generate_uuid() self._connect_api_client() def save_exports(self): - self.output_path = os.path.join('tmp', str(self.workflow_id)) - os.mkdir(self.output_path) + self.output_path = os.path.join('tmp', str(self.id)) + os.makedirs(self.output_path) cls_export = Workflow(self.workflow_id).describe_export('classifications') full_cls_url = cls_export['media'][0]['src'] @@ -113,7 +122,7 @@ def process_cls_export(self, cls_csv): def connect_blob_storage(self): connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING') self.blob_service_client = BlobServiceClient.from_connection_string(connect_str) - self.blob_service_client.create_container(name=self.id) + self.blob_service_client.create_container(name=self.id, public_access='container') def upload_file_to_storage(self, container_name, filepath): blob = filepath.split('/')[-1] @@ -133,6 +142,9 @@ def update_panoptes(self, body_attributes): # An Aggregation class can be added to the python client to avoid doing this manually params = {'workflow_id': self.workflow_id} response = Panoptes.client().get('/aggregations', params=params) + if not response[0]['aggregations']: + print('[Batch Aggregation] Panoptes Aggregation resource not found. 
Unable to update.') + return False agg_id = response[0]['aggregations'][0]['id'] fresh_etag = response[1] @@ -146,7 +158,7 @@ def check_permission(self): project = Project.find(self.project_id) permission = False for user in project.collaborators(): - if user.id == self.user_id: + if user.id == str(self.user_id): permission = True return permission @@ -169,5 +181,6 @@ def _connect_api_client(self): Panoptes.connect( endpoint=os.getenv('PANOPTES_URL', 'https://panoptes.zooniverse.org/'), client_id=os.getenv('PANOPTES_CLIENT_ID'), - client_secret=os.getenv('PANOPTES_CLIENT_SECRET') + client_secret=os.getenv('PANOPTES_CLIENT_SECRET'), + admin='true' ) diff --git a/panoptes_aggregation/routes.py b/panoptes_aggregation/routes.py index 4a8e9daf..9d73648e 100644 --- a/panoptes_aggregation/routes.py +++ b/panoptes_aggregation/routes.py @@ -124,7 +124,7 @@ def run_aggregation(): workflow_id = content['workflow_id'] user_id = content['user_id'] task = batch_aggregation.run_aggregation.delay(project_id, workflow_id, user_id) - return json.dumps({"task_id": task.id}), 202 + return jsonify({"task_id": task.id}), 202 @application.route('/tasks/', methods=['GET']) def get_status(task_id): diff --git a/panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py b/panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py index d68ca476..1ca25376 100644 --- a/panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py +++ b/panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py @@ -43,23 +43,24 @@ def test_run_aggregation_success(self, mock_aggregator, mock_wf_ext_conf): mock_aggregator_instance.upload_files.assert_called_once() mock_aggregator_instance.update_panoptes.assert_called_once() - @patch("panoptes_aggregation.batch_aggregation.os.mkdir") + @patch("panoptes_aggregation.batch_aggregation.os.makedirs") @patch("panoptes_aggregation.batch_aggregation.Workflow") @patch("panoptes_aggregation.batch_aggregation.Project") - def test_save_exports(self, mock_project, mock_workflow, mock_mkdir): + def test_save_exports(self, mock_project, mock_workflow, mock_makedirs): # Test that Panoptes calls are made and files are saved csv_dict = {'media': [{'src': 'http://zooniverse.org/123.csv'}]} mock_project.return_value.describe_export.return_value = csv_dict mock_workflow.return_value.describe_export.return_value = csv_dict ba = batch_agg.BatchAggregator(1, 10, 100) + ba.id = 'asdf123asdf' batch_agg.BatchAggregator._download_export = MagicMock(side_effect=['./cls_export.csv', './wf_export.csv']) - expected_response = {'classifications': 'tmp/10/10_cls_export.csv', 'workflows': 'tmp/10/10_workflow_export.csv'} + expected_response = {'classifications': 'tmp/asdf123asdf/10_cls_export.csv', 'workflows': 'tmp/asdf123asdf/10_workflow_export.csv'} response = ba.save_exports() assert ba.id is not None self.assertEqual(response, expected_response) - mock_mkdir.assert_called_once() + mock_makedirs.assert_called_once() mock_project.assert_called_once_with(1) mock_workflow.assert_called_once_with(10) mock_project.return_value.describe_export.assert_called_once_with('workflows') @@ -103,7 +104,8 @@ def test_upload_file_to_storage(self): @patch("panoptes_aggregation.batch_aggregation.Project") def test_check_permission_success(self, mock_project): mock_user = MagicMock() - mock_user.id = 100 + # Panoptes responses return strings + mock_user.id = '100' mock_project.find().collaborators.return_value = [mock_user] ba = batch_agg.BatchAggregator(1, 10, 100) @@ -117,7 +119,7 @@ def 
test_check_permission_failure(self, mock_project): mock_user = MagicMock() # List of collaborators does not include initiating user - mock_user.id = 999 + mock_user.id = '999' mock_project.find().collaborators.return_value = [mock_user] ba = batch_agg.BatchAggregator(1, 10, 100) @@ -130,7 +132,7 @@ def test_check_permission_failure(self, mock_project): @patch("panoptes_aggregation.batch_aggregation.Panoptes.put") @patch("panoptes_aggregation.batch_aggregation.Panoptes.get") - def test_update_panoptes_success(self, mock_get, mock_put): + def test_update_panoptes_run_success(self, mock_get, mock_put): ba = batch_agg.BatchAggregator(1, 10, 100) mock_get.return_value = ({'aggregations': [{'id': 5555}]}, 'thisisanetag') body = {'uuid': ba.id, 'status': 'completed'} @@ -140,7 +142,7 @@ def test_update_panoptes_success(self, mock_get, mock_put): @patch("panoptes_aggregation.batch_aggregation.Panoptes.put") @patch("panoptes_aggregation.batch_aggregation.Panoptes.get") - def test_update_panoptes_failure(self, mock_get, mock_put): + def test_update_panoptes_run_failure(self, mock_get, mock_put): ba = batch_agg.BatchAggregator(1, 10, 100) mock_get.return_value = ({'aggregations': [{'id': 5555}]}, 'thisisanetag') body = {'status': 'failure'} @@ -148,8 +150,18 @@ def test_update_panoptes_failure(self, mock_get, mock_put): mock_get.assert_called_with('/aggregations', params={'workflow_id': 10}) mock_put.assert_called_with('/aggregations/5555', etag='thisisanetag', json={'aggregations': body}) + @patch("panoptes_aggregation.batch_aggregation.Panoptes.put") + @patch("panoptes_aggregation.batch_aggregation.Panoptes.get") + def test_update_panoptes_get_failure(self, mock_get, mock_put): + ba = batch_agg.BatchAggregator(1, 10, 100) + mock_get.return_value = ({'aggregations': []}, 'etag') + body = {'status': 'failure'} + ba.update_panoptes(body) + mock_get.assert_called_with('/aggregations', params={'workflow_id': 10}) + mock_put.assert_not_called() + @patch("panoptes_aggregation.batch_aggregation.BlobServiceClient") def test_connect_blob_storage(self, mock_client): ba = batch_agg.BatchAggregator(1, 10, 100) ba.connect_blob_storage() - ba.blob_service_client.create_container.assert_called_once_with(name=ba.id) + ba.blob_service_client.create_container.assert_called_once_with(name=ba.id, public_access='container') diff --git a/scripts/start-celery.sh b/scripts/start-celery.sh index 082f6d72..3cd41639 100755 --- a/scripts/start-celery.sh +++ b/scripts/start-celery.sh @@ -1,3 +1,3 @@ #!/bin/bash -e -exec celery --app panoptes_aggregation.tasks.celery worker --loglevel=info +exec celery --app panoptes_aggregation.batch_aggregation.celery worker --loglevel=info diff --git a/scripts/start-flower.sh b/scripts/start-flower.sh index 68789359..bb39483c 100755 --- a/scripts/start-flower.sh +++ b/scripts/start-flower.sh @@ -1,4 +1,4 @@ #!/bin/bash -e BROKER=${CELERY_BROKER_URL:='redis://redis:6379/0'} -exec celery --app panoptes_aggregation.tasks.celery flower --port=5555 --broker=$BROKER +exec celery --app panoptes_aggregation.batch_aggregation.celery flower --port=5555 --broker=$BROKER From 178570e79b2010b22f0ab2d5240e2fd56f81f2b5 Mon Sep 17 00:00:00 2001 From: Zach Wolfenbarger Date: Mon, 1 Jul 2024 18:17:09 -0500 Subject: [PATCH 3/3] Use tag deployment for production (#788) * Use tag deployment for production * Add batchagg resources to prod template --- .../{deploy_app.yml => deploy_production.yml} | 8 +- kubernetes/deployment-production.tmpl | 234 +++++++++++++++++- 2 files changed, 226 insertions(+), 16 
deletions(-) rename .github/workflows/{deploy_app.yml => deploy_production.yml} (90%) diff --git a/.github/workflows/deploy_app.yml b/.github/workflows/deploy_production.yml similarity index 90% rename from .github/workflows/deploy_app.yml rename to .github/workflows/deploy_production.yml index 7393a06f..b207ac58 100644 --- a/.github/workflows/deploy_app.yml +++ b/.github/workflows/deploy_production.yml @@ -2,8 +2,8 @@ name: Deploy to Production on: push: - branches: - - master + tags: + - production-release workflow_dispatch: jobs: @@ -21,7 +21,7 @@ jobs: uses: zooniverse/ci-cd/.github/workflows/deploy_app.yaml@main needs: build_and_push_image with: - app_name: aggregation-caesar + app_name: aggregation repo_name: aggregation-for-caesar commit_id: ${{ github.sha }} environment: production @@ -38,7 +38,7 @@ jobs: commit_id: ${{ github.sha }} job_name: Deploy to Production / deploy_app status: ${{ needs.deploy_production.result }} - title: "Aggregation deploy complete" + title: "Aggregation Production deploy complete" title_link: "https://aggregation-caesar.zooniverse.org" secrets: slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/kubernetes/deployment-production.tmpl b/kubernetes/deployment-production.tmpl index c54084fe..458accf8 100644 --- a/kubernetes/deployment-production.tmpl +++ b/kubernetes/deployment-production.tmpl @@ -1,20 +1,20 @@ apiVersion: apps/v1 kind: Deployment metadata: - name: aggregation-caesar + name: aggregation-production-app labels: - app: aggregation-caesar + app: aggregation-production-app spec: selector: matchLabels: - app: aggregation-caesar + app: aggregation-production-app template: metadata: labels: - app: aggregation-caesar + app: aggregation-production-app spec: containers: - - name: aggregation-caesar-app + - name: aggregation-production-app image: ghcr.io/zooniverse/aggregation-for-caesar:__IMAGE_TAG__ ports: - containerPort: 80 @@ -51,6 +51,10 @@ spec: value: production - name: PANOPTES_URL value: https://panoptes.zooniverse.org/ + - name: CELERY_BROKER_URL + value: redis://aggregation-production-redis:6379/0 + - name: CELERY_RESULT_BACKEND + value: redis://aggregation-production-redis:6379/0 - name: PANOPTES_CLIENT_ID valueFrom: secretKeyRef: @@ -61,6 +65,11 @@ spec: secretKeyRef: name: aggregation-for-caesar-environment key: PANOPTES_CLIENT_SECRET + - name: AZURE_STORAGE_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: aggregation-for-caesar-environment + key: AZURE_STORAGE_CONNECTION_STRING - name: MAST_AUTH_TOKEN valueFrom: secretKeyRef: @@ -83,16 +92,23 @@ spec: key: NEW_RELIC_LICENSE_KEY - name: NEW_RELIC_APP_NAME value: 'Aggregation Caesar' + volumeMounts: + - name: aggregation-production-volume + mountPath: /usr/src/aggregation/tmp + volumes: + - name: aggregation-production-volume + persistentVolumeClaim: + claimName: aggregation-production-data-storage --- apiVersion: autoscaling/v1 kind: HorizontalPodAutoscaler metadata: - name: aggregation-caesar + name: aggregation-production-app spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment - name: aggregation-caesar + name: aggregation-production-app minReplicas: 2 maxReplicas: 3 targetCPUUtilizationPercentage: 80 @@ -100,21 +116,215 @@ spec: apiVersion: policy/v1 kind: PodDisruptionBudget metadata: - name: aggregation-caesar + name: aggregation-production-app spec: minAvailable: 50% selector: matchLabels: - app: aggregation-caesar + app: aggregation-production-app +--- +apiVersion: v1 +kind: Service +metadata: + name: aggregation-production-app +spec: + selector: + app: 
aggregation-production-app + ports: + - protocol: TCP + port: 80 + targetPort: 80 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: aggregation-production-celery + labels: + app: aggregation-production-celery +spec: + selector: + matchLabels: + app: aggregation-production-celery + template: + metadata: + labels: + app: aggregation-production-celery + spec: + containers: + - name: aggregation-production-celery + image: ghcr.io/zooniverse/aggregation-for-caesar:__IMAGE_TAG__ + resources: + requests: + memory: "500Mi" + cpu: "500m" + limits: + memory: "1000Mi" + cpu: "1000m" + livenessProbe: + exec: + command: + - sh + - -c + - celery inspect ping -d celery@$(hostname) | grep -q OK + initialDelaySeconds: 30 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 3 + readinessProbe: + exec: + command: + - sh + - -c + - celery inspect ping -d celery@$(hostname) | grep -q OK + initialDelaySeconds: 60 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 3 + args: ["/usr/src/aggregation/scripts/start-celery.sh"] + env: + - name: FLASK_ENV + value: production + - name: CELERY_BROKER_URL + value: redis://aggregation-production-redis:6379/0 + - name: CELERY_RESULT_BACKEND + value: redis://aggregation-production-redis:6379/0 + - name: PANOPTES_URL + value: https://panoptes.zooniverse.org/ + - name: PANOPTES_CLIENT_ID + valueFrom: + secretKeyRef: + name: aggregation-for-caesar-environment + key: PANOPTES_CLIENT_ID + - name: PANOPTES_CLIENT_SECRET + valueFrom: + secretKeyRef: + name: aggregation-for-caesar-environment + key: PANOPTES_CLIENT_SECRET + - name: MAST_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: aggregation-for-caesar-environment + key: MAST_AUTH_TOKEN + - name: MAST_PROD_TOKEN + valueFrom: + secretKeyRef: + name: aggregation-for-caesar-environment + key: MAST_PROD_TOKEN + - name: SENTRY_DSN + valueFrom: + secretKeyRef: + name: aggregation-for-caesar-environment + key: SENTRY_DSN + - name: NEW_RELIC_LICENSE_KEY + valueFrom: + secretKeyRef: + name: aggregation-for-caesar-environment + key: NEW_RELIC_LICENSE_KEY + - name: NEW_RELIC_APP_NAME + value: 'Aggregation Caesar (Production)' + volumeMounts: + - name: aggregation-production-volume + mountPath: /usr/src/aggregation/tmp + volumes: + - name: aggregation-production-volume + persistentVolumeClaim: + claimName: aggregation-production-data-storage --- apiVersion: v1 kind: Service metadata: - name: aggregation-caesar + name: aggregation-production-celery spec: selector: - app: aggregation-caesar + app: aggregation-production-celery ports: - protocol: TCP port: 80 - targetPort: 80 \ No newline at end of file + targetPort: 80 +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: aggregation-production-redis +spec: + accessModes: + - ReadWriteOnce + storageClassName: azurefile + resources: + requests: + storage: 1Gi +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: aggregation-production-data-storage +spec: + accessModes: + - ReadWriteOnce + storageClassName: azurefile + resources: + requests: + storage: 20Gi +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: aggregation-production-redis + labels: + app: aggregation-production-redis +spec: + replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + app: aggregation-production-redis + template: + metadata: + labels: + app: aggregation-production-redis + spec: + tolerations: + - key: "servicelife" + operator: "Equal" + value: "longlife" + effect: "NoSchedule" + affinity: + nodeAffinity: + 
requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: servicelife + operator: In + values: + - longlife + containers: + - name: aggregation-production-redis + image: redis + resources: + requests: + memory: "100Mi" + cpu: "10m" + limits: + memory: "100Mi" + cpu: "500m" + volumeMounts: + - name: aggregation-production-redis-data + mountPath: "/data" + volumes: + - name: aggregation-production-redis-data + persistentVolumeClaim: + claimName: aggregation-production-redis +--- +apiVersion: v1 +kind: Service +metadata: + name: aggregation-production-redis +spec: + selector: + app: aggregation-production-redis + ports: + - protocol: TCP + port: 6379 + targetPort: 6379 + type: NodePort
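The routes exercised by the tests in this series (POST /run_aggregation and GET /tasks/<task_id>) are the entire public surface of batch aggregation, so a short client script is enough to smoke-test a deploy end to end. The sketch below is illustrative only: the base URL, polling interval, and timeout are assumptions (the staging ingress above serves aggregation-staging.zooniverse.org), and the placeholder ids 1/10/100 are the same values the router tests use; the endpoint paths, request payload, and the task_id/task_status response keys are taken from routes.py and the router tests.

import time

import requests

BASE_URL = 'https://aggregation-staging.zooniverse.org'  # assumed host; use whatever serves the app


def start_batch_aggregation(project_id, workflow_id, user_id):
    # POST /run_aggregation enqueues the Celery task and responds 202 with the task id
    response = requests.post(
        f'{BASE_URL}/run_aggregation',
        json={'project_id': project_id, 'workflow_id': workflow_id, 'user_id': user_id},
    )
    response.raise_for_status()
    return response.json()['task_id']


def wait_for_task(task_id, poll_seconds=30, timeout_seconds=3600):
    # GET /tasks/<task_id> reports the Celery state; keep polling until it leaves
    # PENDING/STARTED (anything else is treated as terminal for this sketch)
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = requests.get(f'{BASE_URL}/tasks/{task_id}').json()
        if status.get('task_status') not in ('PENDING', 'STARTED'):
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f'task {task_id} still running after {timeout_seconds}s')


if __name__ == '__main__':
    task_id = start_batch_aggregation(project_id=1, workflow_id=10, user_id=100)
    print(wait_for_task(task_id))

Server side, the same flow is what the manifests wire together: the Flask app enqueues run_aggregation on the Redis broker and the celery deployment picks it up, so nothing beyond these two routes is needed to drive a run.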