Partager via


Tutoriel : Créer un pipeline ETL à l’aide de la capture de données modifiées

Découvrez comment créer et déployer un pipeline ETL (extraire, transformer et charger) avec capture de données modifiées (CDC) à l’aide de Lakeflow Spark Declarative Pipelines (SDP) pour l’orchestration des données et le chargeur automatique. Un pipeline ETL implémente les étapes permettant de lire des données à partir de systèmes sources, de transformer ces données en fonction des exigences, telles que les vérifications de qualité des données et la déduplication des enregistrements, et d’écrire les données dans un système cible, comme un entrepôt de données ou un lac de données.

Dans ce tutoriel, vous allez utiliser des données d’une customers table dans une base de données MySQL pour :

  • Extrayez les modifications d’une base de données transactionnelle à l’aide de Debezium ou d’un autre outil et enregistrez-les dans le stockage d’objets cloud (S3, ADLS ou GCS). Dans ce tutoriel, vous ignorez la configuration d’un système cdc externe et générez plutôt des données factices pour simplifier le didacticiel.
  • Utilisez le chargeur automatique pour charger de manière incrémentielle les messages à partir du stockage d’objets cloud et stocker les messages bruts dans la customers_cdc table. Le chargeur automatique déduit le schéma et gère l’évolution du schéma.
  • Créez la customers_cdc_clean table pour vérifier la qualité des données à l'aide des attentes. Par exemple, il id ne doit jamais être null dû au fait qu’il est utilisé pour exécuter des opérations upsert.
  • Effectuez AUTO CDC ... INTO sur les données CDC nettoyées pour intégrer les modifications dans la table finale customers.
  • Montre comment un pipeline peut créer une table de dimension de type 2 à variation lente (SCD2) pour suivre toutes les modifications.

L’objectif est d’ingérer les données brutes en quasi temps réel et de créer une table pour votre équipe d’analystes tout en garantissant la qualité des données.

Le tutoriel utilise l’architecture de médaillon Lakehouse, où il ingère des données brutes par le biais de la couche bronze, nettoie et valide les données avec la couche argent, et applique la modélisation et l’agrégation dimensionnées à l’aide de la couche or. Pour plus d’informations, consultez Qu'est-ce que l'architecture Lakehouse Medallion.

Le flux implémenté ressemble à ceci :

Pipeline avec CDC

Pour plus d’informations sur le pipeline, Auto Loader et la capture de données modifiées (CDC), consultez Lakeflow Spark Declarative Pipelines, Qu'est-ce que Auto Loader ? et Qu’est-ce que la capture de données modifiées (CDC) ?

Spécifications

Pour suivre ce tutoriel, vous devez répondre aux exigences suivantes :

Capture des changements de données dans un pipeline ETL

La capture de données modifiées (CDC) est le processus qui capture les modifications apportées aux enregistrements apportés à une base de données transactionnelle (par exemple, MySQL ou PostgreSQL) ou un entrepôt de données. CDC capture les opérations telles que les suppressions de données, les ajouts et les mises à jour, généralement sous forme de flux pour re-matérialiser les modifications des tables dans des systèmes externes. La capture de données modifiées (CDC) active le chargement incrémentiel tout en éliminant la nécessité de mises à jour par lot.

Note

Pour simplifier ce tutoriel, évitez de configurer un système CDC externe. Supposons qu’il exécute et enregistre des données CDC en tant que fichiers JSON dans le stockage d’objets cloud (S3, ADLS ou GCS). Ce tutoriel utilise la Faker bibliothèque pour générer les données utilisées dans le didacticiel.

Capture de données modifiées

Divers outils CDC sont disponibles. L’une des principales solutions open source est Debezium, mais d’autres implémentations qui simplifient les sources de données existent, telles que Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate et AWS DMS.

Dans ce tutoriel, vous utilisez des données CDC à partir d’un système externe comme Debezium ou DMS. Debezium capture chaque ligne modifiée. Il envoie généralement l’historique des modifications de données aux rubriques Kafka ou les enregistre sous forme de fichiers.

