Delen via


Zelfstudie: Een georuimtelijke pijplijn bouwen met systeemeigen ruimtelijke typen

Meer informatie over het maken en implementeren van een pijplijn die GPS-gegevens opneemt, coördinaten converteert naar systeemeigen ruimtelijke typen en samenvoegt met magazijngeofences om aankomsten bij te houden met behulp van Lakeflow Spark Declarative Pipelines (SDP) voor gegevensorkestratie en Auto Loader. In deze zelfstudie worden systeemeigen ruimtelijke typen (GEOMETRY,) GEOGRAPHYen ingebouwde ruimtelijke functies van Databricks gebruikt, zoals ST_Point, ST_GeomFromWKTen ST_Contains, zodat u georuimtelijke werkstromen op schaal kunt uitvoeren zonder externe bibliotheken.

In deze tutorial, zul je:

  • Maak een pijplijn en genereer voorbeeld-GPS- en geofence-gegevens in een Unity Catalog-volume.
  • Verwerk ruwe GPS-pings incrementeel met Auto Loader in een bruine streamingtafel.
  • Bouw een silver streaming-tabel die breedtegraad en lengtegraad converteert naar een systeemeigen GEOMETRY punt.
  • Maak een gerealiseerde weergave van magazijngeofences van WKT-veelhoeken.
  • Voer een ruimtelijke koppeling uit om een tabel te creëren van magazijnaankomsten (welk apparaat welke geofence is binnengegaan).

Het resultaat is een pijplijn in de medaillon-stijl: brons (ruwe GPS), zilver (punten als geometrische vormen) en goud (geofences en aankomstgebeurtenissen). Zie Wat is de medallion lakehouse-architectuur? voor meer informatie.

Requirements

Als u deze zelfstudie wilt voltooien, moet u aan de volgende vereisten voldoen:

Stap 1: Een pijplijn maken

Maak een nieuwe ETL-pijplijn en stel de standaardcatalogus en het standaardschema voor uw tabellen in.

  1. Klik in uw werkruimte op pluspictogram. Nieuw in de linkerbovenhoek.

  2. Klik op ETL-pijplijn.

  3. Wijzig de titel van de pijplijn in Spatial pipeline tutorial of een naam die u wilt gebruiken.

  4. Kies onder de titel een catalogus en schema waarvoor u schrijfmachtigingen hebt.

    Deze catalogus en dit schema worden standaard gebruikt wanneer u geen catalogus of schema opgeeft in uw code. Vervang <catalog> en <schema> door de waarden die u hier kiest en voer vervolgens de volgende stappen uit.

  5. Selecteer Beginnen met een leeg bestand in Geavanceerde opties.

  6. Kies een map voor uw code. U kunt Bladeren selecteren om een map te kiezen; u kunt een Git-map gebruiken voor versiebeheer.

  7. Kies Python of SQL voor de taal van uw eerste bestand. U kunt later bestanden toevoegen in de andere taal.

  8. Klik op Selecteren om de pijplijn te maken en open de Lakeflow Pipelines Editor.

U hebt nu een lege pijplijn met een standaardcatalogus en schema. Maak vervolgens de voorbeeld-GPS- en geofence-gegevens.

Stap 2: De voorbeeld-GPS- en geofence-gegevens maken

Met deze stap worden voorbeeldgegevens gegenereerd in een volume: onbewerkte GPS-pings (JSON) en warehouse geofences (JSON met WKT-polygonen). De GPS-punten worden gegenereerd in een beperkingsgebied dat de twee magazijnpolygonen overlapt, opdat de ruimtelijke koppeling in een latere stap aankomstrijen retourneert. U kunt deze stap overslaan als u al uw eigen gegevens in een volume of tabel hebt.

  1. Klik in de Lakeflow Pipelines-editor in de asset browser op Plus-icoon.Toevoegen, en dan Verkenning.

  2. Stel Name in Setup spatial data, kies Python en laat de standaardbestemmingsmap staan.

  3. Klik op Create.

  4. Plak de volgende code in het nieuwe notebook. Vervang <catalog> en <schema> door de standaardcatalogus en het standaardschema dat u in stap 1 hebt ingesteld.

    Gebruik de volgende code in het notebook om GPS- en geofence-gegevens te genereren.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Het cellenblok van de notebook uitvoeren (Shift + Enter).

Nadat de uitvoering is voltooid, bevat het volume gps (onbewerkte pings) en geofences (veelhoeken in WKT). In de volgende stap neemt u de GPS-gegevens op in een bronzen tabel.

Stap 3: GPS-gegevens opnemen in een bronsstreamingtabel

