Bagikan melalui


Tutorial: Membangun alur geospasial dengan jenis spasial bawaan

Pelajari cara membuat dan menyebarkan alur yang menyerap data GPS, mengonversi koordinat ke jenis spasial asli, dan bergabung dengan geofence gudang untuk melacak kedatangan menggunakan Lakeflow Spark Declarative Pipelines (SDP) untuk orkestrasi data dan Auto Loader. Tutorial ini menggunakan jenis spasial asli Databricks (GEOMETRY, GEOGRAPHY) dan fungsi spasial bawaan seperti ST_Point, , ST_GeomFromWKTdan ST_Contains, sehingga Anda dapat menjalankan alur kerja geospasial dalam skala besar tanpa pustaka eksternal.

Dalam tutorial ini, Anda akan:

  • Buat alur dan hasilkan contoh data GPS dan geofence dalam volume Unity Catalog.
  • Serap ping GPS mentah secara bertahap dengan Auto Loader ke dalam tabel streaming perunggu.
  • Bangun tabel streaming berwarna perak yang mengonversi garis lintang dan bujur menjadi titik asli GEOMETRY.
  • Buat tampilan materialis dari geofence gudang berdasarkan poligon WKT.
  • Jalankan gabungan spasial untuk menghasilkan tabel kedatangan ke gudang (perangkat mana yang telah memasuki geofence tertentu).

Hasilnya adalah alur bergaya medali: perunggu (GPS mentah), perak (titik sebagai geometri), dan emas (geofence dan peristiwa kedatangan). Lihat Apa arsitektur medali lakehouse? untuk informasi selengkapnya.

Persyaratan

Untuk menyelesaikan tutorial ini, Anda harus memenuhi persyaratan berikut:

Langkah 1: Membuat alur

Buat alur ETL baru dan atur katalog dan skema default untuk tabel Anda.

  1. Di ruang kerja Anda, klik ikon Plus. Baru di sudut kiri atas.

  2. Klik Pipeline ETL.

  3. Ubah judul alur menjadi Spatial pipeline tutorial atau nama yang Anda inginkan.

  4. Di bawah judul, pilih katalog dan skema di mana Anda memiliki izin menulis.

    Katalog dan skema ini digunakan secara default ketika Anda tidak menentukan katalog atau skema dalam kode Anda. Ganti <catalog> dan <schema> dalam langkah-langkah berikut dengan nilai yang Anda pilih di sini.

  5. Dari Opsi tingkat lanjut, pilih Mulai dengan file kosong.

  6. Pilih folder untuk kode Anda. Anda dapat memilih Telusuri untuk memilih folder; Anda dapat menggunakan folder Git untuk kontrol versi.

  7. Pilih Python atau SQL untuk bahasa file pertama Anda. Anda dapat menambahkan file dalam bahasa lain nanti.

  8. Klik Pilih untuk membuat alur dan membuka Editor Alur Lakeflow.

Anda sekarang memiliki jalur tanpa isi dengan katalog dan skema default. Selanjutnya, buat contoh data GPS dan geofence.

Langkah 2: Membuat contoh data GPS dan geofence

Langkah ini menghasilkan data sampel dalam volume: ping GPS mentah (JSON) dan geofence gudang (JSON dengan poligon WKT). Titik GPS dihasilkan dalam kotak pembatas yang tumpang tindih dengan dua poligon gudang, sehingga penggabungan spasial pada langkah berikutnya akan mengembalikan baris data pengiriman. Anda dapat melewati langkah ini jika Anda sudah memiliki data Anda sendiri dalam volume atau tabel.

  1. Di Editor Alur Lakeflow, di browser aset, klik ikon Plus.Tambahkan, lalu Eksplorasi.

  2. Atur Nama ke Setup spatial data, pilih Python, dan biarkan folder tujuan default.

  3. Klik Buat.

  4. Di buku catatan baru, tempelkan kode berikut. Ganti <catalog> dan <schema> dengan katalog dan skema default yang Anda tetapkan di Langkah 1.

    Gunakan kode berikut di notebook untuk menghasilkan data GPS dan geofence.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Jalankan sel buku catatan (Shift + Enter).

Setelah proses berjalan selesai, volume berisi gps (data ping mentah) dan geofences (poligon dalam WKT). Pada langkah berikutnya, Anda mengimpor data GPS ke tabel perunggu.

Langkah 3: Menyerap data GPS ke dalam tabel streaming perunggu

