
Commit

Starting unknown flow for the packages missing latest version in daily ingestion report

jparsai committed Jul 28, 2020
1 parent 7fbf579 commit e3aa237
Showing 5 changed files with 99 additions and 3 deletions.
2 changes: 0 additions & 2 deletions Dockerfile
@@ -11,8 +11,6 @@ RUN yum install -y epel-release &&\
mkdir -p ${APP_DIR}

RUN pip3 install --upgrade pip
-RUN pip3 install git+https://github.com/fabric8-analytics/fabric8-analytics-utils.git@${F8A_UTIL_VERSION}
-RUN pip3 install git+https://[email protected]/fabric8-analytics/fabric8-analytics-version-comparator.git#egg=f8a_version_comparator

COPY f8a_report/ ${APP_DIR}/f8a_report
COPY requirements.txt ${APP_DIR}
48 changes: 48 additions & 0 deletions f8a_report/ingestion_helper.py
@@ -0,0 +1,48 @@
"""Ingestion functions used to trigger a flow."""

import logging
import os
from selinon import run_flow
from f8a_worker.utils import MavenCoordinates

logger = logging.getLogger(__name__)


def server_create_analysis(ecosystem, package, version, force=False, force_graph_sync=False):
    """Create an analysis flow for the specified EPV.

    Runs bayesianApiFlow when WORKER_ADMINISTRATION_REGION is "api",
    otherwise bayesianFlow.

    :param ecosystem: ecosystem for which the flow should be run
    :param package: package for which the flow should be run
    :param version: package version
    :param force: force the flow to run even if the specified EPV already exists
    :param force_graph_sync: force synchronization to the graph
    :return: dispatcher ID handling the flow
    """
    component = MavenCoordinates.normalize_str(package) if ecosystem == 'maven' else package

    args = {
        "ecosystem": ecosystem,
        "force": force,
        "force_graph_sync": force_graph_sync,
        "name": component,
        "recursive_limit": 0,
        "version": version
    }

    if os.environ.get("WORKER_ADMINISTRATION_REGION", "") == "api":
        return server_run_flow('bayesianApiFlow', args)
    else:
        return server_run_flow('bayesianFlow', args)


def server_run_flow(flow_name, flow_args):
    """Run a flow.

    :param flow_name: name of the flow to be run, as stated in the YAML config file
    :param flow_args: arguments for the flow
    :return: dispatcher ID handling the flow
    """
    logger.info('Running flow {} for args {}'.format(flow_name, flow_args))
    dispatcher_id = run_flow(flow_name, flow_args)

    return dispatcher_id
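
For orientation, a minimal sketch of how these helpers are meant to be invoked; the EPV values are made up for illustration, and Selinon/Celery must already be initialized (as main.py does below):

# Illustrative usage only; 'io.vertx:vertx-core' and '3.9.1' are hypothetical.
from ingestion_helper import server_create_analysis

dispatcher_id = server_create_analysis(ecosystem='maven',
                                       package='io.vertx:vertx-core',
                                       version='3.9.1',
                                       force=True,
                                       force_graph_sync=False)
# With WORKER_ADMINISTRATION_REGION="api" this dispatches bayesianApiFlow;
# otherwise it falls back to bayesianFlow.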
32 changes: 32 additions & 0 deletions f8a_report/main.py
@@ -6,6 +6,9 @@
from v2.report_generator import StackReportBuilder
from manifest_helper import manifest_interface
import os
from f8a_worker.setup_celery import init_selinon, init_celery
from ingestion_helper import server_create_analysis
from s3_helper import S3Helper

logger = logging.getLogger(__file__)

@@ -80,3 +83,32 @@ def main():

if __name__ == '__main__':
main()

# Initialize Selinon queues and Celery workers
if os.environ.get("INVOKE_API_WORKERS", "") == "1":
    init_selinon()
    init_celery(result_backend=False)

# Start the ingestion flow for packages missing the latest version
today = dt.today()
end_date = today.strftime('%Y-%m-%d')

report_type = 'ingestion-data'
report_name = dt.strptime(end_date, '%Y-%m-%d').strftime('%Y-%m-%d')

s3 = S3Helper()
obj_key = '{type}/epv/{report_name}.json'.format(
    type=report_type, report_name=report_name
)
ingestion_report = s3.read_json_object(bucket_name=os.environ.get('REPORT_BUCKET_NAME'),
                                       obj_key=obj_key) or {}

ingestion_summary = ingestion_report.get('ingestion_summary', {}).get('missing_latest_node', {})

for eco, items in ingestion_summary.items():
    for item in items:
        logger.info("Starting ingestion flow for eco= {}, pkg= {}, ver= {}"
                    .format(eco, item['package'], item['version']))
        dispatcher_id = server_create_analysis(eco, item['package'], item['version'],
                                               force=True, force_graph_sync=False)
        logger.info("Dispatcher_ID= {}".format(dispatcher_id))
16 changes: 15 additions & 1 deletion openshift/template.yaml
@@ -181,6 +181,20 @@ objects:
            value: ${PYPI_TRAINING_REPO}
          - name: GREMLIN_QUERY_SIZE
            value: "25"
          - name: WORKER_ADMINISTRATION_REGION
            value: "api"
          - name: INVOKE_API_WORKERS
            value: "1"
          - name: AWS_SQS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-sqs
                key: aws_access_key_id
          - name: AWS_SQS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-sqs
                key: aws_secret_access_key
          resources:
            requests:
              memory: ${MEMORY_REQUEST}
@@ -231,7 +245,7 @@ parameters:
  name: MEMORY_REQUEST
  value: "1024Mi"

-- description: Memory limit
+- description: Memory limit
  displayName: Memory limit
  required: true
  name: MEMORY_LIMIT
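
As a rough sketch of how the new template values surface at runtime: the first two are read by the report code shown above, while the SQS credentials are assumed to be consumed by the fabric8-analytics worker/Celery layer rather than by this repository directly:

import os

# "api" routes server_create_analysis to bayesianApiFlow (see ingestion_helper.py).
region = os.environ.get("WORKER_ADMINISTRATION_REGION", "")
# "1" gates init_selinon()/init_celery() in main.py.
invoke_workers = os.environ.get("INVOKE_API_WORKERS", "") == "1"
# Injected from the aws-sqs secret; assumed to be picked up by the worker's
# SQS-backed broker configuration.
sqs_key_id = os.environ.get("AWS_SQS_ACCESS_KEY_ID")
sqs_secret = os.environ.get("AWS_SQS_SECRET_ACCESS_KEY")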
4 changes: 4 additions & 0 deletions requirements.txt
@@ -61,5 +61,9 @@ xmltodict==0.12.0 # via moto
zipp==3.1.0 # via importlib-metadata
zope.interface==4.6.0 # via datetime

git+https://github.com/fabric8-analytics/fabric8-analytics-worker.git@58d3025
git+https://github.com/fabric8-analytics/fabric8-analytics-utils.git@3bca34e
git+https://github.com/fabric8-analytics/fabric8-analytics-version-comparator.git#egg=f8a_version_comparator

# The following packages are considered to be unsafe in a requirements file:
# setuptools