Verwerk de onbewerkte GPS JSON van het volume incrementeel met behulp van Auto Loader en schrijf deze naar een bronzen streamingtabel.

  1. Klik in de asset browser op pluspictogram.VoegTransformatie vervolgens toe.

  2. Stel Namegps_bronze in, kies SQL of Python en klik op Creëren.

  3. Vervang de bestandsinhoud door het volgende (gebruik het tabblad dat overeenkomt met uw taal). Vervang <catalog> en <schema> door uw standaardcatalogus en schema.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Klik op pictogram Afspelen.Voer een bestand of pijplijn uit om een update uit te voeren.

Wanneer de update is voltooid, wordt in de pijplijngrafiek de gps_bronze tabel weergegeven. Voeg vervolgens een zilveren tabel toe waarmee coördinaten worden geconverteerd naar een systeemeigen geometriepunt.

Stap 4: Een zilveren streamingtabel met geometriepunten toevoegen

Maak een streamingtabel die wordt gelezen uit de bronstabel en voegt een GEOMETRY kolom toe met behulp van ST_Point(longitude, latitude).

  1. Klik in de asset browser op pluspictogram.VoegTransformatie vervolgens toe.

  2. Stel Nameraw_gps_silver in, kies SQL of Python en klik op Creëren.

  3. Plak de volgende code in het nieuwe bestand.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Klik op pictogram Afspelen.Voer een bestand of een pijplijn uit.

De pijplijngrafiek toont nu gps_bronze en raw_gps_silver. Voeg vervolgens de geofences van het magazijn toe als gerealiseerde weergave.

Stap 5: De gouden tabel met magazijngefences maken

Maak een gerealiseerde weergave die de geofences van het volume leest en de WKT-kolom converteert naar een GEOMETRY kolom met behulp van ST_GeomFromWKT.

  1. Klik in de asset browser op Pluspictogram.Voeg toe, dan Transformatie.

  2. Stel Name inwarehouse_geofences_gold, kies SQL of Python en klik op Maken.

  3. Plakt de onderstaande code. Vervang <catalog> en <schema> door uw standaardcatalogus en schema.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Klik op pictogram Afspelen.Voer een bestand of een pijplijn uit.

De gegevensstroom omvat nu de geofences-tabel. Voeg vervolgens de spatial join toe om de aankomsten van het compute warehouse te berekenen.

Stap 6: Maak de magazijn ontvangsttabel met een ruimtelijke join

Voeg een gematerialiseerde weergave toe die de zilveren GPS-punten aan de geofences koppelt met behulp van ST_Contains(boundary_geom, point_geom) om te bepalen wanneer een apparaat zich binnen een magazijnpolygon bevindt.

  1. Klik in de asset browser op pluspictogram.VoegTransformatie vervolgens toe.

  2. Stel Namewarehouse_arrivals in, kies SQL of Python en klik op Creëren.

  3. Plakt de onderstaande code.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Klik op pictogram Afspelen.Voer een bestand of een pijplijn uit.

Wanneer de update is voltooid, worden in de pijplijngrafiek alle vier de gegevenssets weergegeven: gps_bronze, raw_gps_silver, warehouse_geofences_golden warehouse_arrivals.

De ruimtelijke join verifiëren

Controleer of er ruimtelijke samenvoegingen zijn gemaakt: punten uit de zilveren tabel die binnen een geofence vallen, worden weergegeven in warehouse_arrivals. Voer een van de volgende handelingen uit in een notebook of SQL-editor (gebruik dezelfde catalogus en hetzelfde schema als uw pijplijndoel).

Aantal aankomsten per magazijn (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

U zou niet-nulwaarden zien voor Warehouse_A en Warehouse_B (de voorbeeld-GPS-gegevens overlappen beide polygonen). Voorbeeldrijen inspecteren:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Dezelfde controles in Python (notebook):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Als u rijen ziet in warehouse_arrivals, werkt de ST_Contains(boundary_geom, point_geom)-join correct.

Stap 7: De pijplijn plannen (optioneel)

Als u de pijplijn up-to-date wilt houden als nieuwe GPS-gegevens in het volume terechtkomen, maakt u een taak om de pijplijn volgens een schema uit te voeren.

  1. Kies boven aan de editor de knop Planning .
  2. Als het dialoogvenster Planningen wordt weergegeven, kiest u Planning toevoegen.
  3. Geef desgewenst de taak een naam.
  4. Standaard wordt het schema eenmaal per dag uitgevoerd. U kunt dit accepteren of uw eigen instellingen instellen. Als u Geavanceerd kiest, kunt u een specifieke tijd instellen; Met meer opties kunt u uitvoeringsmeldingen toevoegen.
  5. Selecteer Maken om de planning toe te passen.

Zie Bewaking en waarneembaarheid voor Lakeflow-taken voor meer informatie over taakuitvoeringen.

Aanvullende bronnen