了解如何创建和部署一个管道,用于引入 GPS 数据、将坐标转换为本机空间类型,并与仓库地理围栏进行联接,从而使用 Lakeflow Spark 声明性管道(SDP)进行数据编排并通过自动数据加载工具跟踪到达。 本教程使用 Databricks 本机空间类型(GEOMETRY、GEOGRAPHY)和内置空间函数(例如ST_PointST_GeomFromWKT,以及ST_Contains),以便无需外部库即可大规模运行地理空间工作流。
在本教程中,你将:
- 在 Unity Catalog 数据卷中创建一个管道,并生成 GPS 和地理围栏示例数据。
- 使用自动加载程序以增量方式将原始 GPS ping 引入青铜流式处理表。
- 生成一个将纬度和经度转换为本机
GEOMETRY点的银流式处理表。 - 从 WKT 多边形创建仓库地理栅栏的物化视图。
- 运行空间关联操作以生成记录仓库到达信息的表格(即设备进入特定地理围栏的信息)。
结果是奖牌式管道:铜牌(原始GPS)、银牌(几何图形点)和金牌(地理围栏和到达事件)。 有关详细信息,请参阅什么是奖牌湖屋体系结构?
要求
若要完成此教程,必须满足以下要求:
- 登录到 Azure Databricks 工作区。
- 为工作区启用 Unity 目录 。
- 如果您想使用无服务器 Lakeflow Spark 声明性管道(在具有 Unity 目录的工作区中默认启用),请确保工作区中有无 服务器计算 可用。 如果无服务器计算不可用,则这些步骤适用于工作区的默认计算。
- 有权 创建计算资源 或访问计算资源。
- 有权 在目录中创建新架构。 所需的权限是
USE CATALOG和CREATE SCHEMA。 - 有权在现有架构中创建新卷。 所需的权限是
USE SCHEMA和CREATE VOLUME。 - 使用支持本机空间类型和空间函数的运行时。
步骤 1:创建管道
创建新的 ETL 管道,并为表设置默认目录和架构。
在工作区中,单击
,位于左上角的“新建”。
单击 ETL 管道。
将管道的标题更改为
Spatial pipeline tutorial或您首选的名称。在标题下,选择具有写入权限的目录和架构。
如果未在代码中指定目录或架构,则默认使用此目录和架构。 用您在此处选择的值替换以下步骤中的
<catalog>和<schema>。在 “高级”选项中,选择 “从空文件开始”。
为代码选择文件夹。 可以选择 “浏览 ”以选择文件夹;可以使用 Git 文件夹进行版本控制。
选择 Python 或 SQL 作为第一个文件的语言。 稍后可以添加其他语言的文件。
单击 “选择” 以创建管道并打开 Lakeflow 管道编辑器。
现在,你有一个具有默认目录和架构的空白管道。 接下来,创建示例 GPS 和地理围栏数据。
步骤 2:创建示例 GPS 和地理围栏数据
此步骤在数据卷中生成示例数据:原始 GPS 信号(JSON)和仓库地理围栏(带有 WKT 多边形的 JSON)。 GPS 点在与两个仓库多边形重叠的边界框中生成,因此后续步骤中的空间联接将返回到达行。 如果卷或表中已有自己的数据,则可以跳过此步骤。
在 Lakeflow 管道编辑器的资产浏览器中,单击
、添加,然后选择 探索。
将“名称
Setup spatial data”设置为“,选择”Python“,并保留默认目标文件夹。单击 “创建” 。
在新笔记本中,粘贴以下代码。 将
<catalog>和<schema>替换为您在步骤 1 中设置的默认目录和架构。在笔记本中使用以下代码生成 GPS 和地理围栏数据。
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")运行笔记本单元格(Shift + Enter)。
运行完成后,数据卷包含 gps (原始 pings)和 geofences (WKT 中的多边形)。 在下一步中,你将 GPS 数据导入铜表。
步骤 3:将 GPS 数据引入青铜流数据表
使用自动加载程序以增量方式从卷引入原始 GPS JSON,并写入青铜流式处理表。
在资产浏览器中,单击
添加,然后 转换。
将名称
gps_bronze设置为,选择 SQL 或 Python,然后单击“创建”。将文件内容替换为以下内容(使用与语言匹配的选项卡)。 将
<catalog>和<schema>替换为默认目录和架构。SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )单击“
运行文件 或 运行管道 以运行更新。
更新完成后,管道图会显示 gps_bronze 表。 接下来,添加一个将坐标转换为本机几何点的银表。
步骤 4:添加带几何点的银流式处理表
创建一个流式表,该表从青铜表读取,并且使用GEOMETRY和ST_Point(longitude, latitude)添加列。
在资产浏览器中,单击
添加,然后 转换。
将名称
raw_gps_silver设置为,选择 SQL 或 Python,然后单击“创建”。将以下代码粘贴到新文件中。
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )单击“
运行文件 或 运行管道。
管道图现在显示 gps_bronze 和 raw_gps_silver。 接下来,将仓库地理围栏添加为具体化视图。
步骤 5:创建仓库地理围栏黄金表
创建一个具体化视图,该视图从卷中读取地理围栏,并使用 GEOMETRY 将 WKT 列转换为 ST_GeomFromWKT 列。
在资产浏览器中,单击
添加,然后 转换。
将名称
warehouse_geofences_gold设置为,选择 SQL 或 Python,然后单击“创建”。粘贴以下代码。 将
<catalog>和<schema>替换为默认目录和架构。SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )单击“
运行文件 或 运行管道。
管道现在包括地理围栏表。 接下来,添加空间连接以计算仓库收货。
步骤 6:使用空间联接创建仓库到达表
添加一个物化视图,通过 ST_Contains(boundary_geom, point_geom) 将银色 GPS 点与地理围栏联接,以确定设备何时处于仓库多边形内。
在资产浏览器中,单击
添加,然后 转换。
将名称
warehouse_arrivals设置为,选择 SQL 或 Python,然后单击“创建”。粘贴以下代码。
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )单击“
运行文件 或 运行管道。
更新完成后,管道图将显示所有四个数据集: gps_bronze、 raw_gps_silver、 warehouse_geofences_gold和 warehouse_arrivals。
验证空间联接
确认空间联接生成的行:Silver 表中位于地理围栏内的点显示在 warehouse_arrivals。 在笔记本或 SQL 编辑器中运行以下命令之一(使用与管道目标相同的目录和架构)。
按仓库计算到达次数(SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
应该看到 Warehouse_A 和 Warehouse_B 的非零计数(示例 GPS 数据与这两个多边形重叠)。 用于检查的示例行:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Python 中的相同检查(笔记本):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
如果在warehouse_arrivals中看到行,说明ST_Contains(boundary_geom, point_geom)连接正常工作。
步骤 7:计划管道(可选)
若要使管道保持最新状态,因为新的 GPS 数据位于卷中,请创建一个作业以按计划运行管道。
- 在编辑器顶部,选择“ 计划 ”按钮。
- 如果出现“ 计划 ”对话框,请选择“ 添加计划”。
- (可选)为作业命名。
- 默认情况下,计划每天运行一次。 可以接受此项或自行设置。 选择 “高级 ”可设置特定时间; 通过更多选项 ,可以添加运行通知。
- 选择“ 创建 ”以应用计划。
有关作业运行的详细信息,请参阅 Lakeflow 作业的监视和可观测性 。