次の方法で共有


チュートリアル: ネイティブ空間型を使用して地理空間パイプラインを構築する

GPS データを取り込み、座標をネイティブな空間型に変換し、ウェアハウスのジオフェンスに対して結合することで到着を追跡するパイプラインを作成して展開する方法について、Lakeflow Spark 宣言型パイプライン (SDP) と Auto Loader を用いたデータオーケストレーションの手法を学びます。 このチュートリアルでは、Databricks ネイティブ空間型 (GEOMETRYGEOGRAPHY) と、 ST_PointST_GeomFromWKTST_Containsなどの組み込みの空間関数を使用するため、外部ライブラリなしで大規模に地理空間ワークフローを実行できます。

このチュートリアルでは、次のことを行います。

  • パイプラインを作成し、Unity カタログ ボリュームでサンプル GPS およびジオフェンス データを生成します。
  • 自動ローダーを使用して生の GPS ping をブロンズ ストリーミング テーブルにインクリメンタルに取り込みます。
  • 緯度と経度をネイティブ GEOMETRY ポイントに変換するシルバー ストリーミング テーブルを作成します。
  • WKT ポリゴンから倉庫ジオフェンスの具体化されたビューを作成します。
  • 空間結合を実行して、倉庫到着 (どのデバイスがどのジオフェンスに入ったか) のテーブルを生成します。

その結果、メダリオンスタイルのパイプラインとして、ブロンズ (生の GPS)、シルバー (ジオメトリとしてのポイント)、ゴールド (ジオフェンスと到着イベント) が生成されます。 詳細については、「 medallion lakehouse のアーキテクチャとは」 を参照してください。

必要条件

このチュートリアルを完了するには、次の要件を満たす必要があります。

手順 1: パイプラインを作成する

新しい ETL パイプラインを作成し、テーブルの既定のカタログとスキーマを設定します。

  1. ワークスペースの左上隅で [プラス] アイコン をクリックして新規作成します。

  2. [ ETL パイプライン] をクリックします。

  3. パイプラインのタイトルを Spatial pipeline tutorial または希望する名前に変更します。

  4. タイトルの下で、書き込みアクセス許可があるカタログとスキーマを選択します。

    このカタログとスキーマは、コードでカタログまたはスキーマを指定しない場合に既定で使用されます。 次の手順の <catalog><schema> を、ここで選択した値に置き換えます。

  5. [詳細設定] オプションで、[空のファイルで開始] を選択します。

  6. コードのフォルダーを選択します。 [参照] を選択してフォルダーを選択できます。バージョン管理には Git フォルダーを使用できます。

  7. 最初のファイルの言語として Python または SQL を選択します。 後で他の言語でファイルを追加できます。

  8. [ 選択 ] をクリックしてパイプラインを作成し、Lakeflow パイプライン エディターを開きます。

これで、既定のカタログとスキーマを含む空のパイプラインが作成されました。 次に、GPS とジオフェンスのサンプル データを作成します。

手順 2: GPS とジオフェンスのサンプル データを作成する

この手順では、生の GPS ping (JSON) とウェアハウスジオフェンス (WKT ポリゴンを含む JSON) のサンプル データをボリュームに生成します。 GPS ポイントは、2 つの倉庫ポリゴンと重なる境界ボックスで生成されるため、後の手順で空間結合を行うと到着行が返されます。 ボリュームまたはテーブルに独自のデータが既にある場合は、この手順をスキップできます。

  1. Lakeflow Pipelines エディターのアセット ブラウザーで、Plus iconをクリックし、追加、次に探索を選択します。

  2. [名前] を [Setup spatial data] に設定し、[Python] を選択し、既定の保存先フォルダーのままにします。

  3. Create をクリックしてください。

  4. 新しいノートブックに、次のコードを貼り付けます。 <catalog><schema>を、手順 1 で設定した既定のカタログとスキーマに置き換えます。

    ノートブックで次のコードを使用して、GPS データとジオフェンス データを生成します。

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. ノートブック のセルを実行します (Shift + Enter キーを押します)。

実行が完了すると、ボリュームに gps (生 ping) と geofences (WKT の多角形) が含まれます。 次の手順では、GPS データをブロンズ テーブルに取り込みます。

手順 3: ブロンズ ストリーミング テーブルに GPS データを取り込む

