通过


教程:构建一个地理空间管道,并使用原生空间类型

了解如何创建和部署一个管道,用于引入 GPS 数据、将坐标转换为本机空间类型,并与仓库地理围栏进行联接,从而使用 Lakeflow Spark 声明性管道(SDP)进行数据编排并通过自动数据加载工具跟踪到达。 本教程使用 Databricks 本机空间类型(GEOMETRYGEOGRAPHY)和内置空间函数(例如ST_PointST_GeomFromWKT,以及ST_Contains),以便无需外部库即可大规模运行地理空间工作流。

在本教程中,你将:

  • 在 Unity Catalog 数据卷中创建一个管道,并生成 GPS 和地理围栏示例数据。
  • 使用自动加载程序以增量方式将原始 GPS ping 引入青铜流式处理表。
  • 生成一个将纬度和经度转换为本机 GEOMETRY 点的银流式处理表。
  • 从 WKT 多边形创建仓库地理栅栏的物化视图。
  • 运行空间关联操作以生成记录仓库到达信息的表格(即设备进入特定地理围栏的信息)。

结果是奖牌式管道:铜牌(原始GPS)、银牌(几何图形点)和金牌(地理围栏和到达事件)。 有关详细信息,请参阅什么是奖牌湖屋体系结构?

要求

若要完成此教程,必须满足以下要求:

步骤 1:创建管道

创建新的 ETL 管道,并为表设置默认目录和架构。

  1. 在工作区中,单击“加号”图标,位于左上角的“新建”。

  2. 单击 ETL 管道

  3. 将管道的标题更改为 Spatial pipeline tutorial 或您首选的名称。

  4. 在标题下,选择具有写入权限的目录和架构。

    如果未在代码中指定目录或架构,则默认使用此目录和架构。 用您在此处选择的值替换以下步骤中的<catalog><schema>

  5. “高级”选项中,选择 “从空文件开始”。

  6. 为代码选择文件夹。 可以选择 “浏览 ”以选择文件夹;可以使用 Git 文件夹进行版本控制。

  7. 选择 PythonSQL 作为第一个文件的语言。 稍后可以添加其他语言的文件。

  8. 单击 “选择” 以创建管道并打开 Lakeflow 管道编辑器。

现在,你有一个具有默认目录和架构的空白管道。 接下来,创建示例 GPS 和地理围栏数据。

步骤 2:创建示例 GPS 和地理围栏数据

此步骤在数据卷中生成示例数据:原始 GPS 信号(JSON)和仓库地理围栏(带有 WKT 多边形的 JSON)。 GPS 点在与两个仓库多边形重叠的边界框中生成,因此后续步骤中的空间联接将返回到达行。 如果卷或表中已有自己的数据,则可以跳过此步骤。

  1. 在 Lakeflow 管道编辑器的资产浏览器中,单击 加号图标添加,然后选择 探索

  2. “名称Setup spatial data”设置为“,选择”Python“,并保留默认目标文件夹。

  3. 单击 “创建”

  4. 在新笔记本中,粘贴以下代码。 将 <catalog><schema> 替换为您在步骤 1 中设置的默认目录和架构。

    在笔记本中使用以下代码生成 GPS 和地理围栏数据。

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. 运行笔记本单元格(Shift + Enter)。

运行完成后,数据卷包含 gps (原始 pings)和 geofences (WKT 中的多边形)。 在下一步中,你将 GPS 数据导入铜表。

步骤 3:将 GPS 数据引入青铜流数据表

使用自动加载程序以增量方式从卷引入原始 GPS JSON,并写入青铜流式处理表。

  1. 在资产浏览器中,单击 “加号”图标。添加,然后 转换

  2. 名称gps_bronze设置为,选择 SQLPython,然后单击“创建”。

  3. 将文件内容替换为以下内容(使用与语言匹配的选项卡)。 将<catalog><schema>替换为默认目录和架构。

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. 单击“ 播放”图标。运行文件运行管道 以运行更新。

更新完成后,管道图会显示 gps_bronze 表。 接下来,添加一个将坐标转换为本机几何点的银表。

步骤 4:添加带几何点的银流式处理表

创建一个流式表,该表从青铜表读取,并且使用GEOMETRYST_Point(longitude, latitude)添加列。

  1. 在资产浏览器中,单击 “加号”图标。添加,然后 转换

  2. 名称raw_gps_silver设置为,选择 SQLPython,然后单击“创建”。

  3. 将以下代码粘贴到新文件中。

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. 单击“ 播放”图标。运行文件运行管道

管道图现在显示 gps_bronzeraw_gps_silver。 接下来,将仓库地理围栏添加为具体化视图。

步骤 5:创建仓库地理围栏黄金表

创建一个具体化视图,该视图从卷中读取地理围栏,并使用 GEOMETRY 将 WKT 列转换为 ST_GeomFromWKT 列。

  1. 在资产浏览器中,单击 “加号”图标。添加,然后 转换

  2. 名称warehouse_geofences_gold设置为,选择 SQLPython,然后单击“创建”。

  3. 粘贴以下代码。 将<catalog><schema>替换为默认目录和架构。

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. 单击“ 播放”图标。运行文件运行管道

管道现在包括地理围栏表。 接下来,添加空间连接以计算仓库收货。

步骤 6:使用空间联接创建仓库到达表

添加一个物化视图,通过 ST_Contains(boundary_geom, point_geom) 将银色 GPS 点与地理围栏联接,以确定设备何时处于仓库多边形内。

  1. 在资产浏览器中,单击 “加号”图标。添加,然后 转换

  2. 名称warehouse_arrivals设置为,选择 SQLPython,然后单击“创建”。

  3. 粘贴以下代码。

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. 单击“ 播放”图标。运行文件运行管道

更新完成后,管道图将显示所有四个数据集: gps_bronzeraw_gps_silverwarehouse_geofences_goldwarehouse_arrivals

验证空间联接

确认空间联接生成的行:Silver 表中位于地理围栏内的点显示在 warehouse_arrivals。 在笔记本或 SQL 编辑器中运行以下命令之一(使用与管道目标相同的目录和架构)。

按仓库计算到达次数(SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

应该看到 Warehouse_AWarehouse_B 的非零计数(示例 GPS 数据与这两个多边形重叠)。 用于检查的示例行:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Python 中的相同检查(笔记本):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

如果在warehouse_arrivals中看到行,说明ST_Contains(boundary_geom, point_geom)连接正常工作。

步骤 7:计划管道(可选)

若要使管道保持最新状态,因为新的 GPS 数据位于卷中,请创建一个作业以按计划运行管道。

  1. 在编辑器顶部,选择“ 计划 ”按钮。
  2. 如果出现“ 计划 ”对话框,请选择“ 添加计划”。
  3. (可选)为作业命名。
  4. 默认情况下,计划每天运行一次。 可以接受此项或自行设置。 选择 “高级 ”可设置特定时间; 通过更多选项 ,可以添加运行通知。
  5. 选择“ 创建 ”以应用计划。

有关作业运行的详细信息,请参阅 Lakeflow 作业的监视和可观测性

其他资源