Apache Airflow® est un orchestrateur de workflows puissant, basé sur la planification de DAGs (Directed Acyclic Graphs). Par défaut, les DAGs sont planifiés à l’aide d’expressions cron classiques ou d’objets Timetable intégrés (CronDataTimetable, OnceTimetable, etc….).
Mais dans la réalité, les besoins sont souvent plus subtils que ce qu’une simple cron peut exprimer :
- Exécuter plusieurs fois un DAG dans une journée à des heures spécifiques non régulières.
- Déclencher un traitement uniquement à des dates et heures précises, dans le cadre d’un projet ponctuel ou d’une synchronisation inter-systèmes.
- Gérer des événements métier non prédictibles comme les changements d’heure, les jours fériés ou les périodes de blackout technique.
C’est là qu’interviennent les Timetables personnalisés, une fonctionnalité puissante et sous-utilisée d’Airflow.
Pourquoi ne pas dupliquer un DAG ?
Certain·e·s développeur·euse·s contournent la limitation des crons en dupliquant un DAG plusieurs fois avec des heures différentes. C’est une mauvaise pratique car cela :
- Rend la maintenance plus complexe (plusieurs DAGs à surveiller, historiser, documenter).
- Rend les logs, les métriques et le monitoring plus dispersés.
- Multiplie les risques d’erreurs sur les dépendances ou les paramètres.
Un seul DAG avec une Timetable bien pensée est souvent la solution la plus propre, scalable et lisible.
Cas 1 : MultiTimeTimetable – Plusieurs exécutions par jour
Cas d’usage métier réel
Dans un projet d’analyse temps réel pour une chaîne logistique, nous devions récupérer des fichiers FTP plusieurs fois par jour, à des horaires bien précis définis contractuellement :
- 06:15
- 07:14
- 08:16
- 10:00
Ces horaires ne sont pas réguliers (impossible d’utiliser un simple */X) et ne peuvent pas être représentés par une seule cron
Solution : créer un MultiTimeTimetable
L’idée est de créer une classe MultiTimeTimetable, qui prend une liste fixe d’horaires journaliers ([« HH:MM »]) et planifie l’exécution du DAG selon ces heures.
Si toutes les heures du jour sont déjà passées, la prochaine exécution sera au premier horaire du lendemain.
Code – MultiTimeTimetable
from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable
import pendulum
from pendulum import time
from typing import List, Optional, Dict, Any
class MultiTimeTimetable(Timetable):
"""
Timetable personnalisé pour exécuter plusieurs fois par jour à des horaires fixes.
"""
# Liste d'horaires fixes (modifiable ici)
FIXED_RUN_TIMES_STR = ["06:15", "07:14", "08:16","10:00"]
FIXED_RUN_TIMES = [time(int(t.split(":")[0]), int(t.split(":")[1])) for t in FIXED_RUN_TIMES_STR]
def __init__(self):
# Aucun paramètre en entrée, on utilise les horaires codés en dur
self.run_times_str = self.FIXED_RUN_TIMES_STR
self.run_times = self.FIXED_RUN_TIMES
def infer_manual_data_interval(self, run_after: pendulum.DateTime) -> tuple:
return (run_after, run_after)
def next_dagrun_info(
self,
last_automated_data_interval: Optional[tuple],
restriction: TimeRestriction
) -> Optional[DagRunInfo]:
tz = restriction.earliest.tz if restriction.earliest else pendulum.now().tz
now_time = pendulum.now(tz)
# Génère les horaires possibles pour aujourd'hui
future_times = sorted([
now_time.replace(hour=t.hour, minute=t.minute, second=0, microsecond=0)
for t in self.run_times
])
# Cherche la prochaine exécution aujourd'hui
for run_at in future_times:
if run_at > now_time:
return DagRunInfo.interval(start=run_at, end=run_at)
# Sinon, planifie la première exécution de demain
next_day = now_time.add(days=1)
next_run = next_day.replace(
hour=self.run_times[0].hour,
minute=self.run_times[0].minute,
second=0,
microsecond=0
)
return DagRunInfo.interval(start=next_run, end=next_run)
def serialize(self) -> Dict[str, Any]:
return {
"__type": "custom_operators.multi_time_timetable.MultiTimeTimetable"
}
@classmethod
def deserialize(cls, data: Dict[str, Any]) -> "MultiTimeTimetable":
return cls()
Exemple de DAG utilisant ce Timetable
Ce DAG exécute une tâche simple (say_hello) à chaque horaire spécifié dans la Timetable personnalisée.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from custom_operators.multi_time_timetable_fixe import MultiTimeTimetable
def hello():
print("Hello from MultiTimeTimetable DAG")
default_args = {
"owner": "airflow",
}
with DAG(
dag_id="my_multi_time_fixe_dag",
default_args=default_args,
start_date=datetime(2025, 7, 1),
catchup=False,
timetable=MultiTimeTimetable(),
tags=["example"],
) as dag:
task = PythonOperator(
task_id="say_hello",
python_callable=hello,
)

