Merge pull request #368 from sanger/DPL-034-wrap-crawler-in-flask
DPL-034 wrap crawler in Flask
sdjmchattie authored Jul 6, 2021
2 parents ae56a2f + 202c69d commit 2b07fce
Showing 17 changed files with 422 additions and 220 deletions.
1 change: 0 additions & 1 deletion .env.example

This file was deleted.

14 changes: 14 additions & 0 deletions .flaskenv
@@ -0,0 +1,14 @@
# use publicly accessible env variables in this file
# https://flask.palletsprojects.com/en/1.1.x/cli/#environment-variables-from-dotenv

# https://flask.palletsprojects.com/en/1.1.x/cli/#application-discovery
FLASK_APP=crawler

# https://flask.palletsprojects.com/en/1.1.x/cli/#setting-command-options
FLASK_RUN_HOST=0.0.0.0
FLASK_RUN_PORT=8000

# https://flask.palletsprojects.com/en/1.1.x/config/#environment-and-debug-features
FLASK_ENV=development

SETTINGS_MODULE=crawler.config.development
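
For context, a minimal sketch (not part of the diff) of how the `SETTINGS_MODULE` value above is consumed; it mirrors the `create_app()` factory added in `crawler/__init__.py` later in this changeset:

```python
import os

from flask import Flask

app = Flask(__name__)
# SETTINGS_MODULE names a config module, e.g. crawler.config.development;
# from_object() copies that module's upper-case attributes into app.config.
app.config.from_object(os.environ["SETTINGS_MODULE"])
```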
2 changes: 1 addition & 1 deletion .release-version
@@ -1 +1 @@
1.17.0
1.18.0
9 changes: 6 additions & 3 deletions Dockerfile
@@ -48,6 +48,9 @@ COPY . .
# "The best use for ENTRYPOINT is to set the image’s main command, allowing that image to be run as though it was that
# command (and then use CMD as the default flags)."
# https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#entrypoint
# have a look in .flaskenv for configured run options
ENTRYPOINT ["python", "runner.py"]
CMD ["--sftp", "--scheduled", "--add-to-dart"]
ENTRYPOINT ["flask"]
CMD ["run"]

# https://docs.docker.com/engine/reference/builder/#healthcheck
HEALTHCHECK --interval=1m --timeout=3s \
CMD curl -f http://localhost:8000/health || exit 1
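
The probe above targets the `/health` route added in `crawler/__init__.py` below, exposed on port 8000 per `.flaskenv` and the compose port mapping later in this diff. A stdlib-only sketch of the same check run from the host (assuming the container is up):

```python
from urllib.request import urlopen

# Equivalent of the curl probe in the HEALTHCHECK instruction above.
with urlopen("http://localhost:8000/health", timeout=3) as response:
    print(response.status, response.read().decode())
```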
16 changes: 10 additions & 6 deletions Pipfile
@@ -15,17 +15,21 @@ black = "*"
types-python-dateutil = "*"

[packages]
pymongo = "~=3.10"
colorlog = "~=5.0"
flask = "~=2.0"
flask-apscheduler = "~=1.11"
flask-cors = "~=3.0"
gunicorn = "~=20.1"
more-itertools = "~=8.6"
mysql-connector-python = "~=8.0"
pandas = "~=1.1"
pymongo = "~=3.10"
pymysql = "~=1.0"
pyodbc = "~=4.0"
colorlog = "~=5.0"
schedule = "~=1.0"
pysftp = "~=0.2"
python-dotenv = "~=0.17"
slackclient = "~=2.5"
more-itertools = "~=8.6"
pandas = "~=1.1"
sqlalchemy = "~=1.3"
pymysql = "~=1.0"

[requires]
python_version = "3.8"
415 changes: 264 additions & 151 deletions Pipfile.lock

Large diffs are not rendered by default.

44 changes: 27 additions & 17 deletions README.md
@@ -59,7 +59,8 @@ The following tools are required for development:

### Configuring Environment

Create a `.env` file (or copy and rename the `.env.example`) file with the following values:
The app runs with development settings when it is not deployed via Ansible.
To change this, point the following line in `.flaskenv` at a different settings module:

SETTINGS_MODULE=crawler.config.development

@@ -75,32 +76,41 @@ Once all the required packages are installed, enter the virtual environment with

pipenv shell

The following runtime flags are available:
To then run the app, use the command:

flask run

This starts the app with a scheduled job that runs an ingest every 30 minutes, using a cron-style trigger that fires at 10 and 40 minutes past each hour.
This scheduled behaviour can be turned off by adding the following to the `development.py` file:

SCHEDULER_RUN = False

You can also adjust the behaviour of the scheduled ingest using the settings in the same file.
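
As a concrete example, these are the scheduled-ingest settings that `crawler/config/development.py` overrides (values taken from that file further down this diff):

```python
# crawler/config/development.py -- ingest behaviour for scheduled runs
USE_SFTP = False     # read CSV files from the local filesystem instead of SFTP
KEEP_FILES = True    # keep the CSV files after the ingest has run
ADD_TO_DART = False  # do not add processed samples to DART
```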
To run an ingest immediately, whether Flask is running or not, the `runner.py` file can be used with the arguments shown:

python runner.py --help

usage: runner.py [-h] [--sftp] [--scheduled]
usage: runner.py [-h] [--sftp] [--keep-files] [--add-to-dart] [--centre_prefix {ALDP,MILK,QEUH,CAMC,RAND,HSLL,PLYM,BRBR}]

Store external samples in mongo.
Parse CSV files from the Lighthouse Labs and store the sample information in MongoDB

optional arguments:
-h, --help show this help message and exit
--scheduled start scheduled execution, defaults to running once
--sftp use SFTP to download CSV files, defaults to using local files
--keep-files keeps the CSV files after the runner has been executed
--add-to-dart add samples to DART, by default they are not
-h, --help show this help message and exit
--sftp use SFTP to download CSV files, defaults to using local files
--keep-files keeps the CSV files after the runner has been executed
--add-to-dart on processing samples, also add them to DART
--centre_prefix {ALDP,MILK,QEUH,CAMC,RAND,HSLL,PLYM,BRBR}
process only this centre's plate map files
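
For example, `python runner.py --keep-files --centre_prefix ALDP` (a hypothetical invocation built from the flags above) would run a single ingest of ALDP plate map files from local storage, keep the CSV files afterwards and skip the DART step.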

## Migrations

### Updating the MLWH `lighthouse_sample` Table

When the crawler process runs nightly it should be updating the MLWH lighthouse_sample table as it goes with records for
all rows that are inserted into MongoDB. If that MLWH insert process fails you should see a critical exception for the
file in Lighthouse-UI. This may be after records inserted correctly into MongoDB, and re-running the file will not
re-attempt the MLWH inserts in that situation.
Each time the crawler process runs (every 30 minutes) it should update the MLWH lighthouse_sample table with records for all rows that are inserted into MongoDB.
If that MLWH insert fails, you should see a critical exception for the file in Lighthouse-UI.
This may happen after the records were inserted correctly into MongoDB, in which case re-running the file will not re-attempt the MLWH inserts.

There is a manual migration task that can be run to fix this discrepancy (update_mlwh_with_legacy_samples) that allows
insertion of rows to the MLWH between two MongoDB `created_at` datetimes.
There is a manual migration task (update_mlwh_with_legacy_samples) that can be run to fix this discrepancy; it inserts rows into the MLWH for samples whose MongoDB `created_at` falls between two datetimes.

__NB__: Both datetimes are inclusive: range includes those rows greater than or equal to start datetime, and less than
or equal to end datetime.
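
To illustrate the inclusive range, a minimal sketch (not the migration's actual code) of the corresponding MongoDB filter on `created_at`, which uses `$gte`/`$lte` rather than `$gt`/`$lt`:

```python
from datetime import datetime

# Hypothetical endpoints; both are included in the matched range.
start = datetime(2021, 6, 1, 0, 0)
end = datetime(2021, 6, 30, 23, 59)
inclusive_filter = {"created_at": {"$gte": start, "$lte": end}}
print(inclusive_filter)
```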
@@ -265,8 +275,8 @@ A little convenience script can be used to run the formatting, type checking and

### Docker

If you do not have root access pyodbc will not work if you use brew
Using the docker compose you can set up the full stack and it will also set the correct environment variables
If you do not have root access, pyodbc will not work if you use brew.
Using Docker Compose, you can set up the full stack; it will also set the correct environment variables.

To build the containers:

32 changes: 32 additions & 0 deletions crawler/__init__.py
@@ -0,0 +1,32 @@
from http import HTTPStatus
import logging
import logging.config
import os

from flask import Flask
from flask_apscheduler import APScheduler

from crawler.constants import SCHEDULER_JOB_ID_RUN_CRAWLER

scheduler = APScheduler()


def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_object(os.environ["SETTINGS_MODULE"])

    # setup logging
    logging.config.dictConfig(app.config["LOGGING"])

    if app.config.get("SCHEDULER_RUN", False):
        scheduler.init_app(app)
        scheduler.start()

    @app.get("/health")
    def health_check():
        if scheduler.get_job(SCHEDULER_JOB_ID_RUN_CRAWLER):
            return "Crawler is working", HTTPStatus.OK

        return "Crawler is not working correctly", HTTPStatus.INTERNAL_SERVER_ERROR

    return app
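
A quick way to exercise the new factory and `/health` route (a hedged sketch using Flask's test client; it assumes `SETTINGS_MODULE` is set, as it is via `.flaskenv`):

```python
import os

os.environ.setdefault("SETTINGS_MODULE", "crawler.config.development")

from crawler import create_app

app = create_app()
with app.test_client() as client:
    response = client.get("/health")
    # 200 when the scheduled crawler job is registered, 500 otherwise
    print(response.status_code, response.get_data(as_text=True))
```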
29 changes: 27 additions & 2 deletions crawler/config/defaults.py
@@ -1,16 +1,22 @@
# flake8: noqa
import os

from crawler.constants import SCHEDULER_JOB_ID_RUN_CRAWLER
from crawler.config.centres import *
from crawler.config.logging import *

# setting here will overwrite those in 'centre.py'
# setting here will overwrite those in 'centres.py'

# general details
DIR_DOWNLOADED_DATA = "data/sftp_files/"

DIR_DOWNLOADED_DATA = "data/sftp_files/"
ADD_LAB_ID = False

# ingest behaviour for scheduled runs
USE_SFTP = True
KEEP_FILES = False
ADD_TO_DART = True

# If we're running in a container, then instead of localhost
# we want host.docker.internal, you can specify this in the
# .env file you use for docker. eg
@@ -56,3 +62,22 @@
# slack details
SLACK_API_TOKEN = ""
SLACK_CHANNEL_ID = ""

###
# APScheduler config
###
SCHEDULER_RUN = True
SCHEDULER_TIMEZONE = (
    "Europe/London"  # We need to define timezone because current flask_apscheduler does not load from TZ env
)
SCHEDULER_API_ENABLED = False
JOBS = [
    {
        "id": SCHEDULER_JOB_ID_RUN_CRAWLER,
        "func": "crawler.jobs.apscheduler:scheduled_run",
        "trigger": "cron",
        "day": "*",
        "hour": "*",
        "minute": "10/30",
    }
]
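
As a sanity check on the cron fields above (a hedged sketch using APScheduler directly, which flask-apscheduler wraps), `minute="10/30"` fires at 10 and 40 minutes past every hour:

```python
from datetime import datetime, timezone

from apscheduler.triggers.cron import CronTrigger

trigger = CronTrigger(minute="10/30", timezone="Europe/London")

# Walk forward through the next few fire times from now.
fire_time = None
now = datetime.now(timezone.utc)
for _ in range(4):
    fire_time = trigger.get_next_fire_time(fire_time, fire_time or now)
    print(fire_time)  # alternates between :10 and :40 past the hour
```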
5 changes: 5 additions & 0 deletions crawler/config/development.py
@@ -3,6 +3,11 @@

# setting here will overwrite those in 'defaults.py'

# ingest behaviour for scheduled runs
USE_SFTP = False
KEEP_FILES = True
ADD_TO_DART = False

# logging config
LOGGING["loggers"]["crawler"]["level"] = "DEBUG"
LOGGING["loggers"]["crawler"]["handlers"] = ["colored_stream_dev"]
6 changes: 6 additions & 0 deletions crawler/constants.py
@@ -2,6 +2,12 @@
from decimal import Decimal
from typing import Final, Set, Tuple


###
# AP Scheduler Jobs
###
SCHEDULER_JOB_ID_RUN_CRAWLER: Final[str] = "run_crawler"

###
# mongo collections
###
23 changes: 23 additions & 0 deletions crawler/jobs/apscheduler.py
@@ -0,0 +1,23 @@
from flask import current_app as app
import logging

from crawler import scheduler
from crawler.helpers.general_helpers import get_config
from crawler.main import run


logger = logging.getLogger(__name__)


def scheduled_run():
    """Scheduler's job to do a run every 30 minutes."""
    config, _ = get_config()
    logging.config.dictConfig(config.LOGGING)

    logger.info("Starting scheduled_run job.")

    with scheduler.app.app_context():
        use_sftp = app.config["USE_SFTP"]
        keep_files = app.config["KEEP_FILES"]
        add_to_dart = app.config["ADD_TO_DART"]
        run(use_sftp, keep_files, add_to_dart)
2 changes: 1 addition & 1 deletion crawler/priority_samples_process.py
@@ -154,7 +154,7 @@ def extract_sample_id(sample: SampleDoc) -> ModifiedRowValue:

    priority_samples_collection = get_mongo_collection(db, COLLECTION_PRIORITY_SAMPLES)
    for sample_id in sample_ids:
        priority_samples_collection.update({FIELD_SAMPLE_ID: sample_id}, {"$set": {FIELD_PROCESSED: True}})
        priority_samples_collection.update_one({FIELD_SAMPLE_ID: sample_id}, {"$set": {FIELD_PROCESSED: True}})

    logger.info("Mongo update of processed for priority samples successful")

2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -37,6 +37,8 @@ services:
# we need to set the localhost and root password as they can be different
crawler:
build: .
ports:
- "8000:8000"
environment:
- LOCALHOST=host.docker.internal
- ROOT_PASSWORD=
2 changes: 1 addition & 1 deletion mypy.ini
@@ -40,5 +40,5 @@ ignore_missing_imports = True
[mypy-numpy]
ignore_missing_imports = True

[mypy-schedule]
[mypy-flask_apscheduler]
ignore_missing_imports = True
37 changes: 1 addition & 36 deletions runner.py
@@ -1,26 +1,13 @@
import argparse
import logging
import logging.config
import time

import schedule

from crawler import main
from crawler.config.centres import CENTRES

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Parse CSV files from the Lighthouse Labs and store the sample information in MongoDB"
    )

    parser.add_argument(
        "--scheduled",
        dest="once",
        action="store_false",
        help="start scheduled execution, defaults to running once",
    )
    parser.add_argument(
        "--sftp",
        dest="sftp",
@@ -46,32 +33,10 @@
help="process only this centre's plate map files",
)

parser.set_defaults(once=True)
parser.set_defaults(sftp=False)
parser.set_defaults(keep_files=False)
parser.set_defaults(add_to_dart=False)

args = parser.parse_args()

if args.once:
main.run(
sftp=args.sftp, keep_files=args.keep_files, add_to_dart=args.add_to_dart, centre_prefix=args.centre_prefix
)
else:
print("Scheduled to run every 30 minutes")

# If a run misses its scheduled time, it queues up. If more than one run is queued up, they execute sequentially
# i.e. no parallel processing happens
schedule.every(30).minutes.do(
main.run, sftp=args.sftp, keep_files=args.keep_files, add_to_dart=args.add_to_dart
)

while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error("There was an exception while running the scheduler")
logger.exception(e)
# We wait 60 seconds so it wont try to check it again during the same minute
time.sleep(60)
main.run(sftp=args.sftp, keep_files=args.keep_files, add_to_dart=args.add_to_dart, centre_prefix=args.centre_prefix)
3 changes: 2 additions & 1 deletion tests/test_file_processing.py
@@ -181,8 +181,9 @@ def test_process_files_correctly_handles_files_not_to_be_processed(

    # Import records were made indicating files were not processed
    imports_collection = get_mongo_collection(mongo_database, COLLECTION_IMPORTS)
    assert imports_collection.count_documents({}) == 3

    imports = imports_collection.find()
    assert imports.count() == 3
    for imp in imports:
        assert len(imp["errors"]) == 2
        assert all("TYPE 34" in err for err in imp["errors"])
