Skip to content

Commit

Permalink
Creation classe intermediaires Extractor & TransformerLoader - Implém…
Browse files Browse the repository at this point in the history
…entation de la classe Canteens
  • Loading branch information
qloridant committed Nov 22, 2024
1 parent 21b799a commit 42a9eba
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 49 deletions.
12 changes: 6 additions & 6 deletions macantine/etl/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def aggregate(df):
return df


class ANALYSIS(etl.ETL):
class ANALYSIS(etl.TRASNFORMER_LOADER):
"""
Create a dataset for analysis in a Data Warehouse
* Extract data from prod
Expand Down Expand Up @@ -300,7 +300,7 @@ def compute_miscellaneous_columns(self):
self.df["ratio_egalim_sans_bio"] = self.df.apply(get_ratio_egalim_sans_bio, axis=1)


class ETL_ANALYSIS_CANTEEN(ANALYSIS):
class ETL_ANALYSIS_CANTEEN(etl.CANTEENS, ANALYSIS):
"""
Create a dataset for analysis in a Data Warehouse
* Extract data from prod
Expand All @@ -313,10 +313,12 @@ class ETL_ANALYSIS_CANTEEN(ANALYSIS):
"""

def __init__(self):
self.df = None
super().__init__()

self.extracted_table_name = "canteens"
self.warehouse = DataWareHouse()
self.schema = json.load(open("data/schemas/schema_analysis_cantines.json"))

# The following mapper is used for renaming columns and for selecting the columns to extract from db
self.columns_mapper = {
"id": "id",
Expand All @@ -338,9 +340,7 @@ def __init__(self):
"line_ministry": "ministere_tutelle",
"sectors": "secteur",
}

def extract_dataset(self):
self.df = utils.fetch_canteens(self.columns_mapper.keys())
self.columns = self.columns_mapper.keys()

def transform_dataset(self):
logger.info("Filling geo names")
Expand Down
63 changes: 46 additions & 17 deletions macantine/etl/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@
import json
import logging
import os
import time
from abc import ABC, abstractmethod

import pandas as pd
import requests
from django.core.files.storage import default_storage

from data.department_choices import Department
from data.models import Teledeclaration
from data.models import Canteen, Teledeclaration
from data.region_choices import Region
from macantine.etl.utils import CAMPAIGN_DATES, filter_empty_values, format_geo_name
from macantine.etl.utils import (
CAMPAIGN_DATES,
common_members,
filter_empty_values,
format_geo_name,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,18 +47,6 @@ def fill_geo_names(self, prefix=""):
del self.df[f"{col_geo_zoom}_lib"]
self.df.insert(self.df.columns.get_loc(col_geo_zoom) + 1, f"{col_geo_zoom}_lib", col_to_insert)

@abstractmethod
def extract_dataset(self):
pass

@abstractmethod
def transform_dataset(self):
pass

@abstractmethod
def load_dataset(self):
pass

def get_schema(self):
    """Return the dataset schema (loaded from a JSON file by the subclass)."""
    return self.schema

Expand Down Expand Up @@ -93,9 +87,26 @@ def is_valid(self, filepath) -> bool:
return 1


class TELEDECLARATIONS(ETL):
class EXTRACTOR(ETL):
    """Abstract ETL stage responsible for pulling the source data into `self.df`."""

    @abstractmethod
    def extract_dataset(self):
        """Fetch the source data and store it as a pandas DataFrame on the instance."""
        pass


class TRASNFORMER_LOADER(ETL):
    """Abstract ETL stage responsible for transforming `self.df` and loading it to its destination.

    NOTE(review): the class name misspells "TRANSFORMER". Subclasses in
    analysis.py and open_data.py reference the same spelling, so a rename
    must update all call sites together.
    """

    @abstractmethod
    def transform_dataset(self):
        """Apply in-place transformations to `self.df`."""
        pass

    @abstractmethod
    def load_dataset(self):
        """Persist `self.df` to the target destination (warehouse, file, ...)."""
        pass


class TELEDECLARATIONS(EXTRACTOR):
def __init__(self):
    # Subclasses populate these before extraction:
    # - years: campaign years whose teledeclarations should be fetched
    # - columns: expected dataframe columns (used as the empty-result fallback)
    self.years = []
    self.columns = []

def filter_aberrant_td(self):
"""
Expand Down Expand Up @@ -148,5 +159,23 @@ def extract_dataset(self) -> pd.DataFrame:
self.df = pd.DataFrame(columns=self.columns)


class CANTEENS(ETL):
pass
class CANTEENS(EXTRACTOR):
    """
    Extract canteen rows from the database into `self.df`.

    Subclasses set `columns` (the field names to extract) and, optionally,
    `exclude_filter` (a django `Q` object of canteens to drop) before
    calling :meth:`extract_dataset`.
    """

    def __init__(self):
        super().__init__()
        # Optional django Q object; when set, matching canteens are excluded.
        self.exclude_filter = None
        # Field names to extract; filtered against the model's actual fields.
        self.columns = []

    def extract_dataset(self):
        """Fetch canteens from the DB and store them as a DataFrame in `self.df`."""
        start = time.time()
        canteens = Canteen.objects.all()
        if self.exclude_filter:
            canteens = canteens.exclude(self.exclude_filter)
        if canteens.count() == 0:
            self.df = pd.DataFrame(columns=self.columns)
        else:
            # A canteen can appear on several rows if it has several sectors.
            columns_model = {field.name for field in Canteen._meta.get_fields()}
            # Preserve the order declared in self.columns: the previous
            # common_members() call returned an unordered set, which made the
            # extracted column order nondeterministic.
            columns_to_extract = [col for col in self.columns if col in columns_model]
            self.df = pd.DataFrame(canteens.values(*columns_to_extract))
        end = time.time()
        logger.info(f"Time spent on canteens extraction : {end - start}")
