Dynamisez vos Pipelines de données avec Mage.ai
Mage.ai est une solution apparue en 2021, initialement prévue pour être un outil orienté vers le MLOps. Mage a pivoté son modèle pour devenir un orchestrateur ainsi qu’un outil ETL open source, versatile et aisément déployable. L’interface utilisateur de grande qualité ainsi que la création de pipelines font de cet outil un indispensable à utiliser au quotidien.
Exemple de Pipeline
Présentation du Pipeline :
Notre objectif dans cet exemple est d’utiliser Mage.ai pour créer un pipeline simple. Ce pipeline aura pour but de récupérer, au sein des fichiers Siren, tous les établissements actifs correspondant à un type d’établissement spécifique pour un département. Dans notre exemple, les établissements spécifiques sont caractérisés par le code Naf 56.60Z qui correspond aux débits de boissons. Le département choisi sera le 60.
De plus, nous aimerions que ces établissements soient correctement géolocalisés au format EPSG:4326 cependant les données disponibles sont au format Lambert (EPSG:2154).Il nous faudra donc convertir les géolocalisations existantes du format Lambert au nouveau format. Toutes les données de géolocalisation n’étant pas présentent il nous faudra ensuite récupérer les données manquante en utilisant l’API BANO.
Une fois nos données complétées, nous les sauvegarderons au format Parquet avant de les transférer dans S3 pour une exploitation ultérieure.
Création du Pipeline:
Dans notre exemple, nous allons récupérer la liste des débits de boissons de l’oise. Cependant, les données du Siren peuvent contenir des doublons, il est donc nécessaire de récupérer chaque mois un fichier nommé Siren Doublon (celui-ci permettant d’identifier les doublons).
Notre flux aura donc deux blocs d’entrées, ces deux blocs alimenteront un premier bloc de transformation qui aura pour but l’élimination des doublons potentiels. À tout moment, le pipeline est visualisable sous la forme d’un arbre modifiable dynamiquement (les liaisons entre les blocs sont éditables via l’interface graphique). Voici, dans notre cas, l’arbre du pipeline final.
Arbre représentant notre Pipeline
Pour réaliser un pipeline avec Mage, rien de plus simple. Il faut tout d’abord se connecter à Mage puis, dans le menu latéral, chercher l’onglet « Pipeline » et cliquer sur « New ».
Création d’un pipeline
Plusieurs types de pipelines prédéfinis sont proposés. Nous allons ici réaliser un pipeline batch standard. Une fois cette étape terminée, le pipeline est créé et nommé automatiquement. Ce nom attribué automatiquement peut être changé dans le menu « Edit/Pipeline Settings ».
Il est maintenant temps de créer le premier bloc du pipeline. Ce premier bloc est un data loader Python, nous sélectionnons donc un bloc de type generic. Il faut maintenant nommer ce bloc afin de le créer.
Création d’un bloc loader
@data_loader
def load_data(*args, **kwargs):
« » »
Template code for loading data from any source.
Returns:
Anything (e.g. data frame, dictionary, array, int, str, etc.)
« » »
# Specify your data loading logic here
return {}
Création du bloc load_siren:
Le code obtenu est le code par défaut pour créer un loader. Nous pourrons ensuite insérer dans ce loader notre code Python, qui téléchargera le fichier .ZIP contenant les informations du SIREN en utilisant l’URL permanente du fichier SIREN. Cette URL peut être paramétrée via l’onglet Variable qui se trouve dans le menu latéral droit lorsque nous sommes en mode édition du pipeline. Une fois paramétrée, une variable peut être récupérée dans n’importe quel bloc du pipeline.
Gestion des variables dans Mage.ai
Pour que les logs soient visibles dans Mage.ai, il faut utiliser kwarg_logger = kwargs.get(« logger »).
Les variables globales dans Mage.ai peuvent être définies dans le menu latéral lors de la définition du pipeline. Ces variables globales peuvent ensuite être appelées partout dans tous les blocs du pipeline.
De même, les mots de passe peuvent être stockés dans Mage.ai afin d’y accéder dans le code de n’importe quel pipeline.
import os
import pandas as pd
import requests
import zipfile
if ‘data_loader’ not in globals():
from mage_ai.data_preparation.decorators import data_loader
if ‘test’ not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data(*args, **kwargs):
kwarg_logger = kwargs.get(« logger »)
kwarg_logger.info(« loader in »)
url = str(kwargs[« URL_SIREN »])
zip_path = « StockEtablissement_utf8.zip »
csv_path = « StockEtablissement_utf8.csv »
activite_value = ‘56.30Z’
etat_value = « A »
code_commune_prefix = str(kwargs[« CODE_DEP »])
chunksize = 20000
columns_to_drop = [
« nic »,
« statutDiffusionEtablissement »,
« trancheEffectifsEtablissement »,
« anneeEffectifsEtablissement »,
« activitePrincipaleRegistreMetiersEtablissement »,
« libelleCommuneEtrangerEtablissement »,
« distributionSpecialeEtablissement »,
« codeCedexEtablissement »,
« libelleCedexEtablissement »,
« codePaysEtrangerEtablissement »,
« libellePaysEtrangerEtablissement »,
« identifiantAdresseEtablissement »,
« complementAdresse2Etablissement »,
« numeroVoie2Etablissement »,
« indiceRepetition2Etablissement »,
« typeVoie2Etablissement »,
« libelleVoie2Etablissement »,
« codePostal2Etablissement »,
« libelleCommune2Etablissement »,
« libelleCommuneEtranger2Etablissement »,
« distributionSpeciale2Etablissement »,
« codeCommune2Etablissement »,
« codeCedex2Etablissement »,
« libelleCedex2Etablissement »,
« codePaysEtranger2Etablissement »,
« libellePaysEtranger2Etablissement »,
]
dtype = {
« activitePrincipaleEtablissement »: str,
« etatAdministratifEtablissement »: str,
« codeCommuneEtablissement »: str,
}
try:
response = requests.get(url)
with open(zip_path, « wb ») as file:
file.write(response.content)
kwarg_logger.info(« Téléchargement effectué »)
with zipfile.ZipFile(zip_path, « r ») as zip_ref:
zip_ref.extractall()
kwarg_logger.info(« Le fichier est unzip »)
filtered_df = pd.DataFrame()
for chunk in pd.read_csv(
csv_path,
chunksize=chunksize,
dtype=dtype,
low_memory=False,
encoding=« utf-8 »,
):
chunk.drop(columns=columns_to_drop, inplace=True)
filtered_chunk = chunk[
(chunk[« activitePrincipaleEtablissement »] == activite_value)
& (chunk[« etatAdministratifEtablissement »] == etat_value)
& (
chunk[« codeCommuneEtablissement »].str.startswith(
code_commune_prefix
)
)
& (chunk[« codePostalEtablissement »] != « [ND] »)
]
kwarg_logger.info(filtered_chunk)
filtered_df = pd.concat([filtered_df, filtered_chunk])
kwarg_logger.info(filtered_df)
except Exception as e:
kwarg_logger.error(f« Une erreur s’est produite : {e} »)
raise
finally:
if os.path.exists(zip_path):
os.remove(zip_path)
if os.path.exists(csv_path):
os.remove(csv_path)
return filtered_df
@test
def test_output(output, *args) -> None:
assert len(output.index)>=1
Une fois les données récupérées, elles sont transmises, dans notre exemple, sous forme de DataFrame d’un bloc à l’autre. Il est également à noter que les blocs sont réutilisables, c’est-à-dire qu’un simple glisser-déposer permet de réutiliser un bloc déjà créé et utilisé dans un autre pipeline.
On peut également noter que Mage.ai offre la possibilité de tester directement si les données respectent certaines contraintes. Par exemple, ici nous allons vérifier que notre DataFrame contient au moins une ligne.
Création du bloc load_siren_doublon:
La création de ce bloc est identique à la méthode précédente.
import requests
import zipfile
import os
import pandas as pd
if ‘data_loader’ not in globals():
from mage_ai.data_preparation.decorators import data_loader
if ‘test’ not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data(*args, **kwargs):
kwarg_logger = kwargs.get(‘logger’)
kwarg_logger.info(« load siren doublon in »)
zip_path = ‘StockDoublons_utf8.zip’
csv_path = ‘StockDoublons_utf8.csv’
try:
response = requests.get(kwargs[« URL_DOUBLON »])
with open(zip_path, ‘wb’) as file:
file.write(response.content)
with zipfile.ZipFile(zip_path, ‘r’) as zip_ref:
zip_ref.extractall()
kwarg_logger.info(« Le fichier est unzip »)
df = pd.read_csv(csv_path, dtype=str)
except Exception as e:
kwarg_logger.error(f« Une erreur s’est produite : {e} »)
finally:
if os.path.exists(zip_path):
os.remove(zip_path)
if os.path.exists(csv_path):
os.remove(csv_path)
kwarg_logger.info(« load siren doublon out »)
return df
@test
def test_output(output, *args) -> None:
assert output is not None, ‘The output is undefined’
Une fois que les deux blocs d’entrée sont prêts, nous pourrons passer à l’étape suivante, c’est-à-dire la création d’un bloc de transformation (transformer). Ce bloc aura comme parents les deux blocs précédents.
Création du bloc delete_doublon
Par défaut, lors de la création d’un bloc, le bloc précédent est le parent attitré. Ici, notre premier bloc de transformation reçoit donc deux dataframes et renvoie en sortie une dataframe dédoublonnée. Il faut utiliser l’interface graphique pour définir le second parent.
Définir les parents d’un bloc
if ‘transformer’ not in globals():
from mage_ai.data_preparation.decorators import transformer
if ‘test’ not in globals():
from mage_ai.data_preparation.decorators import test
@transformer
def transform(filtered_df,df,*args, **kwargs):
output = filtered_df[~filtered_df[‘siren’].isin(df[‘ sirenDoublon’])]
return output
@test
def test_output(output, *args) -> None:
assert output is not None, ‘The output is undefined’
À la fin de l’exécution de chaque bloc, Mage.ai nous propose de visualiser partiellement la DataFrame résultante afin d’effectuer un contrôle visuel sur ce qui se passe.
Exemple de visualisation à la fin du dédoublonnage
Création du bloc convert_lambert
Une fois la DataFrame dédoublonnée, la transformation suivante va itérer sur les colonnes de coordonnées au format Lambert afin de les convertir en coordonnées au format EPSG:4326.
import pandas as pd
from pyproj import Transformer
if « transformer » not in globals():
from mage_ai.data_preparation.decorators import transformer
if « test » not in globals():
from mage_ai.data_preparation.decorators import test
def convert_lambert_to_gps(df, abscisse_col, ordonnee_col, kwarg_logger):
transformer = Transformer.from_crs(« EPSG:2154 », « EPSG:4326 », always_xy=True)
df[abscisse_col] = pd.to_numeric(df[abscisse_col], errors=« coerce »)
df[ordonnee_col] = pd.to_numeric(df[ordonnee_col], errors=« coerce »)
def convert(row):
if pd.notna(row[abscisse_col]) and pd.notna(row[ordonnee_col]):
x = row[abscisse_col]
y = row[ordonnee_col]
if 500000 <= x <= 1300000 and 6000000 <= y <= 7200000:
lon, lat = transformer.transform(x, y)
return pd.Series([lat, lon])
else:
kwarg_logger.warning(
f« Coordonnées hors des plages attendues : X={x}, Y={y} »
)
return pd.Series([None, None])
else:
return pd.Series([None, None])
df[[« latitude », « longitude »]] = df.apply(convert, axis=1)
df.drop(columns=[abscisse_col, ordonnee_col], inplace=True)
return df
@transformer
def transform(data, *args, **kwargs):
kwarg_logger = kwargs.get(« logger »)
kwarg_logger.info(« convert_lambert in »)
convert_lambert_to_gps(
data,
« coordonneeLambertAbscisseEtablissement »,
« coordonneeLambertOrdonneeEtablissement »,
kwarg_logger,
)
kwarg_logger.info(« convert_lambert out »)
return data
@test
def test_output(output, *args) -> None:
assert output is not None, « The output is undefined »
Création du bloc geoloc_bano
Enfin, nous arrivons au dernier bloc de transformation où nous allons combler les données de géolocalisation manquantes en appelant l’API de géolocalisation BANO à l’aide des adresses contenues dans la DataFrame.
import pandas as pd
import time
import requests
if « transformer » not in globals():
from mage_ai.data_preparation.decorators import transformer
if « test » not in globals():
from mage_ai.data_preparation.decorators import test
def get_gps_coordinates(address, kwarg_logger):
base_url = « https://api-adresse.data.gouv.fr/search/ »
params = {« q »: address, « limit »: 1}
response = requests.get(base_url, params=params)
if response.status_code == 200:
data = response.json()
if data[« features »]:
coordinates = data[« features »][0][« geometry »][« coordinates »]
return coordinates[1], coordinates[0]
kwarg_logger.warning(f« {address} non geoloc »)
return None, None
@transformer
def transform(df, *args, **kwargs):
kwarg_logger = kwargs.get(« logger »)
kwarg_logger.info(« geoloc_bano in »)
address_fields = [
« numeroVoieEtablissement »,
« typeVoieEtablissement »,
« libelleVoieEtablissement »,
« codePostalEtablissement »,
« libelleCommuneEtablissement »,
]
for index, row in df.iterrows():
if pd.isna(row[« latitude »]) or pd.isna(row[« longitude »]):
address = » « .join(
[str(row[field]) for field in address_fields if pd.notna(row[field])]
)
lat, lon = get_gps_coordinates(address, kwarg_logger)
if lat is not None and lon is not None:
df.at[index, « latitude »] = lat
df.at[index, « longitude »] = lon
time.sleep(0.1)
kwarg_logger.info(« geoloc_bano out »)
return df
@test
def test_output(output, *args) -> None:
« » »
Template code for testing the output of the block.
Création export_siren_to_s3
Une fois nos données récupérées, on peut les transmettre au dernier bloc d’export qui va se charger de transformer la DataFrame en un fichier .parquet, puis de pousser ce .parquet dans S3. Notre pipeline est ainsi terminé. Il est à noter que nous utilisons ici la variable “ACCESS” et le mot de passe “SECRET3”, qu’il faudra définir avec vos identifiants S3.
import os
import boto3
from mage_ai.data_preparation.shared.secrets import get_secret_value
if « data_exporter » not in globals():
from mage_ai.data_preparation.decorators import data_exporter
@data_exporter
def export_data(df, *args, **kwargs):
kwarg_logger = kwargs.get(« logger »)
kwarg_logger.info(« export_siren_to_s3 in »)
parquet_filename = f« siren_{str(kwargs[‘CODE_DEP’])}_{kwargs[‘CODE_NAF’]}.parquet »
try:
df.to_parquet(parquet_filename, index=False)
kwarg_logger.info(« Dataframe écrit au format .parquet avec succès »)
s3 = boto3.client(
service_name=kwargs[« NAME »].lower(),
aws_access_key_id=kwargs[« ACCES »],
aws_secret_access_key=get_secret_value(« SECRET3 »),
)
kwarg_logger.info(« Connexion à S3 réussie »)
s3.upload_file(
parquet_filename,
« mchevallier-buckets3 »,
f« ARTICLE/siren_{str(kwargs[‘CODE_DEP’])}_{kwargs[‘CODE_NAF’]}.parquet »,
)
kwarg_logger.info(« Chargement vers S3 réussi »)
except Exception as e:
kwarg_logger.error(f« Erreur lors du chargement vers S3 : {str(e)} »)
raise
finally:
if os.path.exists(parquet_filename):
os.remove(parquet_filename)
kwarg_logger.info(« Nettoyage du .parquet réussi »)
On peut ensuite utiliser un outil comme Dremio pour venir interroger et visualiser cette donnée.
Visualisation d’un .parquet dans Dremio
Remarques
Il est aussi intéressant de noter qu’à tout moment, la qualité des données peut être vérifiée dans le pipeline à l’aide des blocs “power ups” dans le panneau latéral droit. Ce type de bloc permet d’utiliser Great Expectations et est générique, c’est-à-dire qu’on peut choisir plusieurs blocs du pipeline sur lesquels appliquer la vérification. Par exemple, ici, on peut vérifier que notre DataFrame ne contient que les données du bon département.
if ‘extension’ not in globals():
from mage_ai.data_preparation.decorators import extension
@extension(‘great_expectations’)
def validate(validator, *args, **kwargs):
validator.expect_column_values_to_match_regex(‘codePostalEtablissement’, r‘^’+str(kwargs[« CODE_DEP »]))
Entre chaque bloc, on peut commenter ce qui se passe avec des blocs de markdown. Généralement, je commence chaque pipeline avec un bloc de markdown décrivant les sources, les cibles du pipeline ainsi que son fonctionnement global.
Orchestration et logs
L’orchestration des pipelines se fait via le panneau latéral gauche dans l’onglet « Triggers », puis en cliquant sur le bouton « New Trigger ». Cet onglet permet de créer des triggers qui vont lancer le pipeline avec la fréquence souhaitée ou selon des événements. Dans notre cas, nous allons lancer le pipeline une seule fois, donc nous sélectionnons « Once ». Chaque trigger peut recevoir un ou plusieurs tags permettant de mieux organiser les différents triggers.
Exemple de création de trigger
Une fois le trigger créé, on peut le visualiser en cliquant sur « Triggers » dans le panneau latéral gauche. On se retrouve ainsi dans un menu contenant tous les triggers créés pour un pipeline. Ce menu permet de visualiser l’état des triggers (si le pipeline a réussi ou échoué) et donne accès aux détails et aux logs du pipeline.
Présentation du menu Trigger
En cliquant sur le bouton « Logs » dans le panneau latéral gauche ou sur le bouton dans l’onglet des logs, il est possible de visualiser les logs de chaque pipeline et de les filtrer en fonction du niveau de logs souhaité. En cliquant sur un log, un panneau latéral s’ouvre et permet d’accéder aux détails du log.
Exemple de logs dans Mage.ai
Conclusion
Dans cet article, nous nous sommes intéressés aux origines et aux qualités de Mage.ai. Nous avons montré étape par étape comment créer un pipeline fonctionnel en utilisant Mage. Nous avons ensuite montré comment orchestrer ce pipeline et lire ses logs.
Dans un prochain article, nous nous intéresserons à l’utilisation des fonctionnalités plus avancées de Mage.ai.