Udostępnij za pośrednictwem


Samouczek: tworzenie potoku geoprzestrzennego przy użyciu natywnych typów przestrzennych

Dowiedz się, jak utworzyć i wdrożyć potok, który pobiera dane GPS, konwertuje współrzędne na natywne typy przestrzenne i porównuje je z geofencjami magazynu w celu śledzenia przyjazdów, korzystając z potoków deklaratywnych Lakeflow Spark (SDP) do orkiestracji danych oraz Auto Loader. W tym samouczku są używane natywne typy przestrzenne usługi Databricks (GEOMETRY, GEOGRAPHY) i wbudowane funkcje przestrzenne, takie jak ST_Point, ST_GeomFromWKTi ST_Contains, dzięki czemu można uruchamiać przepływy pracy geoprzestrzenne na dużą skalę bez bibliotek zewnętrznych.

W tym samouczku nauczysz się następujących rzeczy:

  • Utwórz potok i wygeneruj przykładowe dane GPS i geofencing w woluminie Unity Catalog.
  • Pozyskiwać surowe pingi GPS przyrostowo za pomocą Auto Loader do tabeli przesyłania strumieniowego na poziomie brązowym.
  • Utwórz srebrną tabelę przesyłania strumieniowego, która konwertuje szerokość geograficzną i długość geograficzną na natywny punkt GEOMETRY.
  • Utwórz zmaterializowany widok geofencingów magazynu na podstawie wielokątów WKT.
  • Uruchom łączenie przestrzenne, aby utworzyć tabelę przyjścia do magazynu (które urządzenie weszło w poszczególną strefę geofencingu).

Wynikiem jest potok w stylu medali: brązowy (surowe dane GPS), srebrny (punkty jako geometria) i złoty (strefy geograficzne oraz zdarzenia związane z przyjazdem). Aby uzyskać więcej informacji, zobacz Co to jest architektura medallion lakehouse?

Wymagania

Aby ukończyć ten samouczek, musisz spełnić następujące wymagania:

Krok 1: Utwórz potok

Utwórz nowy potok ETL i ustaw domyślny katalog oraz schemat dla tabel.

  1. W obszarze roboczym kliknij ikonę Plus. Nowy w lewym górnym rogu.

  2. Kliknij Potok ETL.

  3. Zmień tytuł potoku na Spatial pipeline tutorial lub preferowaną nazwę.

  4. W tytule wybierz katalog i schemat, dla którego masz uprawnienia do zapisu.

    Ten wykaz i schemat są używane domyślnie, gdy nie określasz katalogu ani schematu w kodzie. Zastąp <catalog> i <schema> w poniższych krokach wartościami wybranymi tutaj.

  5. W obszarze Opcje zaawansowane wybierz pozycję Rozpocznij od pustego pliku.

  6. Wybierz folder dla kodu. Możesz wybrać pozycję Przeglądaj , aby wybrać folder. Do kontroli wersji można użyć folderu Git.

  7. Wybierz język Python lub SQL dla języka pierwszego pliku. Możesz później dodawać pliki w innym języku.

  8. Kliknij Wybierz, aby utworzyć pipeline i otworzyć Lakeflow Pipelines Editor.

Masz teraz pustą rurę z domyślnym wykazem i strukturą. Następnie utwórz przykładowe dane GPS i geofencingu.

Krok 2. Tworzenie przykładowych danych GPS i geofencingu

Ten krok generuje przykładowe dane w woluminie: nieprzetworzone pingi GPS (JSON) i geofencje magazynu (JSON z wielokątami WKT). Punkty GPS są generowane w ramce ograniczającej, która nakłada się na dwa wielokąty magazynu, więc łączenie przestrzenne w późniejszym kroku zwróci wiersze przyjazdu. Ten krok można pominąć, jeśli masz już własne dane w woluminie lub tabeli.

  1. W Edytorze potoków Lakeflow, w przeglądarce zasobów, kliknij ikonę Plus.Dodaj, a następnie Eksplorację.

  2. Ustaw Nazwa na Setup spatial data, wybierz Python i pozostaw domyślny folder docelowy.

  3. Kliknij pozycję Utwórz.

  4. W nowym notesie wklej następujący kod. Zastąp <catalog> domyślnym katalogiem i <schema> schematem ustawionymi w kroku 1.

    Użyj poniższego kodu w notesie, aby wygenerować dane GPS i geofencingu.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Uruchom komórkę notatnika (Shift + Enter).

Po zakończeniu przebiegu wolumin zawiera gps (surowe pingi) i geofences (wielokąty w formacie WKT). W następnym kroku pozyskujesz dane GPS do brązowej tabeli.

Krok 3. Pozyskiwanie danych GPS do brązowej tabeli przesyłania strumieniowego

