Sdílet prostřednictvím


Kurz: Vytvoření geoprostorového kanálu s nativními prostorovými typy

Naučte se vytvářet a nasazovat kanál, který ingestuje data GPS, převádí souřadnice na nativní prostorové typy a spojuje se s geofencemi skladu ke sledování příletů pomocí Deklarativních kanálů Sparku Lakeflow (SDP) pro orchestraci dat a automatického zavaděče. Tento kurz používá nativní prostorové typy Databricks (GEOMETRY, GEOGRAPHY) a integrované prostorové funkce, jako jsou ST_Point, ST_GeomFromWKT a ST_Contains, takže můžete spouštět geoprostorové pracovní postupy ve velkém měřítku bez externích knihoven.

V tomto kurzu:

  • Vytvořte potrubí a vygenerujte ukázková data GPS a geofence v objemu Unity Catalogu.
  • Ingestování nezpracované GPS pingly postupně s Auto Loader do bronzové streamovací tabulky.
  • Vytvořte stříbrnou streamovací tabulku, která převádí zeměpisnou šířku a délku na nativní GEOMETRY bod.
  • Vytvoření materializovaného zobrazení geografických zóny skladu z polygonů WKT
  • Spusťte prostorové spojení k vytvoření tabulky příjezdů do skladu (které zařízení vstoupilo do geofencingu).

Výsledkem je kanál ve stylu medailiónu: bronzový (nezpracovaný GPS), stříbro (body jako geometrie) a zlato (geofences a příletové události). Další informace najdete v tématu Co je architektura jezera medallion?

Požadavky

K dokončení tohoto kurzu musíte splnit následující požadavky:

Krok 1: Vytvoření kanálu

Vytvořte nový kanál ETL a nastavte výchozí katalog a schéma pro tabulky.

  1. V pracovním prostoru klikněte na ikonu Plus. Nové v levém horním rohu.

  2. Klikněte na kanál ETL.

  3. Změňte název potrubí na Spatial pipeline tutorial nebo na název, který dáváte přednost.

  4. Pod názvem zvolte katalog a schéma, pro které máte oprávnění k zápisu.

    Tento katalog a schéma se ve výchozím nastavení používají, když v kódu nezadáte katalog nebo schéma. Nahraďte <catalog> a <schema> v následujících krocích hodnotami, které zde zvolíte.

  5. V rozšířených možnostech vyberte Začít s prázdným souborem.

  6. Zvolte složku pro kód. Pokud chcete vybrat složku, můžete vybrat možnost Procházet . Ke správě verzí můžete použít složku Git.

  7. Jako jazyk vašeho prvního souboru zvolte Python nebo SQL . Soubory můžete později přidat v jiném jazyce.

  8. Kliknutím na vybrat vytvoříte kanál a otevřete Editor kanálů Lakeflow.

Teď máte prázdný kanál s výchozím katalogem a schématem. Potom vytvořte ukázková data GPS a geofence.

Krok 2: Vytvoření ukázkových dat GPS a geofence

Tento krok vygeneruje ukázková data ve svazku: nezpracované gps ping (JSON) a skladové geofence (JSON s polygony WKT). GPS body jsou generovány v ohraničující box, který překrývaje dva skladové mnohoúhelníky, takže prostorové spojení v pozdějším kroku vrátí řádky příjezdu. Tento krok můžete přeskočit, pokud už máte vlastní data ve svazku nebo tabulce.

  1. V Editoru kanálů Lakeflow klikněte v prohlížeči prostředků na ikonu Plus.Přidat, a poté na Průzkum.

  2. Nastavte název na Setup spatial data, zvolte Python a ponechte výchozí cílovou složku.

  3. Klikněte na Vytvořit.

  4. Do nového poznámkového bloku vložte následující kód. Nahraďte <catalog> výchozí katalog a <schema> schéma, které jste nastavili v kroku 1.

    Pomocí následujícího kódu v poznámkovém bloku vygenerujte data GPS a geofence.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Spusťte buňku poznámkového bloku (Shift+ Enter).

Po dokončení spuštění obsahuje gps svazek (nezpracované příkazy ping) a geofences (mnohoúhelníky ve WKT). V dalším kroku importujete data GPS do bronzové tabulky.

Krok 3: Ingestování dat GPS do bronzové streamovací tabulky

