Dela via


Självstudie: Skapa en geospatial pipeline med inbyggda rumsliga typer

Lär dig hur du skapar och distribuerar en pipeline som matar in GPS-data, konverterar koordinater till ursprungliga rumsliga typer och jämför med lagergeofences för att spåra ankomster med Lakeflow Spark Deklarativa Pipelines (SDP) för dataorkestrering och Auto Loader. I den här handledningen används Databricks inbyggda rumsliga datatyper (GEOMETRY, GEOGRAPHY) och inbyggda rumsliga funktioner som ST_Point, ST_GeomFromWKT och ST_Contains, så att du kan köra geospatiala arbetsflöden i stor skala utan externa bibliotek.

I den här handledningen kommer du att:

  • Skapa en pipeline och generera exempel på GPS- och geofence-data i en Unity Catalog-volym.
  • Ingestera rå GPS-signaler inkrementellt med hjälp av Auto Loader till en bronsströmningstabell.
  • Skapa en silver-strömningstabell som konverterar latitud och longitud till en inbyggd GEOMETRY punkt.
  • Skapa en materialiserad vy av lagergeofences från WKT-polygoner.
  • Kör en rumslig koppling för att skapa en tabell över ankomster till lagret (vilken enhet som gick in i vilken geofence).

Resultatet är en pipeline i medaljstil: brons (rådata från GPS), silver (punkter som geometri) och guld (geofences och ankomsthändelser). För mer information, se Vad innebär arkitekturen med medallion lakehouse?

Requirements

För att slutföra den här självstudien måste du uppfylla följande krav:

Steg 1: Skapa en pipeline

Skapa en ny ETL-pipeline och ange standardkatalogen och schemat för dina tabeller.

  1. På din arbetsyta klickar du på plusikonen. Nytt i det övre vänstra hörnet.

  2. Klicka på ETL-pipeline.

  3. Ändra rubriken på pipelinen till Spatial pipeline tutorial eller ett namn som du föredrar.

  4. Under rubriken väljer du en katalog och ett schema som du har skrivbehörighet för.

    Den här katalogen och schemat används som standard när du inte anger någon katalog eller ett schema i koden. Ersätt <catalog> och <schema> i följande steg med de värden du väljer här.

  5. Från Avancerade alternativ väljer du Starta med en tom fil.

  6. Välj en mapp för koden. Du kan välja Bläddra för att välja en mapp. du kan använda en Git-mapp för versionskontroll.

  7. Välj Python eller SQL som språk för din första fil. Du kan lägga till filer på det andra språket senare.

  8. Klicka på Välj för att skapa pipelinen och öppna Lakeflow Pipelines Editor.

Nu har du en tom pipeline med en standardkatalog och ett schema. Skapa sedan exempeldata för GPS och geofence.

Steg 2: Skapa exempeldata för GPS och geofence

Det här steget genererar exempeldata i en datamängd: råa GPS-pingar (JSON) och lagergeofences (JSON med WKT-polygoner). GPS-punkterna genereras i en avgränsningsruta som överlappar de två lagerpolygonerna, så den rumsliga kopplingen i ett senare steg returnerar ankomstrader. Du kan hoppa över det här steget om du redan har egna data i en volym eller tabell.

  1. Klicka på Plus-ikonen i tillgångsläsaren i Lakeflow Pipelines-redigeraren.Lägg till och sedan Utforskning.

  2. Ange Namn till Setup spatial data, välj Python och lämna standardmålmappen.

  3. Klicka på Skapa.

  4. I den nya notebook-filen klistrar du in följande kod. Ersätt <catalog> och <schema> med standardkatalogen och schemat som du angav i steg 1.

    Använd följande kod i notebook-filen för att generera GPS- och geofence-data.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Kör cellen i anteckningsboken (Shift + Enter).

När körningen är klar innehåller volymen gps (råa pingar) och geofences (polygoner i WKT). I nästa steg matar du in GPS-data i en bronstabell.

Steg 3: Läs in GPS-data i en bronsströmningstabell