Pozyskiwanie nieprzetworzonego kodu GPS JSON z woluminu przyrostowo przy użyciu automatycznego modułu ładującego i zapisu w brązowej tabeli przesyłania strumieniowego.

  1. W przeglądarce zasobów kliknij ikonę Plus.Dodaj, a następnie Przekształcenie.

  2. Ustaw Nazwa na gps_bronze, wybierz opcję SQL lub Python, a następnie kliknij Utwórz.

  3. Zastąp zawartość pliku następującą zawartością (użyj zakładki, która odpowiada Twojemu językowi). Zastąp <catalog> i <schema> domyślnym katalogiem i schematem.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Kliknij ikonę Odtwórz.Uruchom plik lub Uruchom potok, aby uruchomić aktualizację.

Po zakończeniu aktualizacji wykres potoku przedstawia tabelę gps_bronze . Następnie dodaj srebrną tabelę, która konwertuje współrzędne na natywny punkt geometryczny.

Krok 4. Dodaj srebrną tabelę przesyłania strumieniowego z punktami geometrycznymi

Utwórz tabelę przesyłania strumieniowego odczytaną z brązowej tabeli i dodaj kolumnę GEOMETRY przy użyciu polecenia ST_Point(longitude, latitude).

  1. W przeglądarce zasobów kliknij ikonę Plusa.Dodaj, a następnie Przekształcenie.

  2. Ustaw Nazwa na raw_gps_silver, wybierz opcję SQL lub Python, a następnie kliknij Utwórz.

  3. Wklej następujący kod do nowego pliku.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Kliknij ikonę Odtwórz.Uruchom plik lub Uruchom potok.

Wykres potoku pokazuje teraz elementy gps_bronze i raw_gps_silver. Następnie dodaj geofencje magazynu jako zmaterializowany widok.

Krok 5. Utworzenie najlepszej tabeli geofencjonowania magazynu

Utwórz zmaterializowany widok, który odczytuje geofensy z woluminu i konwertuje kolumnę WKT na kolumnę GEOMETRY przy użyciu ST_GeomFromWKT.

  1. W przeglądarce zasobów kliknij ikonę Plusa.Dodaj, a następnie Przekształcenie.

  2. Ustaw Nazwa na warehouse_geofences_gold, wybierz opcję SQL lub Python, a następnie kliknij Utwórz.

  3. Wklej następujący kod. Zastąp <catalog> i <schema> domyślnym katalogiem i schematem.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Kliknij ikonę Odtwórz.Uruchom plik lub Uruchom potok.

Potok przetwarzania zawiera teraz tabelę geostref. Następnie dodaj sprzężenie przestrzenne do obliczania przybyć do magazynu danych.

Krok 6. Tworzenie tabeli przylotów magazynu za pomocą sprzężenia przestrzennego

Dodaj materializowany widok, który łączy srebrne punkty GPS z geofencjami, używając ST_Contains(boundary_geom, point_geom) do określenia, kiedy urządzenie znajduje się wewnątrz wielokąta magazynu.

  1. W przeglądarce zasobów kliknij ikonę Plusa.Dodaj, a następnie Przekształcenie.

  2. Ustaw Nazwa na warehouse_arrivals, wybierz opcję SQL lub Python, a następnie kliknij Utwórz.

  3. Wklej następujący kod.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Kliknij ikonę Odtwórz.Uruchom plik lub Uruchom potok.

Po zakończeniu aktualizacji wykres pokazuje wszystkie cztery zestawy danych: gps_bronze, raw_gps_silver, warehouse_geofences_gold i warehouse_arrivals.

Weryfikowanie sprzężenia przestrzennego

Upewnij się, że połączenie przestrzenne wygenerowało wiersze: punkty ze srebrnej tabeli, które znajdują się wewnątrz geofencji, pojawiają się w warehouse_arrivals. Uruchom jedną z następujących operacji w notesie lub edytorze SQL (używając tego samego katalogu i schematu co docelowy potok).

Liczba przylotów według magazynu (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Powinny zostać wyświetlone liczby niezerowe dla Warehouse_A i Warehouse_B (przykładowe dane GPS nakładają się na oba wielokąty). Aby sprawdzić przykładowe wiersze:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Te same sprawdzenia w Pythonie (notebook):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Jeśli zobaczysz wiersze w warehouse_arrivals, łączenie ST_Contains(boundary_geom, point_geom) działa poprawnie.

Krok 7. Zaplanowanie potoku (opcjonalnie)

Aby zapewnić aktualność pipeline'u, gdy nowe dane GPS są umieszczane w woluminie, utwórz zadanie uruchamiania pipeline'u zgodnie z harmonogramem.

  1. W górnej części edytora wybierz przycisk Harmonogram .
  2. Jeśli zostanie wyświetlone okno dialogowe Harmonogramy , wybierz pozycję Dodaj harmonogram.
  3. Opcjonalnie nadaj zadaniu nazwę.
  4. Domyślnie harmonogram jest uruchamiany raz dziennie. Możesz to zaakceptować lub ustawić własne. Wybranie opcji Zaawansowane umożliwia ustawienie określonego czasu; Więcej opcji umożliwia dodawanie powiadomień uruchamiania.
  5. Wybierz pozycję Utwórz , aby zastosować harmonogram.

Aby uzyskać więcej informacji na temat przebiegów zadań, zobacz Monitorowanie i obserwacja dla zadań Lakeflow.

Dodatkowe zasoby