
add refashion dag #11

Open
wants to merge 6 commits into base: main
301 changes: 301 additions & 0 deletions dags/refashion.py
@@ -0,0 +1,301 @@
import json
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

from utils.api_utils import fetch_dataset_from_point_apport
from utils.mapping_utils import transform_acteur_type_id, generate_unique_id
from utils.utils import transform_ecoorganisme, transform_location, extract_details

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 23),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG(
    'refashion',
    default_args=default_args,
    description='A pipeline to fetch, process, and load to-be-validated data into PostgreSQL for the Refashion dataset',
    schedule_interval=None,
)


def fetch_data_from_api(**kwargs):
    dataset = kwargs["dataset"]
    api_url = f"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/{dataset}/lines?size=10000"
    data = fetch_dataset_from_point_apport(api_url)
    df = pd.DataFrame(data)
    return df

def create_proposition_services(**kwargs):
    df = kwargs['ti'].xcom_pull(task_ids='create_actors')['df']
    data_dict = kwargs["ti"].xcom_pull(task_ids="load_data_from_postgresql")
    idx_max = data_dict['max_pds_idx']

    rows_list = []

    for index, row in df.iterrows():
        acteur_id = row['identifiant_unique']
        sous_categories = row['produitsdechets_acceptes']
        if row['point_dapport_de_service_reparation']:
            acteur_service_id = 17
            action_id = 1
        elif row['point_dapport_pour_reemploi']:
            acteur_service_id = 4
            action_id = 4
        elif row['point_de_reparation']:
            acteur_service_id = 15
            action_id = 1
        elif row['point_de_collecte_ou_de_reprise_des_dechets']:
            acteur_service_id = 4
            action_id = 11
        else:
            continue

        rows_list.append({"acteur_service_id": acteur_service_id, "action_id": action_id, "acteur_id": acteur_id,
                          "sous_categories": sous_categories})

    df_pds = pd.DataFrame(rows_list)
    df_pds.index = range(idx_max, idx_max + len(df_pds))
    df_pds['id'] = df_pds.index
Contributor

This solution seems fragile: if even a single propositionservice has been created through the interface, this DAG will crash.

Not sure we need to create the id column at all, since it is auto-incremented.

To be tested.
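A minimal sketch of the auto-increment alternative discussed above (untested; assumes qfdmo_propositionservice.id is a serial/identity column):

from sqlalchemy import text

def insert_pds_returning_ids(engine, df_pds):
    # Let Postgres assign the ids and read them back with RETURNING,
    # instead of pre-computing them from max(id).
    ids = []
    with engine.begin() as conn:
        for rec in df_pds.to_dict("records"):
            result = conn.execute(
                text(
                    "INSERT INTO qfdmo_propositionservice"
                    " (acteur_service_id, action_id, acteur_id)"
                    " VALUES (:acteur_service_id, :action_id, :acteur_id)"
                    " RETURNING id"
                ),
                {k: rec[k] for k in ("acteur_service_id", "action_id", "acteur_id")},
            )
            ids.append(result.fetchone()[0])
    df_pds["id"] = ids
    return df_pds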

    return df_pds


def create_proposition_services_sous_categories(**kwargs):
    df = kwargs['ti'].xcom_pull(task_ids='create_proposition_services')

    rows_list = []
    sous_categories = {
        "Vêtement": 107,
        "Linge": 104,
        "Chaussure": 109
    }

Comment on lines +75 to +78

Contributor

Note for later: we could extract these mappings into a configuration file.

Comment on lines +75 to +78

Contributor

Careful with hard-coded ids: nothing guarantees the ids are the same across environments.

It happens to work here because we frequently copy prod to preprod, but it is better to rely on a "code/name".

There may also be some rationalization of the DB to do here: to be discussed together.
    for index, row in df.iterrows():
        products = str(row["sous_categories"]).split("|")
        for product in products:
            if product.strip() in sous_categories:
                rows_list.append({
                    'propositionservice_id': row['id'],
                    'souscategorieobjet_id': sous_categories[product.strip()]
                })

    df_sous_categories = pd.DataFrame(rows_list, columns=['propositionservice_id', 'souscategorieobjet_id'])
    return df_sous_categories
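Following the review notes above, a hedged sketch of what the extracted configuration and a name-based lookup could look like (file path, table, and column names are assumptions):

import json
import pandas as pd

def load_sous_categorie_ids(engine, config_path="dags/config/sous_categories.json"):
    # The config file would map display names to stable codes, e.g.
    # {"Vêtement": "vetement", "Linge": "linge", "Chaussure": "chaussure"}
    with open(config_path) as f:
        name_to_code = json.load(f)
    df = pd.read_sql_table("qfdmo_souscategorieobjet", engine)
    # Assumes the table exposes a stable 'code' column; adjust if it is 'nom'.
    code_to_id = dict(zip(df["code"], df["id"]))
    return {name: code_to_id[code] for name, code in name_to_code.items()}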


def serialize_to_json(**kwargs):
    df_actors = kwargs["ti"].xcom_pull(task_ids="create_actors")['df']
    df_pds = kwargs["ti"].xcom_pull(task_ids="create_proposition_services")
    df_pdsc = kwargs["ti"].xcom_pull(task_ids="create_proposition_services_sous_categories")
    aggregated_pdsc = df_pdsc.groupby('propositionservice_id').apply(
        lambda x: x.to_dict('records')).reset_index(name='pds_sous_categories')

    df_pds_joined = pd.merge(df_pds, aggregated_pdsc, how='left', left_on='id', right_on='propositionservice_id')
    df_pds_joined.drop('id', axis=1, inplace=True)
    aggregated_pds = df_pds_joined.groupby('acteur_id').apply(lambda x: x.to_dict('records')).reset_index(
        name='proposition_services')

    df_joined = pd.merge(df_actors, aggregated_pds, how='left', left_on='identifiant_unique', right_on='acteur_id')

    df_joined.drop('acteur_id', axis=1, inplace=True)
    df_joined = df_joined.where(pd.notnull(df_joined), None)
    df_joined['row_updates'] = df_joined[[
        "identifiant_unique",
        "nom",
        "adresse",
        "adresse_complement",
        "code_postal",
        "ville",
        "url",
        "email",
        "location",
        "telephone",
        "nom_commercial",
        "label_reparacteur",
        "siret",
        "identifiant_externe",
        "acteur_type_id",
        "statut",
        "source_id",
        "cree_le",
        "modifie_le",
        "commentaires",
        "horaires",
        "proposition_services"
    ]].apply(lambda row: json.dumps(row.to_dict()), axis=1)

    return df_joined
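For orientation, one serialized row_updates payload looks roughly like this (abridged sketch, all values invented):

# Shape of one row_updates value; most actor fields omitted.
{
    "identifiant_unique": "boutique_x_3f9a",
    "nom": "Boutique X",
    "acteur_type_id": 3,
    "proposition_services": [
        {
            "acteur_service_id": 15,
            "action_id": 1,
            "acteur_id": "boutique_x_3f9a",
            "sous_categories": "Vêtement|Linge",
            "pds_sous_categories": [
                {"propositionservice_id": 42, "souscategorieobjet_id": 107},
                {"propositionservice_id": 42, "souscategorieobjet_id": 104},
            ],
        }
    ],
}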


def load_data_from_postgresql(**kwargs):
    pg_hook = PostgresHook(postgres_conn_id="lvao-preprod")
    engine = pg_hook.get_sqlalchemy_engine()

    df_acteurtype = pd.read_sql_table('qfdmo_acteurtype', engine)
    df_sources = pd.read_sql_table('qfdmo_source', engine)
    df_ps = pd.read_sql_table('qfdmo_propositionservice', engine)
Contributor

read_sql_table seems to load the whole table into a dataframe; loading the entire table just to find the max id seems a bit overkill?
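A lighter-weight sketch along those lines (same engine, a scalar aggregate query instead of a full table load):

max_pds_idx = pd.read_sql_query(
    "SELECT COALESCE(MAX(id), 0) AS max_id FROM qfdmo_propositionservice",
    engine,
).loc[0, "max_id"]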


return {"acteurtype": df_acteurtype, "sources": df_sources, "max_pds_idx": df_ps['id'].max()}


def write_to_dagruns(**kwargs):
    dag_id = kwargs['dag'].dag_id
    run_id = kwargs['run_id']
    df = kwargs["ti"].xcom_pull(task_ids="serialize_actors_to_records")
    metadata = kwargs["ti"].xcom_pull(task_ids="create_actors")['metadata']
    pg_hook = PostgresHook(postgres_conn_id="lvao-preprod")
    engine = pg_hook.get_sqlalchemy_engine()
    current_date = datetime.now()
    with engine.connect() as conn:
        result = conn.execute("""
            INSERT INTO qfdmo_dagrun (dag_id, run_id, status, meta_data, created_date, updated_date)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING ID;
        """, (
            dag_id,
            run_id,
            'TO_VALIDATE',
            json.dumps(metadata),
            current_date,
            current_date
        ))
        dag_run_id = result.fetchone()[0]

    df['change_type'] = 'CREATE'
    df['dag_run_id'] = dag_run_id
    df[['row_updates', 'dag_run_id', 'change_type']].to_sql(
        "qfdmo_dagrunchange",
        engine,
        if_exists="append",
        index=False,
        method="multi",
        chunksize=1000,
    )



def create_actors(**kwargs):
    data_dict = kwargs["ti"].xcom_pull(task_ids="load_data_from_postgresql")
    df = kwargs["ti"].xcom_pull(task_ids="fetch_data_from_api")
    df_sources = data_dict['sources']
    df_acteurtype = data_dict['acteurtype']

    column_mapping = {
        'id_point_apport_ou_reparation': 'identifiant_externe',
        'adresse_complement': 'adresse_complement',
        'type_de_point_de_collecte': 'acteur_type_id',
        'telephone': 'telephone',
        'siret': 'siret',
        'uniquement_sur_rdv': '',
        'exclusivite_de_reprisereparation': '',
        'filiere': '',
        'public_accueilli': '',
        'produitsdechets_acceptes': '',
        'labels_etou_bonus': '',
        'reprise': '',
        'point_de_reparation': '',
        'ecoorganisme': 'source_id',
        'adresse_format_ban': 'adresse',
        'nom_de_lorganisme': 'nom',
        'enseigne_commerciale': 'nom_commercial',
        '_updatedAt': 'cree_le',
        'site_web': 'url',
        'email': 'email',
        'perimetre_dintervention': '',
        'longitudewgs84': 'location',
        'latitudewgs84': 'location',
        'horaires_douverture': 'horaires',
        'consignes_dacces': 'commentaires',
    }

    selected_columns = ['nom', 'adresse', 'type_de_point_de_collecte', 'id_point_apport_ou_reparation']

    for old_col, new_col in column_mapping.items():
        if new_col:
            if old_col == 'type_de_point_de_collecte':
                df[new_col] = df[old_col].apply(lambda x: transform_acteur_type_id(x, df_acteurtype=df_acteurtype))
            elif old_col in ('longitudewgs84', 'latitudewgs84'):
                df['location'] = df.apply(lambda row: transform_location(row['longitudewgs84'], row['latitudewgs84']),
                                          axis=1)
            elif old_col == 'ecoorganisme':
                df[new_col] = df[old_col].apply(lambda x: transform_ecoorganisme(x, df_sources=df_sources))
            elif old_col == 'adresse_format_ban':
                df[['adresse', 'code_postal', 'ville']] = df.apply(extract_details, axis=1)
            else:
                df[new_col] = df[old_col]
    df['label_reparacteur'] = False
    df['identifiant_unique'] = df.apply(lambda x: generate_unique_id(x, selected_columns=selected_columns), axis=1)
Contributor

To avoid depending on the columns that make up this id, I suggest we use the following as identifier:

SOURCE_IDEXTERNE (with a _d suffix if it is digital)

To be checked whether this format is enough to get unique IDs across the whole file.
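A hedged sketch of the proposed format (column names follow this DAG; the digital check and its parameter are hypothetical):

def proposed_identifiant_unique(row, digital_acteur_type_id):
    # SOURCE_IDEXTERNE, suffixed with "_d" for digital actors.
    unique_id = f"{row['source_id']}_{row['identifiant_externe']}"
    if row['acteur_type_id'] == digital_acteur_type_id:
        unique_id += "_d"
    return unique_id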

    df['statut'] = 'ACTIF'
    df['modifie_le'] = df['cree_le']
    df['siret'] = df['siret'].astype(str).apply(lambda x: x[:14])
    df['telephone'] = df['telephone'].dropna().apply(lambda x: x.replace(' ', ''))
    df['telephone'] = df['telephone'].dropna().apply(lambda x: '0' + x[2:] if x.startswith('33') else x)

    duplicates_mask = df.duplicated('identifiant_unique', keep=False)
    duplicate_ids = df.loc[duplicates_mask, 'identifiant_unique'].unique()

    number_of_duplicates = len(duplicate_ids)

    df.drop_duplicates('identifiant_unique', keep='first', inplace=True)

    metadata = {
        'number_of_duplicates': number_of_duplicates,
        'duplicate_ids': list(duplicate_ids),
        'added_rows': len(df)
    }

    return {'df': df, 'metadata': metadata}


fetch_data_task = PythonOperator(
    task_id='fetch_data_from_api',
    python_callable=fetch_data_from_api,
    op_kwargs={"dataset": "donnees-eo-refashion"},
    dag=dag,
)

load_data_task = PythonOperator(
    task_id='load_data_from_postgresql',
    python_callable=load_data_from_postgresql,
    dag=dag,
)

create_actors_task = PythonOperator(
    task_id='create_actors',
    python_callable=create_actors,
    dag=dag,
)

create_proposition_services_task = PythonOperator(
    task_id='create_proposition_services',
    python_callable=create_proposition_services,
    dag=dag,
)

create_proposition_services_sous_categories_task = PythonOperator(
    task_id='create_proposition_services_sous_categories',
    python_callable=create_proposition_services_sous_categories,
    dag=dag,
)

write_data_task = PythonOperator(
    task_id='write_data_to_validate_into_dagruns',
    python_callable=write_to_dagruns,
    dag=dag,
)

serialize_to_json_task = PythonOperator(
    task_id='serialize_actors_to_records',
    python_callable=serialize_to_json,
    dag=dag,
)

([fetch_data_task, load_data_task] >>
 create_actors_task >>
 create_proposition_services_task >>
 create_proposition_services_sous_categories_task >> serialize_to_json_task >> write_data_task)
16 changes: 16 additions & 0 deletions dags/utils/api_utils.py
@@ -0,0 +1,16 @@
import requests


def fetch_dataset_from_point_apport(url):
    all_data = []
    while url:
        response = requests.get(url)
        if response.status_code == 200:
            data = response.json()
            all_data.extend(data['results'])
            url = data.get('next', None)
            print(url)
Contributor

We can remove this print.

        else:
            print(f"Failed to fetch data: {response.status_code}")
            break
    return all_data
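A variant sketch addressing the note above: log instead of printing, and fail explicitly on HTTP errors so the Airflow task surfaces the problem rather than returning partial data (logger name and error handling are assumptions):

import logging

import requests

logger = logging.getLogger(__name__)

def fetch_dataset_from_point_apport(url):
    all_data = []
    while url:
        response = requests.get(url)
        # Raise on 4xx/5xx instead of silently breaking out of the loop.
        response.raise_for_status()
        data = response.json()
        all_data.extend(data['results'])
        url = data.get('next')
        logger.debug("next page: %s", url)
    return all_data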


20 changes: 20 additions & 0 deletions dags/utils/mapping_utils.py
@@ -0,0 +1,20 @@
import hashlib


def transform_acteur_type_id(value, df_acteurtype):
    mapping_dict = {
        "Solution en ligne (site web, app. mobile)": "en ligne (web, mobile)",
        "Artisan, commerce indépendant": "artisan, commerce indépendant",
        "Magasin / Franchise, Enseigne commerciale / Distributeur / Point de vente": "commerce",
        "Point d'Apport Volontaire Publique": "point d'apport volontaire public",
        "Association, entreprise de l’économie sociale et solidaire (ESS)": "Association, entreprise de l'ESS",
        "Déchèterie": "déchèterie",
    }
    nom_affiche = mapping_dict.get(value)
    id_value = df_acteurtype.loc[df_acteurtype['nom_affiche'] == nom_affiche, 'id'].values[0] if any(df_acteurtype['nom_affiche'] == nom_affiche) else None
    return id_value


def generate_unique_id(row, selected_columns):
    unique_str = '_'.join([str(row[col]) for col in selected_columns])
    return str(row['nom'].replace(' ', '_').lower()) + '_' + hashlib.sha256(unique_str.encode()).hexdigest()

47 changes: 46 additions & 1 deletion dags/utils/utils.py
@@ -1,12 +1,57 @@
import io
import csv
from urllib.parse import urlparse

from shapely.geometry import Point
from shapely import wkb
import requests
import pandas as pd
import re


def transform_location(longitude, latitude):
    point = Point(longitude, latitude)

    transformed_location_binary = wkb.dumps(point)
    transformed_location_hex = transformed_location_binary.hex()

    return transformed_location_hex


def transform_ecoorganisme(value, df_sources):
    id_value = df_sources.loc[df_sources['nom'].str.lower() == value.lower(), 'id'].values[0] if any(
        df_sources['nom'].str.lower() == value.lower()) else None
Contributor

Wouldn't we be better off raising an exception and/or skipping the row if the eco-organisme is not found in the list of sources?
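A sketch of the stricter behaviour suggested above (function name is an assumption):

def transform_ecoorganisme_strict(value, df_sources):
    # Fail loudly when the eco-organisme has no matching source,
    # instead of silently returning None.
    matches = df_sources.loc[df_sources['nom'].str.lower() == value.lower(), 'id']
    if matches.empty:
        raise ValueError(f"Unknown eco-organisme: {value!r} not found in sources")
    return matches.values[0]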

    return id_value


def extract_details(row):
    pattern = re.compile(r'\b(\d{5})\s+(.*)')

    address = None
    postal_code = None
    city = None
    if pd.isnull(row['adresse_format_ban']):
        return pd.Series([None, None, None])

    # Ensure adresse_format_ban is treated as a string
    adress_ban = str(row['adresse_format_ban'])

    # Search for the 5-digit postal code pattern
    match = pattern.search(adress_ban)
    if match:
        postal_code = match.group(1)
        city = match.group(2)
        address = adress_ban[:match.start()].strip()

    return pd.Series([address, postal_code, city])
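An illustrative call (invented address) showing how the 5-digit pattern splits a BAN-formatted string:

row = pd.Series({'adresse_format_ban': '12 rue de la Paix 75002 Paris'})
extract_details(row)  # -> pd.Series(['12 rue de la Paix', '75002', 'Paris'])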


def transform_location(longitude, latitude):
Contributor

All these transformation functions could be unit tested.

    point = Point(longitude, latitude)

    transformed_location_binary = wkb.dumps(point)
    transformed_location_hex = transformed_location_binary.hex()

    return transformed_location_hex
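A minimal sketch of such a unit test for transform_location, assuming a pytest layout; the expected value round-trips through shapely rather than hard-coding the hex string:

from shapely import wkb

def test_transform_location_roundtrip():
    hex_value = transform_location(2.3522, 48.8566)
    point = wkb.loads(bytes.fromhex(hex_value))
    assert (point.x, point.y) == (2.3522, 48.8566)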
def send_batch_to_api(batch):
    """
    Send a batch of CSV lines to the geocoding API and return the response.