From e3aa237ab6ab0459e8c2f1c4e3713661e1cb8405 Mon Sep 17 00:00:00 2001
From: jparsai
Date: Mon, 27 Jul 2020 22:09:23 +0530
Subject: [PATCH] Starting unknown flow for packages missing the latest
 version in the daily ingestion report

---
 Dockerfile                     |  2 --
 f8a_report/ingestion_helper.py | 48 ++++++++++++++++++++++++++++++++++
 f8a_report/main.py             | 32 +++++++++++++++++++++++
 openshift/template.yaml        | 16 +++++++++++-
 requirements.txt               |  4 +++
 5 files changed, 99 insertions(+), 3 deletions(-)
 create mode 100644 f8a_report/ingestion_helper.py

diff --git a/Dockerfile b/Dockerfile
index 154cf6c..b828ac7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -11,8 +11,6 @@ RUN yum install -y epel-release &&\
     mkdir -p ${APP_DIR}
 
 RUN pip3 install --upgrade pip
-RUN pip3 install git+https://github.com/fabric8-analytics/fabric8-analytics-utils.git@${F8A_UTIL_VERSION}
-RUN pip3 install git+https://git@github.com/fabric8-analytics/fabric8-analytics-version-comparator.git#egg=f8a_version_comparator
 
 COPY f8a_report/ ${APP_DIR}/f8a_report
 COPY requirements.txt ${APP_DIR}

diff --git a/f8a_report/ingestion_helper.py b/f8a_report/ingestion_helper.py
new file mode 100644
index 0000000..e57c334
--- /dev/null
+++ b/f8a_report/ingestion_helper.py
@@ -0,0 +1,48 @@
+"""Ingestion functions used to trigger a flow."""
+
+import logging
+import os
+from selinon import run_flow
+from f8a_worker.utils import MavenCoordinates
+
+logger = logging.getLogger(__name__)
+
+
+def server_create_analysis(ecosystem, package, version, force=False, force_graph_sync=False):
+    """Create bayesianApiFlow handling analyses for the specified EPV.
+
+    :param ecosystem: ecosystem for which the flow should be run
+    :param package: package for which the flow should be run
+    :param version: package version
+    :param force: force the flow to run even if the specified EPV already exists
+    :param force_graph_sync: force synchronization to the graph
+    :return: dispatcher ID handling the flow
+    """
+    component = MavenCoordinates.normalize_str(package) if ecosystem == 'maven' else package
+
+    args = {
+        "ecosystem": ecosystem,
+        "force": force,
+        "force_graph_sync": force_graph_sync,
+        "name": component,
+        "recursive_limit": 0,
+        "version": version
+    }
+
+    if os.environ.get("WORKER_ADMINISTRATION_REGION", "") == "api":
+        return server_run_flow('bayesianApiFlow', args)
+    else:
+        return server_run_flow('bayesianFlow', args)
+
+
+def server_run_flow(flow_name, flow_args):
+    """Run a flow.
+
+    :param flow_name: name of the flow to be run, as stated in the YAML config file
+    :param flow_args: arguments for the flow
+    :return: dispatcher ID handling the flow
+    """
+    logger.info('Running flow {} for args {}'.format(flow_name, flow_args))
+    dispatcher_id = run_flow(flow_name, flow_args)
+
+    return dispatcher_id
diff --git a/f8a_report/main.py b/f8a_report/main.py
index 0185622..5c7b503 100644
--- a/f8a_report/main.py
+++ b/f8a_report/main.py
@@ -6,6 +6,9 @@ from v2.report_generator import StackReportBuilder
 from manifest_helper import manifest_interface
 import os
 
+from f8a_worker.setup_celery import init_selinon, init_celery
+from ingestion_helper import server_create_analysis
+from s3_helper import S3Helper
 
 logger = logging.getLogger(__file__)
 
@@ -80,3 +83,32 @@ def main():
 
 
 if __name__ == '__main__':
     main()
+
+    # Initialize Selinon queues and Celery workers
+    if os.environ.get("INVOKE_API_WORKERS", "") == "1":
+        init_selinon()
+        init_celery(result_backend=False)
+
+    # Start the ingestion flow for packages missing the latest version
+    today = dt.today()
+    end_date = today.strftime('%Y-%m-%d')
+
+    report_type = 'ingestion-data'
+    report_name = dt.strptime(end_date, '%Y-%m-%d').strftime('%Y-%m-%d')
+
+    s3 = S3Helper()
+    obj_key = '{type}/epv/{report_name}.json'.format(
+        type=report_type, report_name=report_name
+    )
+    ingestion_report = s3.read_json_object(bucket_name=os.environ.get('REPORT_BUCKET_NAME'),
+                                           obj_key=obj_key) or {}
+
+    ingestion_summary = ingestion_report.get('ingestion_summary', {}).get('missing_latest_node', {})
+
+    for eco, items in ingestion_summary.items():
+        for item in items:
+            logger.info("Starting ingestion flow for eco= {}, pkg= {}, ver= {}"
+                        .format(eco, item['package'], item['version']))
+            dispatcher_id = server_create_analysis(eco, item['package'], item['version'],
+                                                   force=True, force_graph_sync=False)
+            logger.info("Dispatcher_ID= {}".format(dispatcher_id))
diff --git a/openshift/template.yaml b/openshift/template.yaml
index 4ed50a2..556143c 100644
--- a/openshift/template.yaml
+++ b/openshift/template.yaml
@@ -181,6 +181,20 @@ objects:
             value: ${PYPI_TRAINING_REPO}
           - name: GREMLIN_QUERY_SIZE
             value: "25"
+          - name: WORKER_ADMINISTRATION_REGION
+            value: "api"
+          - name: INVOKE_API_WORKERS
+            value: "1"
+          - name: AWS_SQS_ACCESS_KEY_ID
+            valueFrom:
+              secretKeyRef:
+                name: aws-sqs
+                key: aws_access_key_id
+          - name: AWS_SQS_SECRET_ACCESS_KEY
+            valueFrom:
+              secretKeyRef:
+                name: aws-sqs
+                key: aws_secret_access_key
          resources:
            requests:
              memory: ${MEMORY_REQUEST}
@@ -231,7 +245,7 @@ parameters:
   name: MEMORY_REQUEST
   value: "1024Mi"
 
-- description: Memory limit 
+- description: Memory limit
   displayName: Memory limit
   required: true
   name: MEMORY_LIMIT
diff --git a/requirements.txt b/requirements.txt
index c864418..ae470d1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -61,5 +61,9 @@ xmltodict==0.12.0 # via moto
 zipp==3.1.0 # via importlib-metadata
 zope.interface==4.6.0 # via datetime
 
+git+https://github.com/fabric8-analytics/fabric8-analytics-worker.git@58d3025
+git+https://github.com/fabric8-analytics/fabric8-analytics-utils.git@3bca34e
+git+https://github.com/fabric8-analytics/fabric8-analytics-version-comparator.git#egg=f8a_version_comparator
+
 # The following packages are considered to be unsafe in a requirements file:
 # setuptools
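
Usage note: the new code in main.py assumes the daily ingestion report stored at
ingestion-data/epv/<date>.json carries a 'missing_latest_node' map keyed by
ecosystem, with one {package, version} entry per EPV. A minimal sketch of that
shape and of the trigger it produces; the key names come from the code above,
while the ecosystem, package, and version values are invented purely for
illustration:

    # Illustrative report payload; only the keys are taken from the patch.
    ingestion_report = {
        "ingestion_summary": {
            "missing_latest_node": {
                "maven": [{"package": "org.example:demo", "version": "1.0.0"}],
                "npm": [{"package": "example-pkg", "version": "2.3.1"}],
            }
        }
    }

    # For each EPV in the map, main.py schedules one Selinon flow, e.g.:
    #   server_create_analysis("npm", "example-pkg", "2.3.1",
    #                          force=True, force_graph_sync=False)
    # which dispatches 'bayesianApiFlow' when WORKER_ADMINISTRATION_REGION is
    # set to "api" (as done in openshift/template.yaml) and 'bayesianFlow'
    # otherwise.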