Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Dowiedz się, jak utworzyć i wdrożyć potok, który pobiera dane GPS, konwertuje współrzędne na natywne typy przestrzenne i porównuje je z geofencjami magazynu w celu śledzenia przyjazdów, korzystając z potoków deklaratywnych Lakeflow Spark (SDP) do orkiestracji danych oraz Auto Loader. W tym samouczku są używane natywne typy przestrzenne usługi Databricks (GEOMETRY, GEOGRAPHY) i wbudowane funkcje przestrzenne, takie jak ST_Point, ST_GeomFromWKTi ST_Contains, dzięki czemu można uruchamiać przepływy pracy geoprzestrzenne na dużą skalę bez bibliotek zewnętrznych.
W tym samouczku nauczysz się następujących rzeczy:
- Utwórz potok i wygeneruj przykładowe dane GPS i geofencing w woluminie Unity Catalog.
- Pozyskiwać surowe pingi GPS przyrostowo za pomocą Auto Loader do tabeli przesyłania strumieniowego na poziomie brązowym.
- Utwórz srebrną tabelę przesyłania strumieniowego, która konwertuje szerokość geograficzną i długość geograficzną na natywny punkt
GEOMETRY. - Utwórz zmaterializowany widok geofencingów magazynu na podstawie wielokątów WKT.
- Uruchom łączenie przestrzenne, aby utworzyć tabelę przyjścia do magazynu (które urządzenie weszło w poszczególną strefę geofencingu).
Wynikiem jest potok w stylu medali: brązowy (surowe dane GPS), srebrny (punkty jako geometria) i złoty (strefy geograficzne oraz zdarzenia związane z przyjazdem). Aby uzyskać więcej informacji, zobacz Co to jest architektura medallion lakehouse?
Wymagania
Aby ukończyć ten samouczek, musisz spełnić następujące wymagania:
- Zaloguj się do obszaru roboczego usługi Azure Databricks.
- Włącz Unity Catalog dla swojego obszaru roboczego.
- Jeśli chcesz używać deklaratywnych potoków Serverless Lakeflow Spark, musisz mieć włączone bezserwerowe obliczenia na swoim koncie. Jeśli przetwarzanie bezserwerowe nie jest włączone, kroki działają z domyślnym obliczeniami dla obszaru roboczego.
- Mieć uprawnienia do tworzenia zasobu obliczeniowego lub dostępu do zasobu obliczeniowego.
- Mieć uprawnienia do tworzenia nowego schematu w wykazie. Wymagane uprawnienia to
USE CATALOGiCREATE SCHEMA. - Mieć uprawnienia do tworzenia nowego woluminu w istniejącym schemacie. Wymagane uprawnienia to
USE SCHEMAiCREATE VOLUME. - Użyj środowiska uruchomieniowego obsługującego natywne typy przestrzenne i funkcje przestrzenne.
Krok 1: Utwórz potok
Utwórz nowy potok ETL i ustaw domyślny katalog oraz schemat dla tabel.
W obszarze roboczym kliknij
Nowy w lewym górnym rogu.
Kliknij Potok ETL.
Zmień tytuł potoku na
Spatial pipeline tutoriallub preferowaną nazwę.W tytule wybierz katalog i schemat, dla którego masz uprawnienia do zapisu.
Ten wykaz i schemat są używane domyślnie, gdy nie określasz katalogu ani schematu w kodzie. Zastąp
<catalog>i<schema>w poniższych krokach wartościami wybranymi tutaj.W obszarze Opcje zaawansowane wybierz pozycję Rozpocznij od pustego pliku.
Wybierz folder dla kodu. Możesz wybrać pozycję Przeglądaj , aby wybrać folder. Do kontroli wersji można użyć folderu Git.
Wybierz język Python lub SQL dla języka pierwszego pliku. Możesz później dodawać pliki w innym języku.
Kliknij Wybierz, aby utworzyć pipeline i otworzyć Lakeflow Pipelines Editor.
Masz teraz pustą rurę z domyślnym wykazem i strukturą. Następnie utwórz przykładowe dane GPS i geofencingu.
Krok 2. Tworzenie przykładowych danych GPS i geofencingu
Ten krok generuje przykładowe dane w woluminie: nieprzetworzone pingi GPS (JSON) i geofencje magazynu (JSON z wielokątami WKT). Punkty GPS są generowane w ramce ograniczającej, która nakłada się na dwa wielokąty magazynu, więc łączenie przestrzenne w późniejszym kroku zwróci wiersze przyjazdu. Ten krok można pominąć, jeśli masz już własne dane w woluminie lub tabeli.
W Edytorze potoków Lakeflow, w przeglądarce zasobów, kliknij
Dodaj, a następnie Eksplorację.
Ustaw Nazwa na
Setup spatial data, wybierz Python i pozostaw domyślny folder docelowy.Kliknij pozycję Utwórz.
W nowym notesie wklej następujący kod. Zastąp
<catalog>domyślnym katalogiem i<schema>schematem ustawionymi w kroku 1.Użyj poniższego kodu w notesie, aby wygenerować dane GPS i geofencingu.
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")Uruchom komórkę notatnika (Shift + Enter).
Po zakończeniu przebiegu wolumin zawiera gps (surowe pingi) i geofences (wielokąty w formacie WKT). W następnym kroku pozyskujesz dane GPS do brązowej tabeli.
Krok 3. Pozyskiwanie danych GPS do brązowej tabeli przesyłania strumieniowego
Pozyskiwanie nieprzetworzonego kodu GPS JSON z woluminu przyrostowo przy użyciu automatycznego modułu ładującego i zapisu w brązowej tabeli przesyłania strumieniowego.
W przeglądarce zasobów kliknij
Dodaj, a następnie Przekształcenie.
Ustaw Nazwa na
gps_bronze, wybierz opcję SQL lub Python, a następnie kliknij Utwórz.Zastąp zawartość pliku następującą zawartością (użyj zakładki, która odpowiada Twojemu językowi). Zastąp
<catalog>i<schema>domyślnym katalogiem i schematem.SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )Kliknij
Uruchom plik lub Uruchom potok, aby uruchomić aktualizację.
Po zakończeniu aktualizacji wykres potoku przedstawia tabelę gps_bronze . Następnie dodaj srebrną tabelę, która konwertuje współrzędne na natywny punkt geometryczny.
Krok 4. Dodaj srebrną tabelę przesyłania strumieniowego z punktami geometrycznymi
Utwórz tabelę przesyłania strumieniowego odczytaną z brązowej tabeli i dodaj kolumnę GEOMETRY przy użyciu polecenia ST_Point(longitude, latitude).
W przeglądarce zasobów kliknij
Dodaj, a następnie Przekształcenie.
Ustaw Nazwa na
raw_gps_silver, wybierz opcję SQL lub Python, a następnie kliknij Utwórz.Wklej następujący kod do nowego pliku.
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )Kliknij
Uruchom plik lub Uruchom potok.
Wykres potoku pokazuje teraz elementy gps_bronze i raw_gps_silver. Następnie dodaj geofencje magazynu jako zmaterializowany widok.
Krok 5. Utworzenie najlepszej tabeli geofencjonowania magazynu
Utwórz zmaterializowany widok, który odczytuje geofensy z woluminu i konwertuje kolumnę WKT na kolumnę GEOMETRY przy użyciu ST_GeomFromWKT.
W przeglądarce zasobów kliknij
Dodaj, a następnie Przekształcenie.
Ustaw Nazwa na
warehouse_geofences_gold, wybierz opcję SQL lub Python, a następnie kliknij Utwórz.Wklej następujący kod. Zastąp
<catalog>i<schema>domyślnym katalogiem i schematem.SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )Kliknij
Uruchom plik lub Uruchom potok.
Potok przetwarzania zawiera teraz tabelę geostref. Następnie dodaj sprzężenie przestrzenne do obliczania przybyć do magazynu danych.
Krok 6. Tworzenie tabeli przylotów magazynu za pomocą sprzężenia przestrzennego
Dodaj materializowany widok, który łączy srebrne punkty GPS z geofencjami, używając ST_Contains(boundary_geom, point_geom) do określenia, kiedy urządzenie znajduje się wewnątrz wielokąta magazynu.
W przeglądarce zasobów kliknij
Dodaj, a następnie Przekształcenie.
Ustaw Nazwa na
warehouse_arrivals, wybierz opcję SQL lub Python, a następnie kliknij Utwórz.Wklej następujący kod.
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )Kliknij
Uruchom plik lub Uruchom potok.
Po zakończeniu aktualizacji wykres pokazuje wszystkie cztery zestawy danych: gps_bronze, raw_gps_silver, warehouse_geofences_gold i warehouse_arrivals.
Weryfikowanie sprzężenia przestrzennego
Upewnij się, że połączenie przestrzenne wygenerowało wiersze: punkty ze srebrnej tabeli, które znajdują się wewnątrz geofencji, pojawiają się w warehouse_arrivals. Uruchom jedną z następujących operacji w notesie lub edytorze SQL (używając tego samego katalogu i schematu co docelowy potok).
Liczba przylotów według magazynu (SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Powinny zostać wyświetlone liczby niezerowe dla Warehouse_A i Warehouse_B (przykładowe dane GPS nakładają się na oba wielokąty). Aby sprawdzić przykładowe wiersze:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Te same sprawdzenia w Pythonie (notebook):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
Jeśli zobaczysz wiersze w warehouse_arrivals, łączenie ST_Contains(boundary_geom, point_geom) działa poprawnie.
Krok 7. Zaplanowanie potoku (opcjonalnie)
Aby zapewnić aktualność pipeline'u, gdy nowe dane GPS są umieszczane w woluminie, utwórz zadanie uruchamiania pipeline'u zgodnie z harmonogramem.
- W górnej części edytora wybierz przycisk Harmonogram .
- Jeśli zostanie wyświetlone okno dialogowe Harmonogramy , wybierz pozycję Dodaj harmonogram.
- Opcjonalnie nadaj zadaniu nazwę.
- Domyślnie harmonogram jest uruchamiany raz dziennie. Możesz to zaakceptować lub ustawić własne. Wybranie opcji Zaawansowane umożliwia ustawienie określonego czasu; Więcej opcji umożliwia dodawanie powiadomień uruchamiania.
- Wybierz pozycję Utwórz , aby zastosować harmonogram.
Aby uzyskać więcej informacji na temat przebiegów zadań, zobacz Monitorowanie i obserwacja dla zadań Lakeflow.