Commit
Add upload_fluxiae_to_pilotage command and cron schedule
Use a cronjob to share imported data with the pilotage via an S3 bucket.
calummackervoy authored and rsebille committed Dec 23, 2024
1 parent 945d09e commit 58c5edf
Showing 5 changed files with 101 additions and 0 deletions.
1 change: 1 addition & 0 deletions clevercloud/cron.json
@@ -31,6 +31,7 @@
"0 0 * * 1 $ROOT/clevercloud/run_management_command.sh shorten_active_sessions",
"0 2 * * 1 $ROOT/clevercloud/crons/populate_metabase_matomo.sh",
"0 12 * * 1 $ROOT/clevercloud/run_management_command.sh import_ea_eatt --from-asp --wet-run",
"0 12 * * 1 $ROOT/clevercloud/run_management_command.sh upload_data_to_pilotage asp_riae_shared_bucket/",

"0 0 1 * * $ROOT/clevercloud/run_management_command.sh delete_old_emails --wet-run",
"0 0 1 * * $ROOT/clevercloud/run_management_command.sh sync_cities --wet-run",
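Note: in five-field cron syntax, the new entry fires at minute 0 of hour 12 every Monday, the same slot as the import_ea_eatt job above it. A toy decoding for readers less used to cron (illustrative only, not part of the commit):

# Illustrative only: decoding the new schedule "0 12 * * 1".
# Fields are: minute, hour, day-of-month, month, day-of-week ("1" = Monday).
minute, hour, dom, month, dow = "0 12 * * 1".split()
assert (minute, hour, dow) == ("0", "12", "1")  # 12:00 every Monday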
7 changes: 7 additions & 0 deletions config/settings/base.py
@@ -597,6 +597,13 @@
AWS_S3_ACCESS_KEY_ID = os.getenv("CELLAR_ADDON_KEY_ID")
AWS_S3_SECRET_ACCESS_KEY = os.getenv("CELLAR_ADDON_KEY_SECRET")
AWS_STORAGE_BUCKET_NAME = os.getenv("S3_STORAGE_BUCKET_NAME")

# S3 store for communicating with the Pilotage.
PILOTAGE_DATASTORE_S3_ENDPOINT_URL = os.getenv("PILOTAGE_DATASTORE_S3_ENDPOINT_URL")
PILOTAGE_DATASTORE_S3_ACCESS_KEY = os.getenv("PILOTAGE_DATASTORE_S3_ACCESS_KEY")
PILOTAGE_DATASTORE_S3_SECRET_KEY = os.getenv("PILOTAGE_DATASTORE_S3_SECRET_KEY")
PILOTAGE_DATASTORE_S3_BUCKET_NAME = os.getenv("PILOTAGE_DATASTORE_S3_BUCKET_NAME")

# The maximum amount of memory (in bytes) a file can take up before being rolled over into a temporary file on disk.
# Picked 5 MB, the max size for a resume. Keep it fast for files under that size, and avoid filling up the RAM.
AWS_S3_MAX_MEMORY_SIZE = 5 * 1024 * 1024
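All four values are read from the environment and default to None when unset. For local experimentation they could point at the same MinIO instance the test settings use below; a hypothetical sketch, to be run before settings are loaded (the endpoint value is an assumption, the others mirror config/settings/test.py):

# Hypothetical local configuration, not part of the commit.
import os

os.environ.setdefault("PILOTAGE_DATASTORE_S3_ENDPOINT_URL", "http://localhost:9000")  # assumed MinIO endpoint
os.environ.setdefault("PILOTAGE_DATASTORE_S3_ACCESS_KEY", "minioadmin")
os.environ.setdefault("PILOTAGE_DATASTORE_S3_SECRET_KEY", "minioadmin")
os.environ.setdefault("PILOTAGE_DATASTORE_S3_BUCKET_NAME", "tests")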
5 changes: 5 additions & 0 deletions config/settings/test.py
@@ -39,6 +39,11 @@
AWS_S3_SECRET_ACCESS_KEY = "minioadmin"
AWS_STORAGE_BUCKET_NAME = "tests"

PILOTAGE_DATASTORE_S3_ENDPOINT_URL = AWS_S3_ENDPOINT_URL
PILOTAGE_DATASTORE_S3_ACCESS_KEY = AWS_S3_ACCESS_KEY_ID
PILOTAGE_DATASTORE_S3_SECRET_KEY = AWS_S3_SECRET_ACCESS_KEY
PILOTAGE_DATASTORE_S3_BUCKET_NAME = AWS_STORAGE_BUCKET_NAME

API_DATADOG_API_KEY = "abcde"
API_DATADOG_APPLICATION_KEY = "fghij"

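Because the test settings alias the Pilotage datastore to the MinIO-backed test bucket, the new command can be exercised end to end in tests. A minimal pytest-style sketch, assuming Django is configured and MinIO is running (the test name and file contents are hypothetical):

# Hypothetical test sketch, not part of the commit.
from django.core.management import call_command

def test_upload_data_to_pilotage_dry_run(tmp_path):
    (tmp_path / "fluxIAE_ITOU_20241223.tar.gz").write_bytes(b"fake archive")
    # Dry run: the command lists datastore and local files and prints its
    # upload plan without writing anything to the bucket.
    call_command("upload_data_to_pilotage", tmp_path)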
78 changes: 78 additions & 0 deletions itou/metabase/management/commands/upload_data_to_pilotage.py
@@ -0,0 +1,78 @@
"""
The FluxIAE file contains data used by les emplois and is uploaded to us directly by a supporting organization.
The same file is also parsed by the Pilotage, shared via an S3 bucket.
This command uploads the file from where it has been stored to the S3 bucket for sharing.
"""

import pathlib
import threading
from pathlib import Path

from django.conf import settings
from django.template.defaultfilters import filesizeformat

from itou.utils.command import BaseCommand
from itou.utils.storage.s3 import pilotage_s3_client


class Command(BaseCommand):
help = "Upload FluxIAE to S3 for sharing."

FILENAME_PREFIX = "fluxIAE_ITOU_"
DATASTORE_DIRECTORY = "flux-iae/"

def add_arguments(self, parser):
parser.add_argument("directory", type=Path, help="Directory containing FluxIAE files")
parser.add_argument("--wet-run", dest="wet_run", action="store_true")

def _upload_file(self, file: pathlib.Path, *, wet_run=False):
lock = threading.Lock()
file_size = file.stat().st_size
bytes_transferred = 0
previous_progress = 0

def log_progress(chunk_size):
"""Logs to console or logs the progress of byte transfer"""
nonlocal bytes_transferred
nonlocal previous_progress

with lock:
bytes_transferred += chunk_size
progress = int((bytes_transferred / file_size) * 100)
if progress > previous_progress and progress % 5 == 0:
self.stdout.write(
f"> {file.name}: {filesizeformat(bytes_transferred)}/{filesizeformat(file_size)} transferred ({progress}%)." # noqa: E501
)
previous_progress = progress

if wet_run:
pilotage_s3_client().upload_file(
Filename=file.absolute(),
Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME,
Key=f"{self.DATASTORE_DIRECTORY}{file.name}",
Callback=log_progress,
)

def handle(self, *, directory: pathlib.Path, wet_run, **options):
client = pilotage_s3_client()
response = client.list_objects_v2(
Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME,
Prefix=self.DATASTORE_DIRECTORY,
)
datastore_files = set()
if response["KeyCount"]:
datastore_files.update(
metadata["Key"].replace(self.DATASTORE_DIRECTORY, "") for metadata in response["Contents"]
)
self.stdout.write(f"Files in datastore's {self.DATASTORE_DIRECTORY!r}: {sorted(datastore_files)}")

local_files = set(file.name for file in directory.glob(f"{self.FILENAME_PREFIX}*.tar.gz"))
self.stdout.write(f"Files in local's {directory.name!r}: {sorted(local_files)}")

files_to_upload = local_files - datastore_files
self.stdout.write(f"Files to upload: {sorted(files_to_upload)}")

for filename in files_to_upload:
self.stdout.write(f"Uploading {filename!r}...")
self._upload_file(directory / filename, wet_run=wet_run)
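A usage sketch of the command (hedged; the directory argument mirrors the cron entry above). Without --wet-run it only reports what it would upload; with it, the missing fluxIAE_ITOU_*.tar.gz archives are pushed to flux-iae/ in the datastore:

# Hypothetical invocations, mirroring the cron entry:
#   python manage.py upload_data_to_pilotage asp_riae_shared_bucket/            # dry run
#   python manage.py upload_data_to_pilotage asp_riae_shared_bucket/ --wet-run  # real upload
from django.core.management import call_command

call_command("upload_data_to_pilotage", "asp_riae_shared_bucket/", wet_run=True)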
10 changes: 10 additions & 0 deletions itou/utils/storage/s3.py
@@ -18,6 +18,16 @@ def s3_client():
    )


def pilotage_s3_client():
    """Client for the S3 bucket dedicated to sharing files with the Pilotage."""
    return boto3.client(
        "s3",
        endpoint_url=settings.PILOTAGE_DATASTORE_S3_ENDPOINT_URL,
        aws_access_key_id=settings.PILOTAGE_DATASTORE_S3_ACCESS_KEY,
        aws_secret_access_key=settings.PILOTAGE_DATASTORE_S3_SECRET_KEY,
    )
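A small usage sketch for the new client (the bucket setting and flux-iae/ prefix are those used by the management command above):

# Hypothetical usage, not part of the commit: list what has already been
# shared with the Pilotage.
from django.conf import settings
from itou.utils.storage.s3 import pilotage_s3_client

response = pilotage_s3_client().list_objects_v2(
    Bucket=settings.PILOTAGE_DATASTORE_S3_BUCKET_NAME,
    Prefix="flux-iae/",
)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])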


class PublicStorage(S3Boto3Storage):
    # Not using the S3StaticStorage backend to ensure the listdir() operation remains forbidden.
    # Don’t sign URLs, objects are public.