Serap JSON GPS mentah dari volume secara bertahap menggunakan "Auto Loader" dan tuliskan ke dalam tabel streaming 'bronze'.

  1. Di browser aset, klik ikon Plus.Tambahkan, lalu Transformasi.

  2. Atur Nama ke gps_bronze, pilih SQL atau Python, dan klik Buat.

  3. Ganti konten file dengan yang berikut ini (gunakan tab yang cocok dengan bahasa Anda). Ganti <catalog> dan <schema> dengan katalog dan skema default Anda.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Klik ikon Putar.Jalankan file atau Jalankan alur untuk menjalankan pembaruan.

Saat pembaruan selesai, grafik alur memperlihatkan tabel gps_bronze. Selanjutnya, tambahkan tabel perak yang mengonversi koordinat ke titik geometri asli.

Langkah 4: Tambahkan tabel streaming abu-abu dengan titik geometris

Buat tabel streaming yang membaca dari tabel bronze dan tambahkan kolom GEOMETRY menggunakan ST_Point(longitude, latitude).

  1. Di browser aset, klik ikon Plus.Tambahkan, lalu Transformasi.

  2. Atur Nama ke raw_gps_silver, pilih SQL atau Python, dan klik Buat.

  3. Tempelkan kode berikut ke dalam file baru.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Klik ikon Putar.Jalankan file atau Jalankan alur.

Grafik alur sekarang menunjukkan gps_bronze dan raw_gps_silver. Selanjutnya, tambahkan geofence gudang sebagai tampilan berwujud.

Langkah 5: Buat tabel emas geofences gudang

Buat tampilan termaterialisasi yang membaca geofence dari volume dan mengonversi kolom WKT menjadi kolom GEOMETRY menggunakan ST_GeomFromWKT.

  1. Di browser aset, klik ikon Plus.Tambahkan, lalu Transformasi.

  2. Atur Nama ke warehouse_geofences_gold, pilih SQL atau Python, dan klik Buat.

  3. Tempelkan kode berikut. Ganti <catalog> dan <schema> dengan katalog dan skema default Anda.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Klik ikon Putar.Jalankan file atau Jalankan alur.

Alur sekarang menyertakan tabel zona geografis. Selanjutnya, tambahkan gabungan spasial untuk menghitung kedatangan di gudang.

Langkah 6: Buat tabel kedatangan gudang dengan gabungan spasial

Tambahkan tampilan materialisasi yang menggabungkan titik GPS perak ke geofences menggunakan ST_Contains(boundary_geom, point_geom) untuk menentukan kapan perangkat berada di dalam poligon gudang.

  1. Di browser aset, klik ikon Plus.Tambahkan, lalu Transformasi.

  2. Atur Nama ke warehouse_arrivals, pilih SQL atau Python, dan klik Buat.

  3. Tempelkan kode berikut.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Klik ikon Putar.Jalankan file atau Jalankan alur.

Ketika pembaruan selesai, grafik alur menunjukkan keempat himpunan data: gps_bronze, , raw_gps_silverwarehouse_geofences_gold, dan warehouse_arrivals.

Memastikan penggabungan spasial

Konfirmasikan bahwa gabungan spasial menghasilkan baris: poin dari tabel perak yang berada di dalam geofence muncul di warehouse_arrivals. Jalankan salah satu hal berikut ini di notebook atau editor SQL (gunakan katalog dan skema yang sama dengan target alur Anda).

Hitung kedatangan menurut gudang (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Anda akan melihat jumlah non-nol untuk Warehouse_A dan Warehouse_B (data GPS sampel tumpang tindih dengan kedua poligon). Untuk memeriksa baris sampel:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Pemeriksaan yang sama di Python (buku catatan):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Jika Anda melihat baris di warehouse_arrivals, maka gabungan ST_Contains(boundary_geom, point_geom) berfungsi dengan baik.

Langkah 7: Jadwalkan alur (opsional)

Untuk menjaga alur tetap terbarui saat data GPS baru mendarat dalam volume, buat pekerjaan untuk menjalankan alur sesuai jadwal.

  1. Di bagian atas editor, pilih tombol Jadwalkan .
  2. Jika dialog Jadwal muncul, pilih Tambahkan jadwal.
  3. Secara opsional, beri nama pekerjaan.
  4. Secara default, jadwal berjalan sekali per hari. Anda dapat menerima ini atau mengatur sendiri. Memilih Tingkat Lanjut memungkinkan Anda mengatur waktu tertentu; Opsi lainnya memungkinkan Anda menambahkan pemberitahuan eksekusi.
  5. Pilih Buat untuk menerapkan jadwal.

Lihat Pemantauan dan observabilitas untuk Pekerjaan Lakeflow untuk informasi selengkapnya tentang pelaksanaan pekerjaan.

Sumber daya tambahan