Vous devez ingérer les informations CDC à partir du customers tableau (format JSON), vérifier qu'elles sont correctes, puis matérialiser la table des clients dans le Lakehouse.

Entrée CDC de Debezium

Pour chaque modification, vous recevez un message JSON contenant tous les champs de la ligne en cours de mise à jour (id, firstname, lastname, email, ). address Le message inclut également des métadonnées supplémentaires :

  • operation: code d’opération, généralement (DELETE, APPEND, UPDATE).
  • operation_date: date et horodatage de l’enregistrement pour chaque action d’opération.

Les outils tels que Debezium peuvent produire des sorties plus avancées, telles que la valeur de ligne avant la modification, mais ce didacticiel les omet par souci de simplicité.

Étape 1 : Créer un pipeline

Créez un pipeline ETL pour interroger votre source de données CDC et générer des tables dans votre espace de travail.

  1. Dans votre espace de travail, cliquez sur l’icône Plus. Nouveautés dans le coin supérieur gauche.

  2. Cliquez sur Pipeline ETL.

  3. Remplacez le titre du pipeline Pipelines with CDC tutorial par ou par un nom que vous préférez.

  4. Sous le titre, choisissez un catalogue et un schéma pour lesquels vous disposez d’autorisations d’écriture.

    Ce catalogue et ce schéma sont utilisés par défaut si vous ne spécifiez pas de catalogue ou de schéma dans votre code. Votre code peut écrire dans n’importe quel catalogue ou schéma en spécifiant le chemin complet. Ce tutoriel utilise les valeurs par défaut que vous spécifiez ici.

  5. Dans les options avancées, sélectionnez Démarrer avec un fichier vide.

  6. Choisissez un dossier pour votre code. Vous pouvez sélectionner Parcourir pour parcourir la liste des dossiers dans l’espace de travail. Vous pouvez choisir n’importe quel dossier pour lequel vous disposez d’autorisations d’écriture.

    Pour utiliser le contrôle de version, sélectionnez un dossier Git. Si vous devez créer un dossier, sélectionnez l’icône Plus.

  7. Choisissez Python ou SQL pour le langage de votre fichier, en fonction du langage que vous souhaitez utiliser pour le didacticiel.

  8. Cliquez sur Sélectionner pour créer le pipeline avec ces paramètres et ouvrez l’éditeur de pipelines Lakeflow.

Vous disposez maintenant d’un pipeline vide avec un catalogue et un schéma par défaut. Ensuite, configurez les exemples de données à importer dans le didacticiel.

Étape 2 : Créer les exemples de données à importer dans ce tutoriel

Cette étape n’est pas nécessaire si vous importez vos propres données à partir d’une source existante. Pour ce tutoriel, générez des données factices comme exemple pour le didacticiel. Créez un notebook pour exécuter le script de génération de données Python. Ce code ne doit être exécuté qu’une seule fois pour générer les exemples de données. Créez-le donc dans le dossier du explorations pipeline, qui n’est pas exécuté dans le cadre d’une mise à jour de pipeline.

Note

