Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Naučte se vytvářet a nasazovat kanál, který ingestuje data GPS, převádí souřadnice na nativní prostorové typy a spojuje se s geofencemi skladu ke sledování příletů pomocí Deklarativních kanálů Sparku Lakeflow (SDP) pro orchestraci dat a automatického zavaděče. Tento kurz používá nativní prostorové typy Databricks (GEOMETRY, GEOGRAPHY) a integrované prostorové funkce, jako jsou ST_Point, ST_GeomFromWKT a ST_Contains, takže můžete spouštět geoprostorové pracovní postupy ve velkém měřítku bez externích knihoven.
V tomto kurzu:
- Vytvořte potrubí a vygenerujte ukázková data GPS a geofence v objemu Unity Catalogu.
- Ingestování nezpracované GPS pingly postupně s Auto Loader do bronzové streamovací tabulky.
- Vytvořte stříbrnou streamovací tabulku, která převádí zeměpisnou šířku a délku na nativní
GEOMETRYbod. - Vytvoření materializovaného zobrazení geografických zóny skladu z polygonů WKT
- Spusťte prostorové spojení k vytvoření tabulky příjezdů do skladu (které zařízení vstoupilo do geofencingu).
Výsledkem je kanál ve stylu medailiónu: bronzový (nezpracovaný GPS), stříbro (body jako geometrie) a zlato (geofences a příletové události). Další informace najdete v tématu Co je architektura jezera medallion?
Požadavky
K dokončení tohoto kurzu musíte splnit následující požadavky:
- Přihlaste se k pracovnímu prostoru Azure Databricks.
- Mějte povolený Unity katalog pro svůj pracovní prostor.
- Mějte ve svém pracovním prostoru k dispozici výpočetní prostředí bez serveru, pokud chcete používat Serverless Lakeflow Spark Declarační Kanály (jsou ve výchozím nastavení povoleny v pracovních prostorech s katalogem Unity). Pokud výpočetní prostředky bez serveru nejsou k dispozici, kroky fungují s výchozím výpočetním prostředím vašeho pracovního prostoru.
- Máte oprávnění k vytvoření výpočetního prostředku nebo přístupu k výpočetnímu prostředku.
- Máte oprávnění k vytvoření nového schématu v katalogu. Požadovaná oprávnění jsou
USE CATALOGaCREATE SCHEMA. - Máte oprávnění k vytvoření nového svazku v existujícím schématu. Požadovaná oprávnění jsou
USE SCHEMAaCREATE VOLUME. - Použijte modul runtime, který podporuje nativní prostorové typy a prostorové funkce.
Krok 1: Vytvoření kanálu
Vytvořte nový kanál ETL a nastavte výchozí katalog a schéma pro tabulky.
V pracovním prostoru klikněte na
Nové v levém horním rohu.
Klikněte na kanál ETL.
Změňte název potrubí na
Spatial pipeline tutorialnebo na název, který dáváte přednost.Pod názvem zvolte katalog a schéma, pro které máte oprávnění k zápisu.
Tento katalog a schéma se ve výchozím nastavení používají, když v kódu nezadáte katalog nebo schéma. Nahraďte
<catalog>a<schema>v následujících krocích hodnotami, které zde zvolíte.V rozšířených možnostech vyberte Začít s prázdným souborem.
Zvolte složku pro kód. Pokud chcete vybrat složku, můžete vybrat možnost Procházet . Ke správě verzí můžete použít složku Git.
Jako jazyk vašeho prvního souboru zvolte Python nebo SQL . Soubory můžete později přidat v jiném jazyce.
Kliknutím na vybrat vytvoříte kanál a otevřete Editor kanálů Lakeflow.
Teď máte prázdný kanál s výchozím katalogem a schématem. Potom vytvořte ukázková data GPS a geofence.
Krok 2: Vytvoření ukázkových dat GPS a geofence
Tento krok vygeneruje ukázková data ve svazku: nezpracované gps ping (JSON) a skladové geofence (JSON s polygony WKT). GPS body jsou generovány v ohraničující box, který překrývaje dva skladové mnohoúhelníky, takže prostorové spojení v pozdějším kroku vrátí řádky příjezdu. Tento krok můžete přeskočit, pokud už máte vlastní data ve svazku nebo tabulce.
V Editoru kanálů Lakeflow klikněte v prohlížeči prostředků na
Přidat, a poté na Průzkum.
Nastavte název na
Setup spatial data, zvolte Python a ponechte výchozí cílovou složku.Klikněte na Vytvořit.
Do nového poznámkového bloku vložte následující kód. Nahraďte
<catalog>výchozí katalog a<schema>schéma, které jste nastavili v kroku 1.Pomocí následujícího kódu v poznámkovém bloku vygenerujte data GPS a geofence.
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")Spusťte buňku poznámkového bloku (Shift+ Enter).
Po dokončení spuštění obsahuje gps svazek (nezpracované příkazy ping) a geofences (mnohoúhelníky ve WKT). V dalším kroku importujete data GPS do bronzové tabulky.
Krok 3: Ingestování dat GPS do bronzové streamovací tabulky
Ingestování nezpracovaného JSON GPS ze svazku přírůstkově pomocí Auto Loaderu a zapisovat do bronzové streamovací tabulky.
V prohlížeči zdrojů klikněte na
Přidat, a pak Transformace.
Nastavte název na
gps_bronze, zvolte SQL nebo Python a klikněte na Vytvořit.Obsah souboru nahraďte následujícími daty (použijte kartu, která odpovídá vašemu jazyku). Nahraďte
<catalog>a<schema>svým výchozím katalogem a schématem.SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )Klikněte na
Spustit soubor nebo Spustit kanál pro spuštění aktualizace.
Po dokončení aktualizace se v grafu potrubí zobrazí gps_bronze tabulka. Dále přidejte stříbrnou tabulku, která převádí souřadnice na nativní bod geometrie.
Krok 4: Přidání tabulky streamování typu silver, která obsahuje body geometrie
Vytvořte streamovací tabulku, která čte z bronzové tabulky a přidá GEOMETRY sloupec pomocí ST_Point(longitude, latitude).
V prohlížeči zdrojů klikněte na
Přidat, a pak Transformace.
Nastavte název na
raw_gps_silver, zvolte SQL nebo Python a klikněte na Vytvořit.Do nového souboru vložte následující kód.
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )Klikněte na
Spusťte soubor nebo spusťte kanál.
Graf pipeline nyní ukazuje gps_bronze a raw_gps_silver. Dále přidejte geografické zóny skladu jako materializované zobrazení.
Krok 5: Vytvoření zlaté tabulky geofences skladu
Vytvořte materializované zobrazení, které čte geofence ze svazku a převede sloupec WKT na GEOMETRY sloupec pomocí ST_GeomFromWKT.
V prohlížeči zdrojů klikněte na
Přidat, a pak Transformace.
Nastavte název na
warehouse_geofences_gold, zvolte SQL nebo Python a klikněte na Vytvořit.Vložte následující kód. Nahraďte
<catalog>a<schema>svým výchozím katalogem a schématem.SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )Klikněte na
Spusťte soubor nebo spusťte kanál.
Datové potrubí nyní zahrnuje tabulku s geografickými ploty. Následně přidejte prostorové spojení k výpočtu příjezdu do skladu.
Krok 6: Vytvořte tabulku příjezdů do skladu s prostorovým spojením
Přidejte materializované zobrazení, které spojuje stříbrné GPS body s geofencingem, pomocí ST_Contains(boundary_geom, point_geom) k určení, kdy je zařízení uvnitř polygonu skladu.
V prohlížeči zdrojů klikněte na
Přidat, a pak Transformace.
Nastavte název na
warehouse_arrivals, zvolte SQL nebo Python a klikněte na Vytvořit.Vložte následující kód.
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )Klikněte na
Spusťte soubor nebo spusťte kanál.
Po dokončení aktualizace se v grafu kanálu zobrazí všechny čtyři datové sady: gps_bronze, raw_gps_silver, warehouse_geofences_golda warehouse_arrivals.
Ověření prostorového spojení
Ověřte, že prostorové spojení vytvořilo řádky: body, které spadají do geofence, ze stříbrné tabulky se objeví warehouse_arrivals. V poznámkovém bloku nebo editoru SQL spusťte jeden z následujících příkazů (použijte stejný katalog a schéma jako cíl kanálu).
Počet příjezdů podle skladu (SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Měli byste vidět nenulové počty pro Warehouse_A a Warehouse_B (ukázková data GPS se překrývají s oběma mnohoúhelníky). Pro kontrolu ukázkových řádků:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Stejné kontroly v Pythonu (poznámkovém bloku):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
Pokud uvidíte řádky v warehouse_arrivals, spojení ST_Contains(boundary_geom, point_geom) funguje správně.
Krok 7: Naplánovat potrubí (volitelné)
Pokud chcete, aby byl kanál aktuální, protože nová data GPS přistane ve svazku, vytvořte úlohu pro spuštění kanálu podle plánu.
- V horní části editoru zvolte tlačítko Plán .
- Pokud se zobrazí dialogové okno Plány , zvolte Přidat plán.
- Volitelně můžete úlohu pojmenovat.
- Ve výchozím nastavení se plán spouští jednou denně. Můžete přijmout toto nebo nastavit vlastní. Volba Upřesnit umožňuje nastavit konkrétní čas; Další možnosti umožňují přidávat oznámení o spuštění.
- Chcete-li použít plán, vyberte Vytvořit .
Další informace o spuštěních úloh najdete v tématu Monitorování a pozorovatelnost úloh Lakeflow .