Partager via


Tutoriel : Créer un pipeline géospatial avec des types spatiaux natifs

Découvrez comment créer et déployer un pipeline qui ingère des données GPS, convertit les coordonnées en types de données spatiales natifs et associe aux géofences de l'entrepôt pour suivre les arrivées à l'aide des Lakeflow Spark Declarative Pipelines (SDP) pour l'orchestration des données et Auto Loader. Ce tutoriel utilise des types spatiaux natifs Databricks (GEOMETRY, GEOGRAPHY) et des fonctions spatiales intégrées telles que ST_Point, ST_GeomFromWKTet ST_Contains, afin de pouvoir exécuter des flux de travail géospatiaux à grande échelle sans bibliothèques externes.

Dans ce tutoriel, vous allez :

  • Créez un pipeline et générez des exemples de données GPS et de limite géographique dans un volume de catalogue Unity.
  • Ingérer des pings GPS bruts de façon incrémentielle avec Auto Loader dans une table de streaming bronze.
  • Créez une table de streaming argentée qui convertit la latitude et la longitude en point GEOMETRY natif.
  • Créez une vue matérialisée des limites géographiques de l’entrepôt à partir de polygones WKT.
  • Exécutez une jointure spatiale pour produire une table des arrivées à l'entrepôt (quel appareil est entré dans quel géorepérage).

Le résultat est un pipeline de style médaillon : bronze (GPS brut), argent (points en géométrie) et or (géofences et événements d’arrivée). Pour plus d’informations, consultez Qu'est-ce que l'architecture Lakehouse Medallion.

Exigences

Pour suivre ce tutoriel, vous devez répondre aux exigences suivantes :

  • Connectez-vous à un espace de travail Azure Databricks.
  • Assurez-vous que le catalogue Unity est activé pour votre espace de travail.
  • Assurez la disponibilité d’un calcul serverless dans votre espace de travail si vous souhaitez utiliser des pipelines déclaratifs Spark Serverless Lakeflow (activés par défaut dans les espaces de travail avec Unity Catalog). Si le calcul serverless n’est pas disponible, les étapes fonctionnent avec le calcul par défaut de votre espace de travail.
  • Disposez de l’autorisation de créer une ressource de calcul ou d’accéder à une ressource de calcul.
  • Disposez des autorisations nécessaires pour créer un schéma dans un catalogue. Les autorisations requises sont USE CATALOG et CREATE SCHEMA.
  • Disposez des autorisations nécessaires pour créer un volume dans un schéma existant. Les autorisations requises sont USE SCHEMA et CREATE VOLUME.
  • Utilisez un runtime qui prend en charge les types spatiaux natifs et les fonctions spatiales.

Étape 1 : Créer un pipeline

Créez un pipeline ETL et définissez le catalogue et le schéma par défaut pour vos tables.

  1. Dans votre espace de travail, cliquez sur l’icône Plus. Nouveautés dans le coin supérieur gauche.

  2. Cliquez sur Pipeline ETL.

  3. Remplacez le titre du pipeline Spatial pipeline tutorial par ou par un nom que vous préférez.

  4. Sous le titre, choisissez un catalogue et un schéma pour lesquels vous disposez d’autorisations d’écriture.

    Ce catalogue et ce schéma sont utilisés par défaut lorsque vous ne spécifiez pas de catalogue ou de schéma dans votre code. Remplacez <catalog> et <schema> , dans les étapes suivantes, par les valeurs que vous choisissez ici.

  5. Dans les options avancées, sélectionnez Démarrer avec un fichier vide.

  6. Choisissez un dossier pour votre code. Vous pouvez sélectionner Parcourir pour choisir un dossier ; vous pouvez utiliser un dossier Git pour le contrôle de version.

  7. Choisissez Python ou SQL pour le langage de votre premier fichier. Vous pouvez ajouter des fichiers dans l’autre langue ultérieurement.

  8. Cliquez sur Sélectionner pour créer le pipeline et ouvrir l’éditeur de pipelines Lakeflow.

Vous disposez maintenant d’un pipeline vide avec un catalogue et un schéma par défaut. Ensuite, créez les exemples de données GPS et de limite géographique.

Étape 2 : Créer l’exemple de données GPS et de limite géographique

Cette étape génère des exemples de données dans un volume : pings GPS bruts (JSON) et géofences d’entrepôt (JSON avec polygones WKT). Les points GPS sont générés dans un cadre englobant qui chevauche les deux polygones d’entrepôt, de sorte que la jointure spatiale dans une étape ultérieure retourne les lignes d’arrivée. Vous pouvez ignorer cette étape si vous avez déjà vos propres données dans un volume ou une table.

  1. Dans l’Éditeur de pipelines Lakeflow, dans le navigateur de ressources, cliquez sur l’icône Plus.Ajoutez, puis exploration.

  2. Définissez Le nom sur Setup spatial data, choisissez Python et laissez le dossier de destination par défaut.

  3. Cliquez sur Créer.

  4. Dans le nouveau bloc-notes, collez le code suivant. Remplacez <catalog> et <schema> par le catalogue et le schéma par défaut que vous avez définis à l’étape 1.

    Utilisez le code suivant dans le notebook pour générer des données GPS et de limite géographique.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Exécutez la cellule du bloc-notes (Maj + Entrée).