Mata in den råa GPS JSON-filen från volymen stegvis med hjälp av Auto Loader och skriv till en bronsuppspelningstabell.

  1. Klicka på Plus-ikonen i tillgångsläsaren.Lägg till och sedan Transformering.

  2. Ange Namn till gps_bronze, välj SQL eller Python och klicka på Skapa.

  3. Ersätt filinnehållet med följande (använd fliken som matchar ditt språk). Ersätt <catalog> och <schema> med standardkatalogen och schemat.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Klicka på Spel-ikonen.Kör filen eller Kör pipelinen för att köra en uppdatering.

När uppdateringen är klar visar gps_bronze pipelinediagrammet tabellen. Lägg sedan till en silvertabell som konverterar koordinater till en intern geometripunkt.

Steg 4: Lägg till en silverströmnings-tabell med geometripunkter

Skapa en strömmande tabell som läser från bronstabellen och lägger till en GEOMETRY kolumn med hjälp av ST_Point(longitude, latitude).

  1. Klicka på Plus-ikonen i tillgångsläsaren.Lägg till och sedan Transformering.

  2. Ange Namn till raw_gps_silver, välj SQL eller Python och klicka på Skapa.

  3. Klistra in följande kod i den nya filen.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Klicka på Spelikon.Starta fil eller Starta pipeline.

Pipelinediagrammet visar nu gps_bronze och raw_gps_silver. Lägg sedan till lagergeofences som en materialiserad vy.

Steg 5: Skapa guldtabellen för lagergeofences

Skapa en materialiserad vy som läser geofences från volymen och konverterar WKT-kolumnen till en GEOMETRY kolumn med .ST_GeomFromWKT

  1. Klicka på Plus-ikonen i tillgångsläsaren.Lägg till och sedan Transformering.

  2. Ange Namn till warehouse_geofences_gold, välj SQL eller Python och klicka på Skapa.

  3. Klistra in följande kod. Ersätt <catalog> och <schema> med standardkatalogen och schemat.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Klicka på Spela upp-ikonen.Kör fil eller Kör pipeline.

Pipelinen innehåller nu geofences-tabellen. Lägg sedan till den rumsliga kopplingen för att beräkna ankomster till lager.

Steg 6: Skapa lagerankomsttabellen med en geospatial sammanfogning

Lägg till en materialiserad vy som kopplar de silverfärgade GPS-punkterna till geofences med hjälp av ST_Contains(boundary_geom, point_geom) för att avgöra när en enhet finns i en lagerpolygon.

  1. Klicka på Plus-ikonen i tillgångsläsaren.Lägg till och sedan Transformering.

  2. Ange Namn till warehouse_arrivals, välj SQL eller Python och klicka på Skapa.

  3. Klistra in följande kod.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Klicka på Spela upp-ikonen.Kör fil eller Kör pipeline.

När uppdateringen är klar visar pipelinediagrammet alla fyra datauppsättningarna: gps_bronze, raw_gps_silver, warehouse_geofences_goldoch warehouse_arrivals.

Kontrollera den rumsliga kopplingen

Bekräfta att den rumsliga kopplingen skapade rader: punkter från silvertabellen som faller inuti en geofence visas i warehouse_arrivals. Kör något av följande i en notebook- eller SQL-redigerare (använd samma katalog och schema som pipelinemålet).

Räkna ankomster per lager (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Du bör se antal som inte är noll för Warehouse_A och Warehouse_B (exempel-GPS-data överlappar båda polygonerna). Så här inspekterar du exempelrader:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Samma kontroller i Python (notebook-fil):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Om du ser rader i warehouse_arrivalsST_Contains(boundary_geom, point_geom) fungerar kopplingen korrekt.

Steg 7: Schemalägg pipelinen (valfritt)

Om du vill hålla pipelinen uppdaterad när nya GPS-data hamnar i volymen skapar du ett jobb för att köra pipelinen enligt ett schema.

  1. Välj knappen Schema överst i redigeraren.
  2. Om dialogrutan Scheman visas väljer du Lägg till schema.
  3. Du kan också ge jobbet ett namn.
  4. Som standard körs schemat en gång per dag. Du kan acceptera detta eller ange ditt eget. Genom att välja Avancerat kan du ange en viss tid. Med fler alternativ kan du lägga till körningsmeddelanden.
  5. Välj Skapa för att tillämpa schemat.

För mer information om jobbkörningar, se Övervakning och observerbarhet för Lakeflow-jobb.

Ytterligare resurser