Skip to content

Commit

Permalink
Taking a swing at extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
zwolf committed Feb 27, 2024
1 parent 477da25 commit 5703918
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions panoptes_aggregation/batch_aggregation.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from celery import Celery
import csv
import json
import pandas as pd
import os
import urllib3
import numpy as np
import pandas as pd

from panoptes_client import Panoptes, Project, Workflow
from panoptes_aggregation.workflow_config import workflow_extractor_config, workflow_reducer_config
from panoptes_aggregation.scripts import batch_utils
from panoptes_client.panoptes import PanoptesAPIException

import logging
Expand All @@ -25,6 +26,11 @@ def run_aggregation(project_id, workflow_id, user_id):
wf_df = ba.process_wf_export(ba.wf_csv)
cls_df = ba.process_cls_export(ba.cls_csv)

extractor_config = workflow_extractor_config(ba.tasks)
extracted_data = batch_utils.batch_extract(cls_df, extractor_config)

reducer_config = workflow_reducer_config(extractor_config)
reduced_data = batch_utils.batch_reduce(extracted_data, reducer_config)

class BatchAggregator:
"""
Expand Down Expand Up @@ -54,11 +60,14 @@ def process_wf_export(self, wf_csv):
self.wf_df = pd.read_csv(wf_csv)
self.wf_maj_version = self.wf_df.query(f'workflow_id == {self.workflow_id}')['version'].max()
self.wf_min_version = self.wf_df.query(f'workflow_id == {self.workflow_id} & version == {self.wf_maj_version}')['minor_version'].max()
self.workflow_version = f'{self.wf_maj_version}.{self.wf_min_version}'
self.workflow_row = self.wf_df.query(f'workflow_id == {self.workflow_id} & minor_version == {self.wf_min_version}')
self.tasks = json.loads(self.workflow_row.iloc[0]['tasks'])
return self.wf_df

def process_cls_export(self, cls_csv):
    """
    Load a classification export and keep only the rows belonging to the
    current workflow version.

    ``self.workflow_version`` must already be set before this is called
    (``process_wf_export`` assigns it as ``"<major>.<minor>"``).

    Parameters
    ----------
    cls_csv : str or file-like
        Passed straight to ``pandas.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The filtered classifications; the same object is stored on
        ``self.cls_df``.
    """
    cls_df = pd.read_csv(cls_csv)
    # Filter exactly once, using the precomputed version string. Rebuilding
    # the version from the major/minor attributes and querying a second time
    # only produced a result that was immediately overwritten.
    # NOTE(review): the version is interpolated unquoted, so the comparison
    # is presumably numeric (12.10 and 12.1 would compare equal) -- confirm
    # that this is the intended matching rule.
    self.cls_df = cls_df.query(f'workflow_version == {self.workflow_version}')
    return self.cls_df

def _download_export(self, url, filepath):
Expand Down

0 comments on commit 5703918

Please sign in to comment.