Une fois l’exécution terminée, le volume contient gps (pings bruts) et geofences (polygones dans WKT). À l’étape suivante, vous ingérez les données GPS dans une table bronze.

Étape 3 : Ingérer des données GPS dans une table de diffusion en bronze

Ingérer le JSON brut de GPS à partir du volume de manière incrémentielle avec Auto Loader et le consigner dans une table de flux de données bronze.

  1. Dans le navigateur de ressources, cliquez sur l’icône Plus.Ajouter, puis Transformation.

  2. Définissez Nom à gps_bronze, choisissez SQL ou Python, puis cliquez sur Créer.

  3. Remplacez le contenu du fichier par ce qui suit (utilisez l’onglet correspondant à votre langue). Remplacez <catalog> et <schema> par votre catalogue et votre schéma par défaut.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Cliquez sur l’icône Lecture.Exécutez un fichier ou un pipeline d’exécution pour exécuter une mise à jour.

Une fois la mise à jour terminée, le schéma de pipeline affiche le gps_bronze tableau. Ensuite, ajoutez une table silver qui convertit les coordonnées en point géométrique natif.

Étape 4 : Ajouter une table de diffusion en continu de type argent avec des points de géométrie

Créez une table de diffusion en continu qui lit à partir de la table bronze et ajoute une colonne GEOMETRY à l’aide de ST_Point(longitude, latitude).

  1. Dans le navigateur de ressources, cliquez sur l’icône Plus.Ajouter, puis Transformation.

  2. Définissez Nom à raw_gps_silver, choisissez SQL ou Python, puis cliquez sur Créer.

  3. Collez le code suivant dans le nouveau fichier.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Cliquez sur l’icône Lecture.Exécutez un fichier ou un pipeline d’exécution.

Le graphique de pipeline affiche gps_bronze et raw_gps_silver. Ensuite, ajoutez les limites géographiques de l’entrepôt en tant qu’affichage matérialisé.

Étape 5 : Créer la table des géofences gold de l’entrepôt

Créez une vue matérialisée qui lit les limites géographiques du volume et convertit la colonne WKT en colonne GEOMETRY à l’aide de ST_GeomFromWKT.

  1. Dans le navigateur de ressources, cliquez sur l’icône Plus.Ajouter, puis Transformation.

  2. Définissez Nom à warehouse_geofences_gold, choisissez SQL ou Python, puis cliquez sur Créer.

  3. Collez le code ci-dessous. Remplacez <catalog> et <schema> par votre catalogue et votre schéma par défaut.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Cliquez sur l’icône Lecture.Exécutez un fichier ou un pipeline d’exécution.

Le pipeline inclut désormais la table des limites géographiques. Ensuite, ajoutez la jointure spatiale pour calculer les arrivées à l'entrepôt.

Étape 6 : Créer la table d’arrivées de l’entrepôt avec une jointure spatiale

Ajoutez une vue matérialisée qui joint les points GPS argentés aux limites géographiques à l’aide ST_Contains(boundary_geom, point_geom) de déterminer quand un appareil se trouve à l’intérieur d’un polygone d’entrepôt.

  1. Dans le navigateur de ressources, cliquez sur l’icône Plus.Ajouter, puis Transformation.

  2. Définissez Nom à warehouse_arrivals, choisissez SQL ou Python, puis cliquez sur Créer.

  3. Collez le code ci-dessous.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Cliquez sur l’icône Lecture.Exécutez un fichier ou un pipeline d’exécution.

Une fois la mise à jour terminée, le graphique de pipeline affiche les quatre jeux de données : gps_bronze, , raw_gps_silverwarehouse_geofences_goldet warehouse_arrivals.

Vérifier la jointure spatiale

Vérifiez que la jointure spatiale produit des lignes : points de la table argent qui se trouvent à l’intérieur d’une limite géographique apparaissent dans warehouse_arrivals. Exécutez l’un des éléments suivants dans un bloc-notes ou un éditeur SQL (utilisez le même catalogue et le même schéma que votre cible de pipeline).

Compter les arrivées par entrepôt (SQL) :

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Vous devriez voir des nombres non nuls pour Warehouse_A et Warehouse_B (l’exemple de données GPS chevauche les deux polygones). Pour inspecter des exemples de lignes :

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Mêmes vérifications dans Python (notebook) :

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Si vous voyez des lignes dans warehouse_arrivals, la ST_Contains(boundary_geom, point_geom) jointure fonctionne correctement.

Étape 7 : Planifier le pipeline (facultatif)

Pour maintenir le pipeline à jour lorsque de nouvelles données GPS atterrissent dans le volume, créez un travail pour exécuter le pipeline selon une planification.

  1. En haut de l’éditeur, choisissez le bouton Planifier .
  2. Si la boîte de dialogue Planifications s’affiche, choisissez Ajouter une planification.
  3. Si vous le souhaitez, donnez un nom au travail.
  4. Par défaut, la planification s’exécute une fois par jour. Vous pouvez accepter cela ou définir votre propre. Le choix d’Avancé vous permet de définir une heure spécifique ; D’autres options vous permettent d’ajouter des notifications d’exécution.
  5. Sélectionnez Créer pour appliquer la planification.

Pour plus d’informations sur l’exécution des tâches, consultez Suivi et observabilité des tâches Lakeflow.

Ressources additionnelles