Ce code utilise Faker pour générer des exemples de données du CDC. Faker est disponible pour l’installation automatique, ce qui permet au tutoriel d’utiliser %pip install faker. Vous pouvez également définir une dépendance sur faker pour le bloc-notes. Consultez Ajouter des dépendances au bloc-notes.

  1. Dans l’éditeur de pipelines Lakeflow, dans la barre latérale du navigateur de ressources à gauche de l’éditeur, cliquez sur l’icône Plus.Ajouter, puis choisissez Exploration.

  2. Donnez-lui un nom, par Setup dataexemple, sélectionnez Python. Vous pouvez conserver le dossier de destination par défaut, qui est un nouveau explorations dossier.

  3. Cliquez sur Créer. Cela crée un bloc-notes dans le nouveau dossier.

  4. Entrez le code suivant dans la première cellule. Vous devez modifier la définition de <my_catalog> et <my_schema> pour correspondre au catalogue et au schéma par défaut que vous avez sélectionnés dans la procédure précédente :

    %pip install faker
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = dbName = db = "<my_schema>"
    
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`')
    volume_folder =  f"/Volumes/{catalog}/{db}/raw_data"
    
    try:
      dbutils.fs.ls(volume_folder+"/customers")
    except:
      print(f"folder doesn't exist, generating the data under {volume_folder}...")
      from pyspark.sql import functions as F
      from faker import Faker
      from collections import OrderedDict
      import uuid
      fake = Faker()
      import random
    
      fake_firstname = F.udf(fake.first_name)
      fake_lastname = F.udf(fake.last_name)
      fake_email = F.udf(fake.ascii_company_email)
      fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
      fake_address = F.udf(fake.address)
      operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
      fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
      fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
    
      df = spark.range(0, 100000).repartition(100)
      df = df.withColumn("id", fake_id())
      df = df.withColumn("firstname", fake_firstname())
      df = df.withColumn("lastname", fake_lastname())
      df = df.withColumn("email", fake_email())
      df = df.withColumn("address", fake_address())
      df = df.withColumn("operation", fake_operation())
      df_customers = df.withColumn("operation_date", fake_date())
      df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
    
  5. Pour générer le jeu de données utilisé dans le didacticiel, tapez Maj + Entrée pour exécuter le code :

  6. Optional. Pour afficher un aperçu des données utilisées dans ce didacticiel, entrez le code suivant dans la cellule suivante et exécutez le code. Mettez à jour le catalogue et le schéma pour qu’ils correspondent au chemin d’accès du code précédent.

    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = "<my_schema>"
    
    display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
    

Cela génère un jeu de données volumineux (avec des données cdc factices) que vous pouvez utiliser dans le reste du tutoriel. À l’étape suivante, ingérer les données à l’aide du chargeur automatique.

Étape 3 : Ingérer de façon incrémentielle des données avec le chargeur automatique

L’étape suivante consiste à ingérer les données brutes du stockage cloud (faux) dans une couche bronze.

Cela peut être difficile pour plusieurs raisons, car vous devez :

  • Fonctionner à grande échelle, potentiellement ingérer des millions de petits fichiers.
  • Déduire le schéma et le type JSON.
  • Gérer les enregistrements incorrects avec un schéma JSON incorrect.
  • Prenez soin de l’évolution du schéma (par exemple, une nouvelle colonne dans la table client).

Le chargeur automatique simplifie cette ingestion, y compris l’inférence de schéma et l’évolution du schéma, tout en effectuant une mise à l’échelle vers des millions de fichiers entrants. Le chargeur automatique est disponible en Python avec cloudFiles et en SQL avec le SELECT * FROM STREAM read_files(...) et peut être utilisé avec divers formats (JSON, CSV, Apache Avro, etc.) :

La définition de la table en tant que table de diffusion en continu garantit que vous consommez uniquement de nouvelles données entrantes. Si vous ne le définissez pas en tant que table de diffusion en continu, il analyse et ingère toutes les données disponibles. Pour plus d’informations, consultez les tables de diffusion en continu .

  1. Pour ingérer les données CDC entrantes à l’aide du chargeur automatique, copiez et collez le code suivant dans le fichier de code créé avec votre pipeline (appelé my_transformation.py). Vous pouvez utiliser Python ou SQL, en fonction du langage que vous avez choisi lors de la création du pipeline. Veillez à remplacer le <catalog> et le <schema> par ceux que vous avez configurés comme valeurs par défaut pour le pipeline.

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # Replace with the catalog and schema name that
    # you are using:
    path = "/Volumes/<catalog>/<schema>/raw_data/customers"
    
    
    # Create the target bronze table
    dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
    
    # Create an Append Flow to ingest the raw data into the bronze table
    @dp.append_flow(
      target = "customers_cdc_bronze",
      name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
      return (
          spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "json")
              .option("cloudFiles.inferColumnTypes", "true")
              .load(f"{path}")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
      SELECT *
      FROM STREAM read_files(
        -- replace with the catalog/schema you are using:
        "/Volumes/<catalog>/<schema>/raw_data/customers",
        format => "json",
        inferColumnTypes => "true"
      )
    
  2. Cliquez sur l’icône Lecture.Exécutez le fichier ou Exécutez le pipeline pour lancer une mise à jour du pipeline connecté. Avec un seul fichier source dans votre pipeline, ceux-ci sont fonctionnellement équivalents.

Une fois la mise à jour terminée, l’éditeur est mis à jour avec des informations sur votre pipeline.

  • Le graphique de pipeline (DAG), dans la barre latérale à droite de votre code, affiche une table unique. customers_cdc_bronze
  • Un résumé de la mise à jour s’affiche en haut de l'explorateur des actifs du pipeline.
  • Les détails de la table qui a été générée sont affichés dans le volet inférieur, et vous pouvez parcourir les données de la table en la sélectionnant.

Il s’agit des données de couche bronze brutes importées à partir du stockage cloud. À l’étape suivante, nettoyez les données pour créer une table de couche argent.

Étape 4 : Nettoyage et attentes pour suivre la qualité des données

Une fois la couche bronze définie, créez la couche argent en ajoutant des attentes pour contrôler la qualité des données. Vérifiez les conditions suivantes :

  • L’ID ne doit jamais être null.
  • Le type d’opération CDC doit être valide.
  • JSON doit être lu correctement par le chargeur automatique.

Les lignes qui ne répondent pas à ces conditions sont supprimées.

Consultez la gestion de la qualité des données selon les attentes de pipeline pour plus d’informations.

  1. Dans la barre latérale du navigateur des ressources de pipeline, cliquez sur l’Icône Plus.Ajouter, puis Transformation.

  2. Entrez un nom et choisissez un langage (Python ou SQL) pour le fichier de code source. Vous pouvez combiner et mettre en correspondance des langues au sein d’un pipeline. Vous pouvez donc choisir l’une ou l’autre pour cette étape.

  3. Pour créer une couche argent avec une table nettoyée et imposer des contraintes, copiez et collez le code suivant dans le nouveau fichier (choisissez Python ou SQL en fonction du langage du fichier).

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(
      name = "customers_cdc_clean",
      expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
      )
    
    @dp.append_flow(
      target = "customers_cdc_clean",
      name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
      return (
          spark.readStream.table("customers_cdc_bronze")
              .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
      CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;
    
  4. Cliquez sur l’icône Lecture.Exécutez le fichier ou Exécutez le pipeline pour lancer une mise à jour du pipeline connecté.

    Étant donné qu’il existe maintenant deux fichiers sources, ceux-ci ne font pas la même chose, mais dans ce cas, la sortie est la même.

    • Exécuter le pipeline exécute l’intégralité de votre pipeline, y compris le code de l’étape 3. Si vos données d’entrée étaient mises à jour, cela intégrerait toutes les modifications de cette source vers votre couche bronze. Cela n’exécute pas le code à partir de l’étape de configuration des données, car il se trouve dans le dossier d’explorations et ne fait pas partie de la source de votre pipeline.
    • Exécuter le fichier exécute uniquement le fichier source actuel. Dans ce cas, sans mise à jour de vos données d'entrée, les données silver sont générées à partir de la table bronze mise en cache. Il serait utile d’exécuter uniquement ce fichier pour accélérer l’itération lors de la création ou de la modification de votre code de pipeline.

Une fois la mise à jour terminée, vous pouvez voir que le graphique de pipeline affiche désormais deux tables (avec la couche argent en fonction de la couche bronze), et le panneau inférieur affiche les détails des deux tables. La partie supérieure du navigateur des actifs de pipeline affiche désormais les durées de plusieurs exécutions, mais uniquement les détails de l'exécution la plus récente.

Ensuite, créez votre dernière version de couche d'or de la table customers.

Étape 5 : matérialiser la table clients avec un flux AUTO CDC

Jusqu'à présent, les tables ont juste transmis les données CDC à chaque étape. À présent, créez la customers table pour contenir la vue la plus à jour et pour être une réplique de la table d’origine, et non la liste des opérations de capture de données de changements (CDC) qui l’ont créée.

Il n'est pas évident de l'implémenter manuellement. Vous devez prendre en compte des éléments tels que la déduplication des données pour conserver la ligne la plus récente.

Toutefois, Lakeflow Spark Declarative Pipelines résout ces défis avec l’opération AUTO CDC .

  1. Dans la barre latérale du navigateur des ressources de pipeline, cliquez sur l’icône Plus.Ajouter et transformer.

  2. Entrez un nom et choisissez un langage (Python ou SQL) pour le nouveau fichier de code source. Vous pouvez à nouveau choisir l’une ou l’autre langue de cette étape, mais utiliser le code approprié, ci-dessous.

  3. Pour traiter les données CDC à l’aide de AUTO CDC dans les pipelines déclaratifs Spark Lakeflow, copiez et collez le code suivant dans le nouveau fichier.

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(name="customers", comment="Clean, materialized customers")
    
    dp.create_auto_cdc_flow(
      target="customers",  # The customer table being materialized
      source="customers_cdc_clean",  # the incoming CDC
      keys=["id"],  # what we'll be using to match the rows to upsert
      sequence_by=col("operation_date"),  # de-duplicate by operation date, getting the most recent value
      ignore_null_updates=False,
      apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE condition
      except_column_list=["operation", "operation_date", "_rescued_data"],
    )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers;
    
    CREATE FLOW customers_cdc_flow
    AS AUTO CDC INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;
    
  4. Cliquez sur l’icône Lecture.Exécutez le fichier pour lancer une mise à jour du pipeline connecté.

Une fois la mise à jour terminée, vous pouvez voir que votre graphique de pipeline affiche 3 tables, progressant du bronze à l'argent à l'or.

Étape 6 : Suivi de l’historique des mises à jour avec le type de dimension à variation lente de type 2 (SCD2)

Il est souvent nécessaire de créer une table qui suit toutes les modifications résultant de APPEND, UPDATEet DELETE:

  • Historique : vous souhaitez conserver un historique de toutes les modifications apportées à votre table.
  • Traçabilité : vous souhaitez voir quelle opération s’est produite.

SCD2 avec Lakeflow SDP

Delta prend en charge le flux de données de modification (CDF) et table_change peut interroger les modifications de table dans SQL et Python. Toutefois, le cas d’usage principal du CDF consiste à capturer les modifications dans un pipeline, et non à créer une vue complète des modifications de table à partir du début.

Les choses sont particulièrement complexes à implémenter si vous avez des événements hors commande. Si vous devez séquencer vos modifications par horodatage et recevoir une modification qui s’est produite dans le passé, vous devez ajouter une nouvelle entrée dans votre table SCD et mettre à jour les entrées précédentes.

Lakeflow SDP supprime cette complexité et vous permet de créer une table distincte qui contient toutes les modifications du début du temps. Cette table peut ensuite être utilisée à grande échelle, avec des partitions spécifiques ou des colonnes ZORDER si nécessaire. Les champs hors séquence sont gérés nativement en fonction du _sequence_by.

Pour créer une table SCD2, utilisez l’option STORED AS SCD TYPE 2 sql ou stored_as_scd_type="2" Python.

Note

Vous pouvez également limiter les colonnes que cette fonctionnalité suit en utilisant l’option : TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. Dans la barre latérale du navigateur des ressources de pipeline, cliquez sur l’icône Plus.Ajouter et transformer.

  2. Entrez un nom et choisissez un langage (Python ou SQL) pour le nouveau fichier de code source.

  3. Copiez et collez le code suivant dans le nouveau fichier.

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # create the table
    dp.create_streaming_table(
        name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )
    
    # store all changes as SCD2
    dp.create_auto_cdc_flow(
        target="customers_history",
        source="customers_cdc_clean",
        keys=["id"],
        sequence_by=col("operation_date"),
        ignore_null_updates=False,
        apply_as_deletes=expr("operation = 'DELETE'"),
        except_column_list=["operation", "operation_date", "_rescued_data"],
        stored_as_scd_type="2",
    )  # Enable SCD2 and store individual updates
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_history;
    
    CREATE FLOW customers_history_cdc
    AS AUTO CDC INTO
      customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;
    
  4. Cliquez sur l’icône Lecture.Exécutez le fichier pour lancer une mise à jour du pipeline connecté.

Une fois la mise à jour terminée, le graphique de pipeline inclut le nouveau customers_history tableau, également dépendant du tableau de la couche argent, et le panneau inférieur affiche les détails des 4 tableaux.

Étape 7 : Créer une vue matérialisée qui suit les personnes ayant modifié leurs informations les plus

La table customers_history contient toutes les modifications historiques qu’un utilisateur a apportées à ses informations. Créez une vue matérialisée simple dans la couche d’or qui suit les personnes qui ont modifié leurs informations le plus. Cela peut être utilisé pour l’analyse de détection des fraudes ou les recommandations des utilisateurs dans un scénario réel. En outre, l’application de modifications avec SCD2 a déjà supprimé des doublons, ce qui vous permet de compter directement les lignes par ID utilisateur.

  1. Dans la barre latérale du navigateur des ressources de pipeline, cliquez sur l’icône Plus.Ajouter et transformer.

  2. Entrez un nom et choisissez un langage (Python ou SQL) pour le nouveau fichier de code source.

  3. Copiez et collez le code suivant dans le nouveau fichier source.

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    @dp.table(
      name = "customers_history_agg",
      comment = "Aggregated customer history"
    )
    def customers_history_agg():
      return (
        spark.read.table("customers_history")
          .groupBy("id")
          .agg(
              count("address").alias("address_count"),
              count("email").alias("email_count"),
              count("firstname").alias("firstname_count"),
              count("lastname").alias("lastname_count")
          )
      )
    

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
      id,
      count("address") as address_count,
      count("email") AS email_count,
      count("firstname") AS firstname_count,
      count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id
    
  4. Cliquez sur l’icône Lecture.Exécutez le fichier pour lancer une mise à jour du pipeline connecté.

Une fois la mise à jour terminée, il existe une nouvelle table dans le graphique de pipeline qui dépend de la customers_history table et vous pouvez l’afficher dans le volet inférieur. Votre pipeline est maintenant terminé. Vous pouvez le tester en effectuant un pipeline d’exécution complet. Les seules étapes restantes sont de planifier la mise à jour régulière du pipeline.

Étape 8 : Créer un travail pour exécuter le pipeline ETL

Ensuite, créez un flux de travail pour automatiser les étapes d’ingestion, de traitement et d’analyse des données dans votre pipeline à l’aide d’un travail Databricks.

  1. En haut de l’éditeur, choisissez le bouton Planifier .
  2. Si la boîte de dialogue Planifications s’affiche, choisissez Ajouter une planification.
  3. La boîte de dialogue Nouvelle planification s’ouvre, où vous pouvez créer un travail pour exécuter votre pipeline selon une planification.
  4. Si vous le souhaitez, donnez un nom au travail.
  5. Par défaut, la planification est définie pour s’exécuter une fois par jour. Vous pouvez accepter cette valeur par défaut ou définir votre propre planification. Le choix d’Advanced vous donne la possibilité de définir une heure spécifique à laquelle le travail s’exécutera. La sélection d’autres options vous permet de créer des notifications lors de l’exécution du travail.
  6. Sélectionnez Créer pour appliquer les modifications et créer le travail.

Maintenant, la tâche s'exécutera quotidiennement pour maintenir votre pipeline à jour. Vous pouvez choisir de nouveau planifier pour afficher la liste des planifications. Vous pouvez gérer les planifications de votre pipeline à partir de cette boîte de dialogue, notamment l’ajout, la modification ou la suppression de planifications.

Cliquez sur le nom de la planification (ou de la tâche) pour accéder à la page de la tâche dans la liste Tâches & pipelines. À partir de là, vous pouvez afficher des détails sur les exécutions de travaux, y compris l’historique des exécutions, ou exécuter immédiatement le travail avec le bouton Exécuter maintenant .

Pour plus d’informations sur l’exécution des tâches, consultez Suivi et observabilité des tâches Lakeflow.

Ressources supplémentaires