共用方式為


教學:建立帶有原生空間類型的地理空間管線

學習如何建立並部署一條管線,此管線可匯入 GPS 資料、將座標轉換為原生空間類型,並與倉庫地理圍欄進行連結。使用 Lakeflow Spark 宣告式管線(SDP)進行資料管理和編排,以及利用 Auto Loader 來追蹤抵達。 本教學使用 Databricks 原生空間類型(GEOMETRY、) GEOGRAPHY及內建空間函數如 ST_PointST_GeomFromWKTST_Contains和 ,因此你可以在沒有外部函式庫的情況下大規模執行地理空間工作流程。

在本教學課程中,您將:

  • 建立管線並在 Unity 目錄卷中產生樣本 GPS 與地理圍欄資料。
  • 用自動載入器逐步將原始 GPS 訊號輸入青銅串流表。
  • 建立一個銀色串流表,將緯度和經度轉換成原生 GEOMETRY 點。
  • 從 WKT 多邊形建立倉庫地理圍欄的具體視圖。
  • 執行空間關聯運算,以產生倉庫到達的資料表(哪個設備進入哪個地理圍欄)。

結果是一條獎章式的管道:銅牌(原始GPS數據)、銀牌(幾何形狀的點數)、金牌(地理圍欄及抵達事件)。 如需詳細資訊,請參閱獎章湖屋架構的概念

要求

您必須滿足下列需求,才能完成本教學課程:

步驟 1:建立管線

建立新的 ETL 管線,並為你的資料表設定預設目錄和架構。

  1. 在你的工作區,點選位於左上角的Plus 圖示新增。

  2. 按一下 ETL 管線

  3. 將管線的標題變更為 Spatial pipeline tutorial 或您偏好的名稱。

  4. 在標題下,選擇您具有寫入許可權的目錄和結構描述。

    當你在程式碼中未指定目錄或結構時,預設會使用這個目錄和結構。 在接下來的步驟中,將<catalog><schema>替換為你在這裡選擇的數值。

  5. 進階選項中,選取從 空白檔案開始

  6. 為您的程式碼選擇一個資料夾。 你可以選擇 瀏覽 來選擇資料夾;你可以用 Git 資料夾來做版本控制。

  7. 選擇 PythonSQL 作為你第一個檔案的語言。 你可以之後再加其他語言的檔案。

  8. 點擊 選擇 以建立管線並開啟 Lakeflow 管線編輯器。

您現在有一個空白管道,其中包含預設目錄和結構描述。 接著,建立範例 GPS 和地理限制區資料。

步驟二:建立範例 GPS 與地理圍欄資料

此步驟會在一個範圍內產生樣本資料:原始 GPS 訊號(JSON格式)與倉庫地理範圍界線(WKT 多邊形的 JSON格式)。 GPS 點是在一個重疊兩個倉庫多邊形的邊界框中產生的,因此後續的空間連接會返回到達資料列。 如果你已經有自己的資料放在磁碟區或表格中,可以跳過這個步驟。

  1. 在 Lakeflow 管線編輯器的資產瀏覽器中,點選 Plus 圖示。添加,探索

  2. 名稱 設為 Setup spatial data,選擇 Python,並保留預設的目的地資料夾。

  3. 點擊 建立

  4. 在新筆記本中,貼上以下程式碼。 替換<catalog><schema>成你在第一步設定的預設目錄和架構。

    在筆記本中使用以下程式碼來產生 GPS 和地理圍欄資料。

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. 執行筆記本儲存格(Shift + Enter)。

執行完成後,該卷包含 gps (原始聲納訊號)和 geofences (WKT格式的多邊形)。 下一步,你會將 GPS 資料匯入青銅桌。

步驟 3:將 GPS 資料匯入青銅串流表

用 Auto Loader 逐步從卷中擷取原始 GPS JSON,然後寫入 Bronze 串流表。

  1. 在資產瀏覽器中,點選 加號圖示新增,然後選擇 轉換

  2. 名稱 設為 gps_bronze,選擇 SQLPython,然後點 選建立

  3. 請用以下格式替換檔案內容(使用與你語言相符的分頁)。 替換 <catalog><schema> 為預設目錄和架構。

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. 點擊 播放圖示。執行檔案執行管線 來執行更新。

更新完成後,管線圖會顯示該 gps_bronze 表格。 接著,新增一個銀色表格,將座標轉換成原生幾何點。

步驟 4:加入一個帶有幾何點的銀色串流表

建立一個串流表,從青銅表讀取,然後使用 GEOMETRY 加入 ST_Point(longitude, latitude) 欄位。

  1. 在資產瀏覽器中,點選 加號圖示新增,然後選擇 轉換

  2. 名稱 設為 raw_gps_silver,選擇 SQLPython,然後點 選建立

  3. 將下列程式代碼貼到新檔案中。

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. 點擊 播放圖示。執行檔案執行管線

管線圖現在顯示 gps_bronzeraw_gps_silver。 接著,把倉庫地理圍欄新增為具體化檢視。

步驟五:建立倉庫地理圍欄黃金資料表

建立一個實體化檢視,從體中讀取地理圍欄,並將 WKT 欄位轉換為 GEOMETRY 欄位,使用 ST_GeomFromWKT

  1. 在資產瀏覽器中,點選 加號圖示。新增,然後 轉換

  2. 名稱 設為 warehouse_geofences_gold,選擇 SQLPython,然後點 選建立

  3. 貼上下列程式碼。 替換 <catalog><schema> 為預設目錄和架構。

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. 點擊 播放圖示。執行檔案執行管線

管線現在包含地理圍欄表。 接著,加入空間連接以計算倉庫到達人數。

步驟 6:建立帶有空間連接的倉庫到達表

新增一個實體化視圖,將銀色 GPS 點與地理圍欄 ST_Contains(boundary_geom, point_geom) 連結起來,用以判斷裝置是否在倉庫多邊形內。

  1. 在資產瀏覽器中,點選 加號圖示新增,然後選擇 轉換

  2. 名稱 設為 warehouse_arrivals,選擇 SQLPython,然後點 選建立

  3. 貼上下列程式碼。

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. 點擊 播放圖示。執行檔案執行管線

更新完成後,管線圖會顯示所有四個資料集:gps_bronzeraw_gps_silverwarehouse_geofences_goldwarehouse_arrivals和 。

驗證空間連接

確認空間連接產生了列:銀色表格中落入地理圍欄內的點出現在 warehouse_arrivals。 在筆記本或 SQL 編輯器中執行以下其中一種(使用與你的管線目標相同的目錄和架構)。

按倉庫清點到達數(SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

你應該會看到 Warehouse_AWarehouse_B 的計數都不是零(因為樣本 GPS 資料會同時覆蓋兩個多邊形)。 檢查樣本行:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Python (筆記本)中的檢查方式相同:

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

如果你看到行在 warehouse_arrivalsST_Contains(boundary_geom, point_geom) 聯結正常運作。

步驟7:排程流程(可選)

為了讓資料處理流程在新 GPS 資料到達時保持最新,請創建一個工作,以排程方式執行資料處理流程。

  1. 在編輯器頂端,選擇 Schedule (排程 ) 按鈕。
  2. 如果出現 Schedules (排程) 對話方塊,請選擇 Add schedule (新增排程)。
  3. 可以選擇為任務命名。
  4. 預設情況下,排程每天運行一次。 你可以接受這個,也可以自己設定。 選擇 進階 可以設定特定時間; 更多選項 可以讓你新增執行通知。
  5. 選擇 建立 以套用排程。

如需作業執行的詳細資訊,請參閱 Lakeflow 作業的監視和可觀察性

其他資源