學習如何建立並部署一條管線,此管線可匯入 GPS 資料、將座標轉換為原生空間類型,並與倉庫地理圍欄進行連結。使用 Lakeflow Spark 宣告式管線(SDP)進行資料管理和編排,以及利用 Auto Loader 來追蹤抵達。 本教學使用 Databricks 原生空間類型(GEOMETRY、) GEOGRAPHY及內建空間函數如 ST_Point、 ST_GeomFromWKT、 ST_Contains和 ,因此你可以在沒有外部函式庫的情況下大規模執行地理空間工作流程。
在本教學課程中,您將:
- 建立管線並在 Unity 目錄卷中產生樣本 GPS 與地理圍欄資料。
- 用自動載入器逐步將原始 GPS 訊號輸入青銅串流表。
- 建立一個銀色串流表,將緯度和經度轉換成原生
GEOMETRY點。 - 從 WKT 多邊形建立倉庫地理圍欄的具體視圖。
- 執行空間關聯運算,以產生倉庫到達的資料表(哪個設備進入哪個地理圍欄)。
結果是一條獎章式的管道:銅牌(原始GPS數據)、銀牌(幾何形狀的點數)、金牌(地理圍欄及抵達事件)。 如需詳細資訊,請參閱獎章湖屋架構的概念。
要求
您必須滿足下列需求,才能完成本教學課程:
- 登入 Azure Databricks 工作區。
- 為您的工作區啟用 Unity 目錄 。
- 如果你想使用 Serverless Lakeflow Spark 宣告式管線,請為你的帳戶啟用伺服器無運算功能。 如果沒有啟用無伺服器運算,這些步驟會用你工作空間的預設運算方式運作。
- 擁有 建立運算資源 或存取運算資源的權限。
- 具有在 目錄中建立新架構的許可權。 所需的權限為
USE CATALOG和CREATE SCHEMA。 - 具有在 現有架構中建立新磁碟區的許可權。 所需的權限為
USE SCHEMA和CREATE VOLUME。 - 使用支援 原生空間型別 與 空間函式的執行環境。
步驟 1:建立管線
建立新的 ETL 管線,並為你的資料表設定預設目錄和架構。
在你的工作區,點選位於左上角的
新增。Plus 圖示 按一下 ETL 管線。
將管線的標題變更為
Spatial pipeline tutorial或您偏好的名稱。在標題下,選擇您具有寫入許可權的目錄和結構描述。
當你在程式碼中未指定目錄或結構時,預設會使用這個目錄和結構。 在接下來的步驟中,將
<catalog>和<schema>替換為你在這裡選擇的數值。從 進階選項中,選取從 空白檔案開始。
為您的程式碼選擇一個資料夾。 你可以選擇 瀏覽 來選擇資料夾;你可以用 Git 資料夾來做版本控制。
選擇 Python 或 SQL 作為你第一個檔案的語言。 你可以之後再加其他語言的檔案。
點擊 選擇 以建立管線並開啟 Lakeflow 管線編輯器。
您現在有一個空白管道,其中包含預設目錄和結構描述。 接著,建立範例 GPS 和地理限制區資料。
步驟二:建立範例 GPS 與地理圍欄資料
此步驟會在一個範圍內產生樣本資料:原始 GPS 訊號(JSON格式)與倉庫地理範圍界線(WKT 多邊形的 JSON格式)。 GPS 點是在一個重疊兩個倉庫多邊形的邊界框中產生的,因此後續的空間連接會返回到達資料列。 如果你已經有自己的資料放在磁碟區或表格中,可以跳過這個步驟。
在 Lakeflow 管線編輯器的資產瀏覽器中,點選
添加,探索。
將 名稱 設為
Setup spatial data,選擇 Python,並保留預設的目的地資料夾。點擊 建立。
在新筆記本中,貼上以下程式碼。 替換
<catalog><schema>成你在第一步設定的預設目錄和架構。在筆記本中使用以下程式碼來產生 GPS 和地理圍欄資料。
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")執行筆記本儲存格(Shift + Enter)。
執行完成後,該卷包含 gps (原始聲納訊號)和 geofences (WKT格式的多邊形)。 下一步,你會將 GPS 資料匯入青銅桌。
步驟 3:將 GPS 資料匯入青銅串流表
用 Auto Loader 逐步從卷中擷取原始 GPS JSON,然後寫入 Bronze 串流表。
在資產瀏覽器中,點選
新增,然後選擇 轉換。
將 名稱 設為
gps_bronze,選擇 SQL 或 Python,然後點 選建立。請用以下格式替換檔案內容(使用與你語言相符的分頁)。 替換
<catalog>和<schema>為預設目錄和架構。SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )點擊
執行檔案 或 執行管線 來執行更新。
更新完成後,管線圖會顯示該 gps_bronze 表格。 接著,新增一個銀色表格,將座標轉換成原生幾何點。
步驟 4:加入一個帶有幾何點的銀色串流表
建立一個串流表,從青銅表讀取,然後使用 GEOMETRY 加入 ST_Point(longitude, latitude) 欄位。
在資產瀏覽器中,點選
新增,然後選擇 轉換。
將 名稱 設為
raw_gps_silver,選擇 SQL 或 Python,然後點 選建立。將下列程式代碼貼到新檔案中。
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )點擊
執行檔案 或 執行管線。
管線圖現在顯示 gps_bronze 和 raw_gps_silver。 接著,把倉庫地理圍欄新增為具體化檢視。
步驟五:建立倉庫地理圍欄黃金資料表
建立一個實體化檢視,從體中讀取地理圍欄,並將 WKT 欄位轉換為 GEOMETRY 欄位,使用 ST_GeomFromWKT。
在資產瀏覽器中,點選
新增,然後 轉換。
將 名稱 設為
warehouse_geofences_gold,選擇 SQL 或 Python,然後點 選建立。貼上下列程式碼。 替換
<catalog>和<schema>為預設目錄和架構。SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )點擊
執行檔案 或 執行管線。
管線現在包含地理圍欄表。 接著,加入空間連接以計算倉庫到達人數。
步驟 6:建立帶有空間連接的倉庫到達表
新增一個實體化視圖,將銀色 GPS 點與地理圍欄 ST_Contains(boundary_geom, point_geom) 連結起來,用以判斷裝置是否在倉庫多邊形內。
在資產瀏覽器中,點選
新增,然後選擇 轉換。
將 名稱 設為
warehouse_arrivals,選擇 SQL 或 Python,然後點 選建立。貼上下列程式碼。
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )點擊
執行檔案 或 執行管線。
更新完成後,管線圖會顯示所有四個資料集:gps_bronze、 raw_gps_silverwarehouse_geofences_goldwarehouse_arrivals和 。
驗證空間連接
確認空間連接產生了列:銀色表格中落入地理圍欄內的點出現在 warehouse_arrivals。 在筆記本或 SQL 編輯器中執行以下其中一種(使用與你的管線目標相同的目錄和架構)。
按倉庫清點到達數(SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
你應該會看到 Warehouse_A 和 Warehouse_B 的計數都不是零(因為樣本 GPS 資料會同時覆蓋兩個多邊形)。 檢查樣本行:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Python (筆記本)中的檢查方式相同:
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
如果你看到行在 warehouse_arrivals,ST_Contains(boundary_geom, point_geom) 聯結正常運作。
步驟7:排程流程(可選)
為了讓資料處理流程在新 GPS 資料到達時保持最新,請創建一個工作,以排程方式執行資料處理流程。
- 在編輯器頂端,選擇 Schedule (排程 ) 按鈕。
- 如果出現 Schedules (排程) 對話方塊,請選擇 Add schedule (新增排程)。
- 可以選擇為任務命名。
- 預設情況下,排程每天運行一次。 你可以接受這個,也可以自己設定。 選擇 進階 可以設定特定時間; 更多選項 可以讓你新增執行通知。
- 選擇 建立 以套用排程。
如需作業執行的詳細資訊,請參閱 Lakeflow 作業的監視和可觀察性 。