diff --git a/panoptes_aggregation/batch_aggregation.py b/panoptes_aggregation/batch_aggregation.py index 93134ffc..16b02d6b 100644 --- a/panoptes_aggregation/batch_aggregation.py +++ b/panoptes_aggregation/batch_aggregation.py @@ -1,12 +1,13 @@ from celery import Celery -import csv +import json import pandas as pd import os import urllib3 -import numpy as np import pandas as pd from panoptes_client import Panoptes, Project, Workflow +from panoptes_aggregation.workflow_config import workflow_extractor_config, workflow_reducer_config +from panoptes_aggregation.scripts import batch_utils from panoptes_client.panoptes import PanoptesAPIException import logging @@ -25,6 +26,11 @@ def run_aggregation(project_id, workflow_id, user_id): wf_df = ba.process_wf_export(ba.wf_csv) cls_df = ba.process_cls_export(ba.cls_csv) + extractor_config = workflow_extractor_config(ba.tasks) + extracted_data = batch_utils.batch_extract(cls_df, extractor_config) + + reducer_config = workflow_reducer_config(extractor_config) + reduced_data = batch_utils.batch_reduce(extracted_data, reducer_config) class BatchAggregator: """ @@ -54,11 +60,14 @@ def process_wf_export(self, wf_csv): self.wf_df = pd.read_csv(wf_csv) self.wf_maj_version = self.wf_df.query(f'workflow_id == {self.workflow_id}')['version'].max() self.wf_min_version = self.wf_df.query(f'workflow_id == {self.workflow_id} & version == {self.wf_maj_version}')['minor_version'].max() + self.workflow_version = f'{self.wf_maj_version}.{self.wf_min_version}' + self.workflow_row = self.wf_df.query(f'workflow_id == {self.workflow_id} & minor_version == {self.wf_min_version}') + self.tasks = json.loads(self.workflow_row.iloc[0]['tasks']) return self.wf_df def process_cls_export(self, cls_csv): cls_df = pd.read_csv(cls_csv) - self.cls_df = cls_df.query(f'workflow_version == {self.wf_maj_version}.{self.wf_min_version}') + self.cls_df = cls_df.query(f'workflow_version == {self.workflow_version}') return self.cls_df def _download_export(self, url, filepath):