Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Pelajari cara membuat dan menyebarkan alur yang menyerap data GPS, mengonversi koordinat ke jenis spasial asli, dan bergabung dengan geofence gudang untuk melacak kedatangan menggunakan Lakeflow Spark Declarative Pipelines (SDP) untuk orkestrasi data dan Auto Loader. Tutorial ini menggunakan jenis spasial asli Databricks (GEOMETRY, GEOGRAPHY) dan fungsi spasial bawaan seperti ST_Point, , ST_GeomFromWKTdan ST_Contains, sehingga Anda dapat menjalankan alur kerja geospasial dalam skala besar tanpa pustaka eksternal.
Dalam tutorial ini, Anda akan:
- Buat alur dan hasilkan contoh data GPS dan geofence dalam volume Unity Catalog.
- Serap ping GPS mentah secara bertahap dengan Auto Loader ke dalam tabel streaming perunggu.
- Bangun tabel streaming berwarna perak yang mengonversi garis lintang dan bujur menjadi titik asli
GEOMETRY. - Buat tampilan materialis dari geofence gudang berdasarkan poligon WKT.
- Jalankan gabungan spasial untuk menghasilkan tabel kedatangan ke gudang (perangkat mana yang telah memasuki geofence tertentu).
Hasilnya adalah alur bergaya medali: perunggu (GPS mentah), perak (titik sebagai geometri), dan emas (geofence dan peristiwa kedatangan). Lihat Apa arsitektur medali lakehouse? untuk informasi selengkapnya.
Persyaratan
Untuk menyelesaikan tutorial ini, Anda harus memenuhi persyaratan berikut:
- Masuk ke ruang kerja Azure Databricks.
- Mengaktifkan Unity Catalog untuk ruang kerja Anda.
- Memiliki komputasi tanpa server yang tersedia di ruang kerja Anda jika Anda ingin menggunakan Alur Deklaratif Lakeflow Spark Tanpa Server (diaktifkan secara default di ruang kerja dengan Katalog Unity). Jika komputasi tanpa server tidak tersedia, langkah-langkahnya berfungsi dengan komputasi default untuk ruang kerja Anda.
- Memiliki izin untuk membuat sumber daya komputasi atau akses ke sumber daya komputasi.
- Memiliki izin untuk membuat skema baru dalam katalog. Izin yang diperlukan adalah
USE CATALOGdanCREATE SCHEMA. - Memiliki izin untuk membuat volume baru dalam skema yang ada. Izin yang diperlukan adalah
USE SCHEMAdanCREATE VOLUME. - Gunakan runtime yang mendukung jenis spasial asli dan fungsi spasial.
Langkah 1: Membuat alur
Buat alur ETL baru dan atur katalog dan skema default untuk tabel Anda.
Di ruang kerja Anda, klik
Baru di sudut kiri atas.
Klik Pipeline ETL.
Ubah judul alur menjadi
Spatial pipeline tutorialatau nama yang Anda inginkan.Di bawah judul, pilih katalog dan skema di mana Anda memiliki izin menulis.
Katalog dan skema ini digunakan secara default ketika Anda tidak menentukan katalog atau skema dalam kode Anda. Ganti
<catalog>dan<schema>dalam langkah-langkah berikut dengan nilai yang Anda pilih di sini.Dari Opsi tingkat lanjut, pilih Mulai dengan file kosong.
Pilih folder untuk kode Anda. Anda dapat memilih Telusuri untuk memilih folder; Anda dapat menggunakan folder Git untuk kontrol versi.
Pilih Python atau SQL untuk bahasa file pertama Anda. Anda dapat menambahkan file dalam bahasa lain nanti.
Klik Pilih untuk membuat alur dan membuka Editor Alur Lakeflow.
Anda sekarang memiliki jalur tanpa isi dengan katalog dan skema default. Selanjutnya, buat contoh data GPS dan geofence.
Langkah 2: Membuat contoh data GPS dan geofence
Langkah ini menghasilkan data sampel dalam volume: ping GPS mentah (JSON) dan geofence gudang (JSON dengan poligon WKT). Titik GPS dihasilkan dalam kotak pembatas yang tumpang tindih dengan dua poligon gudang, sehingga penggabungan spasial pada langkah berikutnya akan mengembalikan baris data pengiriman. Anda dapat melewati langkah ini jika Anda sudah memiliki data Anda sendiri dalam volume atau tabel.
Di Editor Alur Lakeflow, di browser aset, klik
Tambahkan, lalu Eksplorasi.
Atur Nama ke
Setup spatial data, pilih Python, dan biarkan folder tujuan default.Klik Buat.
Di buku catatan baru, tempelkan kode berikut. Ganti
<catalog>dan<schema>dengan katalog dan skema default yang Anda tetapkan di Langkah 1.Gunakan kode berikut di notebook untuk menghasilkan data GPS dan geofence.
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")Jalankan sel buku catatan (Shift + Enter).
Setelah proses berjalan selesai, volume berisi gps (data ping mentah) dan geofences (poligon dalam WKT). Pada langkah berikutnya, Anda mengimpor data GPS ke tabel perunggu.
Langkah 3: Menyerap data GPS ke dalam tabel streaming perunggu
Serap JSON GPS mentah dari volume secara bertahap menggunakan "Auto Loader" dan tuliskan ke dalam tabel streaming 'bronze'.
Di browser aset, klik
Tambahkan, lalu Transformasi.
Atur Nama ke
gps_bronze, pilih SQL atau Python, dan klik Buat.Ganti konten file dengan yang berikut ini (gunakan tab yang cocok dengan bahasa Anda). Ganti
<catalog>dan<schema>dengan katalog dan skema default Anda.SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )Klik
Jalankan file atau Jalankan alur untuk menjalankan pembaruan.
Saat pembaruan selesai, grafik alur memperlihatkan tabel gps_bronze. Selanjutnya, tambahkan tabel perak yang mengonversi koordinat ke titik geometri asli.
Langkah 4: Tambahkan tabel streaming abu-abu dengan titik geometris
Buat tabel streaming yang membaca dari tabel bronze dan tambahkan kolom GEOMETRY menggunakan ST_Point(longitude, latitude).
Di browser aset, klik
Tambahkan, lalu Transformasi.
Atur Nama ke
raw_gps_silver, pilih SQL atau Python, dan klik Buat.Tempelkan kode berikut ke dalam file baru.
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )Klik
Jalankan file atau Jalankan alur.
Grafik alur sekarang menunjukkan gps_bronze dan raw_gps_silver. Selanjutnya, tambahkan geofence gudang sebagai tampilan berwujud.
Langkah 5: Buat tabel emas geofences gudang
Buat tampilan termaterialisasi yang membaca geofence dari volume dan mengonversi kolom WKT menjadi kolom GEOMETRY menggunakan ST_GeomFromWKT.
Di browser aset, klik
Tambahkan, lalu Transformasi.
Atur Nama ke
warehouse_geofences_gold, pilih SQL atau Python, dan klik Buat.Tempelkan kode berikut. Ganti
<catalog>dan<schema>dengan katalog dan skema default Anda.SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )Klik
Jalankan file atau Jalankan alur.
Alur sekarang menyertakan tabel zona geografis. Selanjutnya, tambahkan gabungan spasial untuk menghitung kedatangan di gudang.
Langkah 6: Buat tabel kedatangan gudang dengan gabungan spasial
Tambahkan tampilan materialisasi yang menggabungkan titik GPS perak ke geofences menggunakan ST_Contains(boundary_geom, point_geom) untuk menentukan kapan perangkat berada di dalam poligon gudang.
Di browser aset, klik
Tambahkan, lalu Transformasi.
Atur Nama ke
warehouse_arrivals, pilih SQL atau Python, dan klik Buat.Tempelkan kode berikut.
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )Klik
Jalankan file atau Jalankan alur.
Ketika pembaruan selesai, grafik alur menunjukkan keempat himpunan data: gps_bronze, , raw_gps_silverwarehouse_geofences_gold, dan warehouse_arrivals.
Memastikan penggabungan spasial
Konfirmasikan bahwa gabungan spasial menghasilkan baris: poin dari tabel perak yang berada di dalam geofence muncul di warehouse_arrivals. Jalankan salah satu hal berikut ini di notebook atau editor SQL (gunakan katalog dan skema yang sama dengan target alur Anda).
Hitung kedatangan menurut gudang (SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Anda akan melihat jumlah non-nol untuk Warehouse_A dan Warehouse_B (data GPS sampel tumpang tindih dengan kedua poligon). Untuk memeriksa baris sampel:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Pemeriksaan yang sama di Python (buku catatan):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
Jika Anda melihat baris di warehouse_arrivals, maka gabungan ST_Contains(boundary_geom, point_geom) berfungsi dengan baik.
Langkah 7: Jadwalkan alur (opsional)
Untuk menjaga alur tetap terbarui saat data GPS baru mendarat dalam volume, buat pekerjaan untuk menjalankan alur sesuai jadwal.
- Di bagian atas editor, pilih tombol Jadwalkan .
- Jika dialog Jadwal muncul, pilih Tambahkan jadwal.
- Secara opsional, beri nama pekerjaan.
- Secara default, jadwal berjalan sekali per hari. Anda dapat menerima ini atau mengatur sendiri. Memilih Tingkat Lanjut memungkinkan Anda mengatur waktu tertentu; Opsi lainnya memungkinkan Anda menambahkan pemberitahuan eksekusi.
- Pilih Buat untuk menerapkan jadwal.
Lihat Pemantauan dan observabilitas untuk Pekerjaan Lakeflow untuk informasi selengkapnya tentang pelaksanaan pekerjaan.