Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Scopri come creare e distribuire una pipeline che inserisce dati GPS, converte le coordinate in tipi spaziali nativi ed esegue join con le geofence del magazzino per tenere traccia degli arrivi utilizzando Lakeflow Spark Declarative Pipelines (SDP) per l'orchestrazione dei dati e Auto Loader. Questa esercitazione usa i tipi spaziali nativi di Databricks (GEOMETRY, GEOGRAPHY) e le funzioni spaziali predefinite, ad ST_Pointesempio , ST_GeomFromWKTe ST_Contains, in modo da poter eseguire flussi di lavoro geospaziali su larga scala senza librerie esterne.
In questa esercitazione si eseguiranno le seguenti attività:
- Creare una pipeline e generare dati di esempio GPS e geofencing in un volume di Unity Catalog.
- Inserire ping GPS non elaborati in modo incrementale con Auto Loader in una tabella di streaming bronze.
- Creare una tabella silver streaming che converte la latitudine e la longitudine in un punto nativo
GEOMETRY. - Creare una vista materializzata dei recinti virtuali del magazzino dai poligoni WKT.
- Eseguire un join spaziale per produrre una tabella degli arrivi al magazzino (quale dispositivo è entrato in quale recinto virtuale).
Il risultato è una pipeline in stile medaglione: bronzo (GPS grezzo), argento (punti come geometria) e oro (georecinzioni e eventi di arrivo). Per ulteriori informazioni, vedere Che cos'è l'architettura del lakehouse medallion?
Requisiti
Per completare questa esercitazione, è necessario soddisfare i requisiti seguenti:
- Accedere a un'area di lavoro di Azure Databricks.
- Assicurati che Unity Catalog sia abilitato per la tua area di lavoro.
- Se si vuole usare le pipeline dichiarative di Serverless Lakeflow Spark, è necessario abilitare il calcolo serverless per l'account. Se il calcolo serverless non è abilitato, i passaggi funzionano con il calcolo predefinito per l'area di lavoro.
- Disporre dell'autorizzazione per creare una risorsa di calcolo o accedere a una risorsa di calcolo.
- Disporre delle autorizzazioni per creare un nuovo schema in un catalogo. Le autorizzazioni necessarie sono
USE CATALOGeCREATE SCHEMA. - Disporre delle autorizzazioni per creare un nuovo volume in uno schema esistente. Le autorizzazioni necessarie sono
USE SCHEMAeCREATE VOLUME. - Usare un runtime che supporta tipi spaziali nativi e funzioni spaziali.
Passaggio 1: creare una pipeline
Creare una nuova pipeline ETL e impostare il catalogo e lo schema predefiniti per le tabelle.
Nell'area di lavoro fare clic
Novità nell'angolo superiore sinistro.
Fare clic su Pipeline ETL.
Modificare il titolo della pipeline in
Spatial pipeline tutorialo in un nome a tua scelta.Sotto il titolo scegliere un catalogo e uno schema per cui si dispone delle autorizzazioni di scrittura.
Questo catalogo e schema vengono usati per impostazione predefinita quando non si specifica un catalogo o uno schema nel codice. Sostituire
<catalog>e<schema>nei passaggi seguenti con i valori scelti qui.In Opzioni avanzate selezionare Inizia con un file vuoto.
Scegliere una cartella per il codice. È possibile selezionare Sfoglia per scegliere una cartella; è possibile usare una cartella Git per il controllo della versione.
Scegliere Python o SQL per il linguaggio del primo file. È possibile aggiungere file nell'altra lingua in un secondo momento.
Fare clic su Seleziona per creare la pipeline e aprire l'Editor pipeline di Lakeflow.
È ora disponibile una pipeline vuota con un catalogo e uno schema predefiniti. Creare quindi i dati GPS e geofence di esempio.
Passaggio 2: Creare i dati di esempio per GPS e georecinto
Questo passaggio genera dati di esempio in un volume: ping GPS non elaborati (JSON) e georecinzioni del magazzino (JSON con poligoni WKT). I punti GPS vengono generati in una scatola di delimitazione che si sovrappone ai due poligoni dei magazzini, quindi il join spaziale in un passaggio successivo restituirà le righe relative all'arrivo. È possibile ignorare questo passaggio se si dispone già di dati personalizzati in un volume o in una tabella.
Nel browser asset dell'editor di Lakeflow Pipelines fare clic
Aggiungi, quindi Esplora.
Impostare Nome su
Setup spatial data, scegliere Python e lasciare la cartella di destinazione predefinita.Clicca su Crea.
Nel nuovo notebook incollare il codice seguente. Sostituire
<catalog>e<schema>con il catalogo e lo schema predefiniti impostati nel passaggio 1.Usare il codice seguente nel notebook per generare dati GPS e geofence.
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")Esegui la cella del notebook (Maiusc + Invio).
Al termine dell'esecuzione, il volume contiene gps (ping non elaborati) e geofences (poligoni in WKT). Nel passaggio successivo si inseriscono i dati GPS in una tabella bronze.
Passaggio 3: Trasferire i dati GPS in una tabella di streaming di livello bronze
Inserire il codice JSON GPS non elaborato dal volume in modo incrementale usando Auto Loader e scrivere in una tabella di streaming bronze.
Nel browser delle risorse premi
Aggiungi, quindi Trasformazione.
Impostare Nome su
gps_bronze, scegliere SQL o Python e fare clic su Crea.Sostituire il contenuto del file con il codice seguente (usare la scheda corrispondente alla lingua in uso). Sostituire
<catalog>e<schema>con il catalogo e lo schema predefiniti.SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )Fare clic
Eseguire il file o eseguire la pipeline per eseguire un aggiornamento.
Al termine dell'aggiornamento, il grafico della pipeline mostra la tabella gps_bronze. Successivamente, aggiungere una tabella silver che converte le coordinate in un punto di geometria nativa.
Passaggio 4: Aggiungere una tabella di streaming silver con punti geometrici
Creare una tabella di streaming che legge dalla tabella bronze e aggiunge una GEOMETRY colonna usando ST_Point(longitude, latitude).
Nel browser delle risorse, fare clic
Aggiungi, quindi Trasformazione.
Impostare Nome su
raw_gps_silver, scegliere SQL o Python e fare clic su Crea.Incollare il codice seguente nel nuovo file.
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )Fare clic
Esegui il file o Esegui la pipeline.
Il grafico della pipeline ora mostra gps_bronze e raw_gps_silver. Successivamente, aggiungere i recinti virtuali del magazzino come vista materializzata.
Passaggio 5: Creare la tabella gold dei recinti virtuali del magazzino
Creare una vista materializzata che legge i georecinti dal volume e converte la colonna WKT in colonna GEOMETRY usando ST_GeomFromWKT.
Nel browser delle risorse, fare clic
Aggiungi, quindi Trasformazione.
Impostare Nome su
warehouse_geofences_gold, scegliere SQL o Python e fare clic su Crea.Incollare il codice seguente. Sostituire
<catalog>e<schema>con il catalogo e lo schema predefiniti.SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )Fare clic
Esegui il file o Esegui la pipeline.
La pipeline include ora la tabella recinti virtuali. Quindi, aggiungi il join spaziale per calcolare gli arrivi al magazzino.
Passaggio 6: Creare la tabella degli arrivi del magazzino con un join spaziale
Aggiungere una vista materializzata che colleghi i punti GPS silver alle geofence utilizzando ST_Contains(boundary_geom, point_geom) per determinare quando un dispositivo si trova all'interno di un poligono del magazzino.
Nel browser delle risorse, fare clic
Aggiungi, quindi Trasformazione.
Impostare Nome su
warehouse_arrivals, scegliere SQL o Python e fare clic su Crea.Incollare il codice seguente.
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )Fare clic
Esegui il file o Esegui la pipeline.
Al termine dell'aggiornamento, il grafico della pipeline mostra tutti e quattro i set di dati: gps_bronze, raw_gps_silverwarehouse_geofences_gold, e warehouse_arrivals.
Verificare l'unione spaziale
Verificare che il join spaziale produca righe: i punti della tabella silver che rientrano in un geofence vengono visualizzati in warehouse_arrivals. Eseguire una delle operazioni seguenti in un notebook o in un editor SQL (usare lo stesso catalogo e schema della destinazione della pipeline).
Conteggio degli arrivi per magazzino (SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Dovrebbero essere visualizzati conteggi diversi da zero per Warehouse_A e Warehouse_B (i dati GPS di esempio si sovrappongono a entrambi i poligoni). Per esaminare le righe di esempio:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Stessi controlli in Python (notebook):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
Se vengono visualizzate righe in warehouse_arrivals, il ST_Contains(boundary_geom, point_geom) join funziona correttamente.
Passaggio 7: Pianificare la pipeline (facoltativo)
Per mantenere aggiornata la pipeline man mano che i nuovi dati GPS arrivano nel volume, creare un processo per eseguire la pipeline in base a una pianificazione.
- Nella parte superiore dell'editor scegliere il pulsante Pianifica .
- Se viene visualizzata la finestra di dialogo Pianificazioni , scegliere Aggiungi pianificazione.
- Facoltativamente, dare un nome al job.
- Per impostazione predefinita, la pianificazione viene eseguita una volta al giorno. È possibile accettarlo o impostarne uno personalizzato. Scelta avanzata consente di impostare un orario specifico; Altre opzioni consentono di aggiungere notifiche di esecuzione.
- Selezionare Crea per applicare la pianificazione.
Vedere Monitoraggio e osservabilità per i processi Lakeflow per ulteriori informazioni sulle esecuzioni dei processi.