Si tu as suivi ma série sur Snowflake jusqu'ici, on a déjà vu pas mal de briques :
- L'architecture Snowflake (storage / compute / cloud services)
- Les différents types de tables
- Les views
- Les stages
- Les file formats
- Et plus récemment les streams pour suivre ce qui a changé dans une table
On a posé les bases : ingestion, stockage, exposition des données et suivi des changements avec les streams et la question logique qui arrive derrière, c'est :
“Qui déclenche mes requêtes, et quand ?”
C'est exactement le rôle des Tasks.
Tasks Snowflake vs Airflow ?
C'est quoi concrètement une Task Snowflake ?
Une Task est un objet Snowflake (comme une table, une view, un stream ...) dont le job est de lancer du SQL automatiquement selon un planning ou une condition.
Tu peux y mettre :
- une requête SQL simple,
- ou l'appel d'une procédure stockée (qui elle-même peut contenir plusieurs requêtes).
Quand utiliser une Task Snowflake ?
Une Task simple si : tu veux exécuter une requête SQL ou une procédure stockée à intervalle régulier (toutes les X minutes ou via CRON), sans dépendance.
Une Task avec WHEN si : tu veux optimiser le coût en ne déclenchant qu'à la présence de données nouvelles (typique combo avec un stream et SYSTEM$STREAM_HAS_DATA).
Une chaîne de Tasks (AFTER) si : tu construis un mini-DAG dans Snowflake où plusieurs étapes doivent s'enchaîner (Bronze => Silver => Gold typique en architecture Médaillon).
Une Task serverless si : tu ne veux pas gérer le warehouse et tu laisses Snowflake adapter automatiquement la taille selon la charge.
Exemple de la création d'une Task
Comme toujours rien ne vaut un exemple pour comprendre le concept, on commence par créer une table de log.
CREATE OR REPLACE TABLE task_log (
run_at TIMESTAMP_NTZ,
comment STRING
);
Ensuite, on crée une Task qui ajoute une ligne dans cette table toutes les 5 minutes :
CREATE OR REPLACE TASK log_task
WAREHOUSE = COMPUTE_WH
SCHEDULE = '5 MINUTE'
AS
INSERT INTO task_log
SELECT CURRENT_TIMESTAMP(), 'Task exécutée';
Pour les arguments :
WAREHOUSE = COMPUTE_WH=> on demande à la Task d'utiliser le warehouse COMPUTE_WHSCHEDULE = '5 MINUTE'=> exécution toutes les 5 minutes,- dans le bloc
AS=> le SQL à lancer à chaque run.
Nb : Au moment où on crée la Task, elle est suspendue. Il faut la démarrer :
ALTER TASK log_task RESUME;
À partir de là, Snowflake exécute l'INSERT toutes les 5 minutes.
Un SELECT * FROM task_log montre les exécutions qui s'accumulent.
Comment le compute est utilisé dans les Tasks
Pour exécuter le SQL d'une Task, Snowflake a besoin de compute.
On a principalement deux options :
- Task “classique” avec un warehouse qu'on gère nous-mêmes
- Task serverless, où Snowflake gère le compute.
Dans l'exemple précédent, on était en mode classique car on a indiqué WAREHOUSE = COMPUTE_WH et l'avantage c'est qu'on maîtrise précisément quel warehouse est utilisé dans la tache.
Snowflake propose aussi de laisser le moteur gérer tout ça pour nous avec des Tasks serverless. Dans ce cas, on ne donne pas de warehouse, mais juste une taille initiale, et on laisse Snowflake se débrouiller tout seul comme un grand.
Par exemple :
CREATE OR REPLACE TASK log_task_serverless
USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
SCHEDULE = '5 MINUTE'
AS
INSERT INTO task_log
SELECT CURRENT_TIMESTAMP(), 'Task serverless exécutée';
Ici, on laisse Snowflake s’occuper du compute pour cette Task.
Streams + Tasks : le duo pour le chargement incrémental
Dans l'article sur les streams, on a vu comment Snowflake peut nous dire :
“Voici ce qui a changé dans cette table depuis la dernière fois.”
Les Tasks sont le complément naturel :
“À quel moment on lit ces changements, et quelle est l’étape suivante du pipeline qu’on enchaîne derrière ?”
Prenons l'exemple de l'article sur les Streams :
- on a une table
membersqui reçoit des nouvelles lignes, - on veut garder une autre table
members_dimà jour avec ces mêmes données,
CREATE OR REPLACE TABLE members (
id NUMBER,
name STRING,
age NUMBER
);
CREATE OR REPLACE TABLE members_dim LIKE members;
CREATE OR REPLACE STREAM members_stream
ON TABLE members
APPEND_ONLY = TRUE;
Ici, on utilise un stream APPEND_ONLY pour simplifer et ne suivre que les nouvelles lignes.
On peut alors créer une Task qui exécute régulièrement un MERGE entre le stream et members_dim :
CREATE OR REPLACE TASK members_dim_task
WAREHOUSE = COMPUTE_WH
SCHEDULE = '5 MINUTE'
AS
MERGE INTO members_dim d
USING members_stream s
ON d.id = s.id
WHEN MATCHED THEN
UPDATE SET
d.name = s.name,
d.age = s.age
WHEN NOT MATCHED THEN
INSERT (id, name, age)
VALUES (s.id, s.name, s.age);
On démarre la Task :
ALTER TASK members_dim_task RESUME;
Ce qui se passe ensuite est assez simple :
- des nouvelles lignes arrivent dans
members - le stream
members_streamles expose comme un delta - toutes les 5 minutes, la Task lit le stream et applique les changements sur
members_dim
On obtient un chargement incrémental piloté directement depuis Snowflake.
Lancer les tasks uniquement s’il y a du delta
Pour l'optimisation, on n'a pas forcément envie que la Task s'exécute pour rien si le stream est vide.
Snowflake permet d'ajouter une condition WHEN sur la Task.
CREATE OR REPLACE TASK members_dim_task
WAREHOUSE = COMPUTE_WH
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('MEMBERS_STREAM')
AS
MERGE INTO members_dim d
USING members_stream s
ON d.id = s.id
WHEN MATCHED THEN
UPDATE SET
d.name = s.name,
d.age = s.age
WHEN NOT MATCHED THEN
INSERT (id, name, age)
VALUES (s.id, s.name, s.age);
La fonction SYSTEM$STREAM_HAS_DATA('MEMBERS_STREAM') renvoie TRUE s'il y a du delta à traiter.
Dans le cas contraire, la Task se “réveille”, regarde la condition, et se rendort sans lancer le MERGE ni consommer de crédits pour rien.
Chaîner plusieurs Tasks
Dans la vraie vie, c'est rare d'avoir une tâche toute seule qui vit sa vie dans son coin. La plupart du temps, on a une petite cascade de tâches qui doivent s'enchaîner.
Par exemple :
- Une Task charge les données brutes dans des tables de staging
- Une autre construit les modèles métiers (data marts),
- Une troisième met à jour des agrégations ou des vues matérialisées.
Ça donne quelque chose comme :
CREATE OR REPLACE TASK task_load_raw
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 2 * * * Europe/Paris'
AS
CALL sp_load_raw_data();
CREATE OR REPLACE TASK task_build_marts
WAREHOUSE = COMPUTE_WH
AFTER task_load_raw
AS
CALL sp_build_marts();
task_load_rawva tourner tous les jours à 2h du matin,task_build_martsn'a pas deSCHEDULEmais va se déclencher automatiquement aprèstask_load_raw.
On peut chaîner plusieurs niveaux comme ça et se retrouver avec un petit DAG de Tasks.
Dans un prochain article, on pourra pousser le curseur un peu plus loin et comparer le duo Streams + Tasks avec les Dynamic Tables, pour voir dans quels cas on préfère l'un ou l'autre.
Récap. des options Tasks
| Paramètre | Valeur | Usage |
|---|---|---|
SCHEDULE |
'5 MINUTE' ou intervalle |
Exécution périodique simple (1 minute à 11520 minutes) |
SCHEDULE |
'USING CRON ...' |
Planification fine via expression CRON avec timezone |
WAREHOUSE |
nom d'un warehouse | Task classique : Snowflake utilise ton warehouse |
USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE |
'XSMALL', 'SMALL'... |
Task serverless : Snowflake gère le compute |
WHEN |
SYSTEM$STREAM_HAS_DATA('...') |
Déclencher uniquement si le stream contient des données |
AFTER |
nom d'une autre Task | Chaîner après une Task parent (mini-DAG) |
AS |
requête SQL ou CALL sp_...() |
Le code SQL à exécuter à chaque run |
Pour démarrer une Task créée : ALTER TASK ma_task RESUME; (par défaut elle est suspendue).
Aller plus loin : préparer la SnowPro Core
Les Tasks sont un sujet récurrent à l'examen SnowPro Core, particulièrement la combinaison Streams + Tasks pour le chargement incrémental, les conditions WHEN avec SYSTEM$STREAM_HAS_DATA, et le chaînage avec AFTER. Sur DataCertification.fr, le module SnowPro Core couvre ces sujets avec 700 questions scenario-based au format de l'examen.
👉 Préparer la SnowPro Core sur DataCertification.fr
Pour aller plus loin sur Snowflake en général, j'ai regroupé tous mes articles dans un parcours complet.
👉 Accéder à la Formation Snowflake
Tu veux que je t'accompagne sur ton projet data (Snowflake, pipelines, orchestration, dbt, monitoring) ?
👉 Réserver un appel de 30 minutes
Questions
C'est quoi une Task dans Snowflake ?
Une Task Snowflake est un objet qui exécute du SQL automatiquement selon un planning (SCHEDULE ou CRON) ou une condition (WHEN). Elle peut contenir une requête SQL simple ou l'appel d'une procédure stockée. C'est le scheduler interne de Snowflake pour orchestrer les pipelines de données sans avoir besoin d'un orchestrateur externe comme Airflow.
Comment planifier une Task Snowflake ?
Deux options : SCHEDULE = '5 MINUTE' pour une exécution périodique simple (entre 1 minute et 11520 minutes), ou SCHEDULE = 'USING CRON 0 2 * * * Europe/Paris' pour une planification fine avec timezone. Attention : par défaut une Task créée est suspendue, il faut la démarrer avec ALTER TASK nom RESUME.
Quelle différence entre Tasks Snowflake et Airflow ?
Les Tasks Snowflake sont le scheduler interne de Snowflake, parfaites pour orchestrer des pipelines 100% SQL/Snowflake (chargements, transformations, streams). Airflow est un orchestrateur externe plus puissant, utile dès que le pipeline doit communiquer avec d'autres briques (APIs, autres bases, Spark, ML). Les deux ne s'opposent pas : on peut avoir des Tasks Snowflake déclenchées par Airflow, ou utiliser uniquement des Tasks pour un pipeline 100% Snowflake.
Comment chaîner plusieurs Tasks Snowflake ?
Utilise le paramètre AFTER pour créer une dépendance : CREATE TASK task_b WAREHOUSE = ... AFTER task_a déclenche task_b automatiquement après la fin de task_a. La task parent doit avoir un SCHEDULE, les tasks enfants n'en ont pas. Tu peux chaîner plusieurs niveaux pour créer un mini-DAG.
C'est quoi une Task serverless dans Snowflake ?
Une Task serverless est une Task qui n'utilise pas de warehouse classique. Au lieu de spécifier WAREHOUSE, tu utilises USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' (ou autre taille). Snowflake gère le compute automatiquement, en adaptant la taille à la charge. Avantage : pas de warehouse à gérer, idéal pour les petites Tasks fréquentes. Inconvénient : moins de contrôle fin sur le coût.
Comment déclencher une Task uniquement si un stream a des données ?
Utilise le paramètre WHEN avec la fonction SYSTEM$STREAM_HAS_DATA('nom_du_stream'). Exemple : WHEN SYSTEM$STREAM_HAS_DATA('members_stream'). À chaque exécution prévue, la Task vérifie d'abord la condition. Si le stream est vide, la Task ne déclenche pas le SQL et ne consomme pas de crédits. C'est l'optimisation standard pour le chargement incrémental basé sur streams.
Pourquoi ma Task Snowflake ne se déclenche pas ?
Trois causes principales : (1) la Task est en mode SUSPENDED par défaut à la création, il faut la démarrer avec ALTER TASK nom RESUME, (2) si la Task a une condition WHEN, elle ne s'exécute que si la condition est vraie (vérifier avec SYSTEM$STREAM_HAS_DATA par exemple), (3) le rôle de la Task n'a pas les bons droits sur les objets référencés. Pour debug, regarder TASK_HISTORY : SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY()).
Comment surveiller l'historique des Tasks Snowflake ?
Utilise la fonction TABLE(INFORMATION_SCHEMA.TASK_HISTORY()) pour les 7 derniers jours sans latence, ou la vue SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY pour l'historique sur 365 jours (latence de 45 min à 3h). Tu y trouves le statut de chaque exécution (SUCCEEDED, FAILED, SKIPPED), la durée, le warehouse utilisé et les éventuelles erreurs. C'est l'outil de base pour debugger un pipeline.
Quelle est la fréquence minimale d'une Task Snowflake ?
La fréquence minimale d'une Task avec SCHEDULE est de 1 minute (SCHEDULE = '1 MINUTE'). Pour des fréquences plus courtes, Snowflake ne supporte pas natif : il faut soit utiliser Snowpipe (pour l'ingestion continue), soit Snowpipe Streaming, soit des Dynamic Tables avec un TARGET_LAG court. Le maximum est de 11520 minutes (8 jours).
Comment activer ou désactiver une Task Snowflake ?
Activer : ALTER TASK nom_task RESUME. Désactiver : ALTER TASK nom_task SUSPEND. Pour modifier une Task existante (changer le schedule, le warehouse, le SQL), il faut d'abord la SUSPEND, puis faire un CREATE OR REPLACE, puis RESUME. Pour les Tasks chaînées avec AFTER, suspendre la task parent met automatiquement en pause toute la chaîne.