Ingestování nezpracovaného JSON GPS ze svazku přírůstkově pomocí Auto Loaderu a zapisovat do bronzové streamovací tabulky.

  1. V prohlížeči zdrojů klikněte na ikonu Plus.Přidat, a pak Transformace.

  2. Nastavte název na gps_bronze, zvolte SQL nebo Python a klikněte na Vytvořit.

  3. Obsah souboru nahraďte následujícími daty (použijte kartu, která odpovídá vašemu jazyku). Nahraďte <catalog> a <schema> svým výchozím katalogem a schématem.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Klikněte na ikonu Přehrát.Spustit soubor nebo Spustit kanál pro spuštění aktualizace.

Po dokončení aktualizace se v grafu potrubí zobrazí gps_bronze tabulka. Dále přidejte stříbrnou tabulku, která převádí souřadnice na nativní bod geometrie.

Krok 4: Přidání tabulky streamování typu silver, která obsahuje body geometrie

Vytvořte streamovací tabulku, která čte z bronzové tabulky a přidá GEOMETRY sloupec pomocí ST_Point(longitude, latitude).

  1. V prohlížeči zdrojů klikněte na ikonu Plus.Přidat, a pak Transformace.

  2. Nastavte název na raw_gps_silver, zvolte SQL nebo Python a klikněte na Vytvořit.

  3. Do nového souboru vložte následující kód.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Klikněte na ikonu Přehrát.Spusťte soubor nebo spusťte kanál.

Graf pipeline nyní ukazuje gps_bronze a raw_gps_silver. Dále přidejte geografické zóny skladu jako materializované zobrazení.

Krok 5: Vytvoření zlaté tabulky geofences skladu

Vytvořte materializované zobrazení, které čte geofence ze svazku a převede sloupec WKT na GEOMETRY sloupec pomocí ST_GeomFromWKT.

  1. V prohlížeči zdrojů klikněte na ikonu Plus.Přidat, a pak Transformace.

  2. Nastavte název na warehouse_geofences_gold, zvolte SQL nebo Python a klikněte na Vytvořit.

  3. Vložte následující kód. Nahraďte <catalog> a <schema> svým výchozím katalogem a schématem.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Klikněte na ikonu Přehrát.Spusťte soubor nebo spusťte kanál.

Datové potrubí nyní zahrnuje tabulku s geografickými ploty. Následně přidejte prostorové spojení k výpočtu příjezdu do skladu.

Krok 6: Vytvořte tabulku příjezdů do skladu s prostorovým spojením

Přidejte materializované zobrazení, které spojuje stříbrné GPS body s geofencingem, pomocí ST_Contains(boundary_geom, point_geom) k určení, kdy je zařízení uvnitř polygonu skladu.

  1. V prohlížeči zdrojů klikněte na ikonu Plus.Přidat, a pak Transformace.

  2. Nastavte název na warehouse_arrivals, zvolte SQL nebo Python a klikněte na Vytvořit.

  3. Vložte následující kód.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Klikněte na ikonu Přehrát.Spusťte soubor nebo spusťte kanál.

Po dokončení aktualizace se v grafu kanálu zobrazí všechny čtyři datové sady: gps_bronze, raw_gps_silver, warehouse_geofences_golda warehouse_arrivals.

Ověření prostorového spojení

Ověřte, že prostorové spojení vytvořilo řádky: body, které spadají do geofence, ze stříbrné tabulky se objeví warehouse_arrivals. V poznámkovém bloku nebo editoru SQL spusťte jeden z následujících příkazů (použijte stejný katalog a schéma jako cíl kanálu).

Počet příjezdů podle skladu (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Měli byste vidět nenulové počty pro Warehouse_A a Warehouse_B (ukázková data GPS se překrývají s oběma mnohoúhelníky). Pro kontrolu ukázkových řádků:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Stejné kontroly v Pythonu (poznámkovém bloku):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Pokud uvidíte řádky v warehouse_arrivals, spojení ST_Contains(boundary_geom, point_geom) funguje správně.

Krok 7: Naplánovat potrubí (volitelné)

Pokud chcete, aby byl kanál aktuální, protože nová data GPS přistane ve svazku, vytvořte úlohu pro spuštění kanálu podle plánu.

  1. V horní části editoru zvolte tlačítko Plán .
  2. Pokud se zobrazí dialogové okno Plány , zvolte Přidat plán.
  3. Volitelně můžete úlohu pojmenovat.
  4. Ve výchozím nastavení se plán spouští jednou denně. Můžete přijmout toto nebo nastavit vlastní. Volba Upřesnit umožňuje nastavit konkrétní čas; Další možnosti umožňují přidávat oznámení o spuštění.
  5. Chcete-li použít plán, vyberte Vytvořit .

Další informace o spuštěních úloh najdete v tématu Monitorování a pozorovatelnost úloh Lakeflow .

Dodatečné zdroje