自動ローダーを使用してボリュームから生の GPS JSON を増分的に取り込み、ブロンズ ストリーミング テーブルに書き込みます。

  1. 資産ブラウザーで、[プラス] アイコンをクリックし、次に[追加]、それから[変換]の順に選択します。

  2. [名前] を [gps_bronzeに設定し、SQL または Python を選択して、[作成] をクリックします。

  3. ファイルの内容を次のように置き換えます (言語に一致するタブを使用してください)。 <catalog><schema>を既定のカタログとスキーマに置き換えます。

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. [ 再生] アイコンをクリックします。ファイルを実行 するか 、パイプラインを実行 して更新プログラムを実行します。

更新が完了すると、パイプライン グラフに gps_bronze テーブルが表示されます。 次に、座標をネイティブ ジオメトリ ポイントに変換するシルバー テーブルを追加します。

手順 4: ジオメトリ ポイントを含むシルバー ストリーミング テーブルを追加する

ブロンズ テーブルから読み取り、GEOMETRYを使用してST_Point(longitude, latitude)列を追加するストリーミング テーブルを作成します。

  1. 資産ブラウザーで、[プラス] アイコンをクリックし、次に[追加]、それから[変換]の順に選択します。

  2. [名前] を [raw_gps_silverに設定し、SQL または Python を選択して、[作成] をクリックします。

  3. 次のコードを新しいファイルに貼り付けます。

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. [ 再生] アイコンをクリックします。ファイルを実行 するか 、パイプラインを実行します

パイプライン グラフに gps_bronzeraw_gps_silverが表示されるようになりました。 次に、倉庫ジオフェンスを具体化されたビューとして追加します。

手順 5: 倉庫ジオフェンスのゴールド テーブルを作成する

ボリュームからジオフェンスを読み取り、GEOMETRYを使用して WKT 列をST_GeomFromWKT列に変換する具体化されたビューを作成します。

  1. 資産ブラウザーで、[プラス] アイコンをクリックします。 [追加]、[変換]の順に選択します。

  2. [名前] を [warehouse_geofences_goldに設定し、SQL または Python を選択して、[作成] をクリックします。

  3. 次のコードを貼り付けます。 <catalog><schema>を既定のカタログとスキーマに置き換えます。

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. [ 再生] アイコンをクリックします。ファイルを実行 するか 、パイプラインを実行します

パイプラインにジオフェンス テーブルが含まれるようになりました。 次に、倉庫への到着を計算するために空間結合を追加します。

手順 6: 空間結合を使用して倉庫到着テーブルを作成する

ST_Contains(boundary_geom, point_geom)を使用してシルバー GPS ポイントをジオフェンスに結合する具体化されたビューを追加して、デバイスが倉庫のポリゴン内にあるかどうかを判断します。

  1. 資産ブラウザーで、[プラス] アイコンをクリックし、次に[追加]、それから[変換]の順に選択します。

  2. [名前] を [warehouse_arrivalsに設定し、SQL または Python を選択して、[作成] をクリックします。

  3. 次のコードを貼り付けます。

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. [ 再生] アイコンをクリックします。ファイルを実行 するか 、パイプラインを実行します

更新が完了すると、パイプライン グラフには、 gps_bronzeraw_gps_silverwarehouse_geofences_goldwarehouse_arrivalsの 4 つのデータセットがすべて表示されます。

空間結合を確認する

空間結合によって行が生成されたことを確認します。ジオフェンス内にあるシルバー テーブルのポイントが warehouse_arrivalsに表示されます。 ノートブックまたは SQL エディターで次のいずれかを実行します (パイプライン ターゲットと同じカタログとスキーマを使用します)。

倉庫別の到着数 (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Warehouse_AWarehouse_Bの 0 以外のカウントが表示されます (サンプル GPS データは両方のポリゴンに重なります)。 サンプル行を検査するには:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Python (ノートブック) での同じチェック:

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

warehouse_arrivalsに行が表示された場合、ST_Contains(boundary_geom, point_geom)結合は正常に動作しています。

手順 7: パイプラインをスケジュールする (省略可能)

新しい GPS データがボリュームに格納された時点でパイプラインを最新の状態に保つには、スケジュールに従ってパイプラインを実行するジョブを作成します。

  1. エディターの上部にある [ スケジュール ] ボタンを選択します。
  2. [スケジュール] ダイアログが表示されたら、[スケジュールの追加] を選択します。
  3. 必要に応じて、ジョブに名前を付けます。
  4. 既定では、スケジュールは 1 日に 1 回実行されます。 これを受け入れるか、独自に設定することができます。 [詳細設定] を選択すると、特定の時刻を設定できます。その他のオプションでは、実行通知を追加できます。
  5. [ 作成] を選択してスケジュールを適用します。

ジョブの実行の詳細については、Lakeflow ジョブの監視と可観測性を参照してください。

その他のリソース