22 changes: 9 additions & 13 deletions macantine/etl/open_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
from data.models import Canteen
from macantine.etl import etl
from macantine.etl.etl import logger
from macantine.etl.utils import extract_sectors, fetch_canteens
from macantine.etl.utils import extract_sectors


class ETL_OPEN_DATA(etl.ETL):
class OPEN_DATA(etl.TRASNFORMER_LOADER):
"""
Abstract class implementing the specifity for open data export
"""
Expand Down Expand Up @@ -131,17 +131,20 @@ def load_dataset(self):
logger.error(f"Error saving validated data: {e}")


class ETL_OPEN_DATA_CANTEEN(ETL_OPEN_DATA):
class ETL_OPEN_DATA_CANTEEN(etl.CANTEENS, OPEN_DATA):
    """Open-data export of the canteen registry (extraction from CANTEENS, load from OPEN_DATA)."""

    def __init__(self):
        super().__init__()
        self.dataset_name = "registre_cantines"
        # Schema drives both validation and the set of exported columns.
        self.schema = json.load(open("data/schemas/schema_cantine.json"))
        self.schema_url = (
            "https://raw.githubusercontent.com/betagouv/ma-cantine/staging/data/schemas/schema_cantine.json"
        )
        # Columns to extract, as declared in the schema.
        self.columns = [field["name"] for field in self.schema["fields"]]
        # NOTE(review): `canteens` appears unused in the visible code — confirm before removing.
        self.canteens = None
        self.exclude_filter = Q(sectors__id=22)  # Filtering out the police / army sectors
        self.exclude_filter |= Q(deletion_date__isnull=False)  # Filtering out the deleted canteens

def extract_dataset(self):
def transform_dataset(self):
all_canteens_col = [i["name"] for i in self.schema["fields"]]
self.canteens_col_from_db = all_canteens_col
for col_processed in [
Expand All @@ -156,14 +159,6 @@ def extract_dataset(self):
]:
self.canteens_col_from_db.remove(col_processed)

start = time.time()
exclude_filter = Q(sectors__id=22) # Filtering out the police / army sectors
exclude_filter |= Q(deletion_date__isnull=False) # Filtering out the deleted canteens
self.df = fetch_canteens(self.canteens_col_from_db, exclude_filter)
end = time.time()
logger.info(f"Time spent on canteens extraction : {end - start}")

def transform_dataset(self):
# Adding the active_on_ma_cantine column
start = time.time()
non_active_canteens = Canteen.objects.filter(managers=None).values_list("id", flat=True)
Expand Down Expand Up @@ -202,10 +197,11 @@ def transform_dataset(self):
logger.info(f"Time spent on campaign participations : {end - start}")


class ETL_OPEN_DATA_TELEDECLARATIONS(etl.TELEDECLARATIONS, ETL_OPEN_DATA):
class ETL_OPEN_DATA_TELEDECLARATIONS(etl.TELEDECLARATIONS, OPEN_DATA):
def __init__(self, year: int):
super().__init__()
self.years = [year]
self.year = year
self.dataset_name = f"campagne_td_{year}"
self.schema = json.load(open("data/schemas/schema_teledeclaration.json"))
self.schema_url = (
Expand Down
18 changes: 5 additions & 13 deletions macantine/etl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import requests

from api.serializers import SectorSerializer
from data.models import Canteen, Sector, Teledeclaration
from data.models import Sector, Teledeclaration

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -37,6 +37,10 @@
SECTEURS_SPE = [26, 24, 23, 22, 4, 2]


def common_members(a, b):
    """Return the set of elements present in both *a* and *b*."""
    return set(a).intersection(b)


def get_ratio(row, valueKey, totalKey):
tdTotalKey = f"teledeclaration.{totalKey}"
tdValueKey = f"teledeclaration.{valueKey}"
Expand Down Expand Up @@ -231,18 +235,6 @@ def filter_empty_values(df: pd.DataFrame, col_name) -> pd.DataFrame:
return df.dropna(subset=col_name)


def fetch_canteens(columns, exclude_filter=None):
    """
    Return a DataFrame of canteens restricted to *columns*.

    :param columns: iterable of Canteen field names to extract
    :param exclude_filter: optional django Q object; matching canteens are excluded
    :return: pandas DataFrame (empty, with *columns* as headers, when no canteen matches)
    """
    canteens = Canteen.objects.all()
    if exclude_filter:
        canteens = Canteen.objects.exclude(exclude_filter)
    if canteens.count() == 0:
        df = pd.DataFrame(columns=columns)
    else:
        # Creating a dataframe with all canteens. The canteens can have multiple lines if they have multiple sectors
        df = pd.DataFrame(canteens.values(*columns))
    return df


def fetch_commune_detail(code_insee_commune, commune_details, geo_detail_type):
"""
Provide EPCI code/ Department code/ Region code for a city, given the insee code of the city
Expand Down

0 comments on commit 42a9eba

Please sign in to comment.