Cas 2 : MultiDateTimetable – Déclenchements ponctuels à des dates précises
Cas d’usage métier réel
Dans le cadre d’un plan de migration ERP (Enterprise Resource Planning), nous avons dû déclencher des traitements techniques à des dates et heures très précises, validées dans un cahier de recettes et synchronisées avec une équipe externe :
- 2025-04-05 08:07
- 2025-05-17 08:09
- 2025-07-17 08:11
- 2026-01-01 08:11
Ces exécutions, bien que rares, sont critiques.
Toute erreur (exécution anticipée ou oubli) pouvait perturber un système tiers sensible.
Or, ce besoin est impossible à exprimer avec une expression CRON : on souhaite exécuter un DAG exactement à certaines dates/heures, quelques fois par an.
Solution : créer un MultiDateTimetable
On crée une classe MultiDateTimetable qui contient une liste fixe de timestamps (« YYYY-MM-DD HH:MM ») et garantit que le DAG ne s’exécutera que sur ces créneaux, ni avant, ni après.
Code – MultiDateTimetable
from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable
import pendulum
from typing import Optional, Dict, Any
class MultiDateTimetable(Timetable):
"""
Timetable personnalisé exécutant un DAG à une liste fixe de dates et heures.
"""
FIXED_DATES = [
"2025-04-05 08:07",
"2025-05-17 08:09",
"2025-07-17 08:11",
"2026-01-01 08:11"
]
def __init__(self):
# Initialisation directe avec la liste fixe
self.run_datetimes_str = self.FIXED_DATES
self.run_datetimes = sorted([
pendulum.parse(dt_str) for dt_str in self.run_datetimes_str
])
def infer_manual_data_interval(self, run_after: pendulum.DateTime) -> tuple:
return (run_after, run_after)
def next_dagrun_info(
self,
last_automated_data_interval: Optional[tuple],
restriction: TimeRestriction
) -> Optional[DagRunInfo]:
tz = restriction.earliest.tz if restriction.earliest else pendulum.now().tz
now = pendulum.now(tz)
for dt in self.run_datetimes:
if dt > now and (not restriction.latest or dt <= restriction.latest):
return DagRunInfo.interval(start=dt, end=dt)
return None
def serialize(self) -> Dict[str, Any]:
return {
"__type": "custom_operators.multi_date_timetable.MultiDateTimetable"
}
@classmethod
def deserialize(cls, data: Dict[str, Any]) -> "MultiDateTimetable":
return cls()
Exemple de DAG avec MultiDateTimetable
Ce DAG exécute une tâche simple à des moments bien définis, indépendamment de toute régularité.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from custom_operators.Multi_Date_Timetable_fixe import MultiDateTimetable
def hello():
print("Hello from MultiTimeTimetable DAG")
default_args = {
"owner": "airflow",
}
with DAG(
dag_id="multi_date_execution_fixe_dag",
default_args=default_args,
start_date=datetime(2025, 7, 1),
catchup=False,
timetable=MultiDateTimetable(),
tags=["example"],
) as dag:
task = PythonOperator(
task_id="say_hello",
python_callable=hello,
)

Pourquoi utiliser une timetable personnalisée
1. Flexibilité: Permet de planifier des DAGs à des horaires ou dates complexes, impossibles à gérer avec cron ou schedule_interval.
2. Lisibilité métier: Exprime clairement des règles métier (ex : jours ouvrés, heures spécifiques, dates fixes).
3. Ré-utilisabilité: Une fois définie, tu peux réutiliser la logique de planification dans plusieurs DAGs.
4. Fiabilité: Moins d’erreurs que les cron compliqués, et plus facile à tester et maintenir.
BONUS – Timetables intelligents & paramétrables
1- Rendre les Timetables réutilisables et flexibles
Jusqu’ici, nos classes MultiTimeTimetable et MultiDateTimetable utilisaient des listes codées en dur (FIXED_RUN_TIMES, FIXED_DATES).
Mais dans un vrai projet, on peut vouloir :
- Utiliser la même classe de Timetable dans plusieurs DAGs
- Avec des heures différentes dans chaque DAG
- Ou des dates spécifiques selon le contexte métier (ex : environnement, région, client)
Solution : rendre les Timetables paramétrables
On peut modifier la classe pour qu’elle accepte une liste en paramètre du constructeur, ce qui rend la Timetable générique et réutilisable :
Exemple : version paramétrable de MultiTimeTimetable
class MultiTimeTimetable(Timetable):
def __init__(self, run_times_str: list[str]):
self.run_times_str = run_times_str
self.run_times = [
pendulum.time(int(t.split(":")[0]), int(t.split(":")[1]))
for t in run_times_str
]
Et dans le DAG :
timetable=MultiTimeTimetable(["08:15", "12:30", "13:59", "17:45"])
2 – Timetables hybrides et intelligents
Exemples de comportements avancés :
- Sauter les jours fériés ou week-ends
Avec des bibliothèques comme holidays ou une API d’entreprise - Déclencher un DAG seulement si une condition externe est remplie
Par exemple :- Un fichier est disponible sur un SFTP ou un bucket
- Une API a confirmé qu’un traitement amont est terminé
- Une variable Airflow ou un tag est activé
- Limiter les exécutions à des plages horaires spécifiques sauf certains jours
- Exemple : « toutes les heures sauf entre 12h et 14h les mercredis et vendredis »
Conclusion
Les Timetables personnalisés sont un outil essentiel pour quiconque veut faire passer Airflow d’un simple orchestrateur à une véritable plateforme d’automatisation métier.
Ils permettent :
- D’intégrer les règles métier complexes.
- D’éviter les crons illisibles ou les DAGs dupliqués.
- D’assurer des workflows fiables, planifiables et maintenables.

