Поделиться через


Руководство. Создание геопространственного конвейера с использованием собственных пространственных типов

Узнайте, как создать и развернуть конвейер, который выполняет прием данных GPS, преобразует координаты в собственные пространственные типы и присоединяется к геозонам хранилища для отслеживания прибытия с помощью Lakeflow Spark Declarative Pipelines (SDP) для оркестрации данных и Auto Loader. В этом руководстве используются собственные пространственные типы (GEOMETRY, GEOGRAPHY) и встроенные пространственные функции, такие как ST_Point, ST_GeomFromWKTи ST_Contains, чтобы можно было выполнять геопространственные рабочие процессы в масштабе без внешних библиотек.

При работе с этим руководством вы сделаете следующее:

  • Создайте пайплайн и сгенерируйте образцы данных GPS и геозон в разделе каталога Unity.
  • Постепенно принимайте необработанные сигналы GPS с помощью Auto Loader в потоковую таблицу бронзового уровня.
  • Создайте таблицу потоковой передачи silver, которая преобразует широту и долготу в собственную GEOMETRY точку.
  • Создайте материализованное представление геозон хранилища из многоугольников WKT.
  • Запустите пространственное соединение, чтобы создать таблицу прибытий в склад (какое устройство вошло в какую геозону).

Результатом является конвейер в стиле медальона: бронза (необработанный GPS), серебро (точки в виде геометрии) и золото (геозон и события прибытия). Дополнительные сведения см. в статье об архитектуре medallion lakehouse.

Требования

Чтобы завершить работу с этим руководством, необходимо выполнить следующие требования:

Шаг 1. Создание конвейера

Создайте конвейер ETL и задайте каталог и схему по умолчанию для таблиц.

  1. В рабочей области щелкните значок Новое в левом верхнем углу.

  2. Щелкните ETL-пайплайн.

  3. Измените название потока на Spatial pipeline tutorial или предпочитаемое вами имя.

  4. В заголовке выберите каталог и схему, для которой у вас есть разрешения на запись.

    Этот каталог и схема используются по умолчанию, если в коде не указан каталог или схема. Замените <catalog> и <schema> в следующих шагах значениями, которые вы выбрали здесь.

  5. В разделе "Дополнительные параметры" выберите "Пуск" с пустым файлом.

  6. Выберите папку для кода. Щелкните "Обзор" , чтобы выбрать папку; для управления версиями можно использовать папку Git.

  7. Выберите Python или SQL для языка первого файла. Вы можете добавить файлы на другом языке позже.

  8. Щелкните "Выбрать ", чтобы создать конвейер и открыть редактор Конвейеров Lakeflow.

Теперь у вас есть пустой конвейер с каталогом и схемой по умолчанию. Затем создайте примеры данных GPS и геозон.

Шаг 2. Создание примера данных GPS и геозон

На этом шаге в определенном объеме генерируются примеры данных: необработанные пинги GPS (JSON) и геозоны для складов (JSON с многоугольниками WKT). Точки GPS создаются в ограничивающем прямоугольнике, который перекрывает два многоугольника складов, поэтому пространственное соединение в дальнейшем вернет данные о прибытии. Этот шаг можно пропустить, если у вас уже есть собственные данные в томе или таблице.

  1. В редакторе Конвейеров Lakeflow в браузере активов щелкните значок «Плюс».Добавить, а затем Исследование.

  2. Задайте имяSetup spatial data, выберите Python и оставьте папку назначения по умолчанию.

  3. Нажмите кнопку Создать.

  4. В новой записной книжке вставьте следующий код. Замените элементы <catalog> и <schema> на каталог и схему по умолчанию, заданные на шаге 1.

    Используйте следующий код в записной книжке для создания данных GPS и геозон.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Запустите ячейку ноутбука (Shift + Enter).

После завершения выполнения том содержит gps (необработанные pings) и geofences (многоугольники в WKT). На следующем шаге вы будете загружать данные GPS в бронзовый слой таблицы.

Шаг 3. Прием данных GPS в бронзовую потоковую таблицу

Прием необработанного JSON GPS из тома постепенно с помощью автозагрузчика и записи в бронзовую потоковую таблицу.

  1. В браузере активов щелкните значок Добавить, и затем Трансформация.

  2. Задайте имяgps_bronze, выберите SQL или Python и нажмите кнопку "Создать".

  3. Замените содержимое файла следующим образом (используйте вкладку, соответствующую языку). Замените <catalog> и <schema> на ваш каталог и схему по умолчанию.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Щелкните значок Запустите файл или запустите конвейер чтобы запустить обновление.

После обновления график конвейера отображает таблицу gps_bronze. Затем добавьте серебряную таблицу, которая преобразует координаты в собственную геометрическую точку.

Шаг 4. Добавьте таблицу класса silver для потоковой передачи с точками геометрии

Создайте потоковую таблицу, которая считывает из бронзовой таблицы и добавляет столбец GEOMETRY, используя ST_Point(longitude, latitude).

  1. В браузере активов щелкните значок Добавить, и затем Трансформация.

  2. Задайте имяraw_gps_silver, выберите SQL или Python и нажмите кнопку "Создать".

  3. Вставьте следующий код в новый файл.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Щелкните значок воспроизведения.Выполнить файл или Запуск конвейера.

График конвейера теперь показывает gps_bronze и raw_gps_silver. Затем добавьте геозоны хранилища в виде материализованного представления.

Шаг 5. Создание золотой таблицы геозон хранилища

Создайте материализованное представление, которое считывает геозоны из тома и преобразует столбец WKT в столбец GEOMETRY с помощью ST_GeomFromWKT.

  1. В браузере активов щелкните значок Добавить, и затем Трансформация.

  2. Задайте имяwarehouse_geofences_gold, выберите SQL или Python и нажмите кнопку "Создать".

  3. Вставьте следующий код. Замените <catalog> и <schema> на ваш каталог и схему по умолчанию.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Щелкните значок воспроизведения.Выполнить файл или Запуск конвейера.

Теперь конвейер включает таблицу геозон. Затем добавьте пространственное объединение для вычисления прибытий в складе данных.

Шаг 6: Создание таблицы прибытий на склад с пространственным соединением

Добавьте материализованное представление, которое присоединяет серебряные GPS точки к геозонам, используя ST_Contains(boundary_geom, point_geom) для определения того, когда устройство находится внутри многоугольника склада.

  1. В браузере активов щелкните значок Добавить, и затем Трансформация.

  2. Задайте имяwarehouse_arrivals, выберите SQL или Python и нажмите кнопку "Создать".

  3. Вставьте следующий код.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Щелкните значок воспроизведения.Выполнить файл или Запуск конвейера.

После завершения обновления граф конвейера отображает все четыре набора данных: gps_bronze, raw_gps_silver, warehouse_geofences_goldи warehouse_arrivals.

Проверка пространственного соединения

Убедитесь, что пространственное соединение создало строки: точки из серебряной таблицы, которые находятся внутри геозоны и появляются в warehouse_arrivals. Выполните одно из следующих действий в записной книжке или редакторе SQL (используйте тот же каталог и схему, что и целевой объект конвейера).

Подсчет поступлений по хранилищу (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Вы должны увидеть ненулевые значения для Warehouse_A и Warehouse_B (В данном примере данные GPS перекрывают оба многоугольника). Чтобы проверить примеры строк, выполните следующее:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Те же проверки в Python (записная книжка):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Если вы видите строки в warehouse_arrivals, то соединение ST_Contains(boundary_geom, point_geom) работает правильно.

Шаг 7. Планирование конвейера (необязательно)

Чтобы поддерживать актуальность конвейера с появлением новых данных GPS в объеме, создайте задачу для запуска конвейера по расписанию.

  1. В верхней части редактора нажмите кнопку "Расписание ".
  2. Если появится диалоговое окно "Расписания" , нажмите кнопку "Добавить расписание".
  3. При необходимости присвойте заданию имя.
  4. По умолчанию расписание выполняется один раз в день. Вы можете принять это или задать свой собственный. Выбор Расширенные позволяет установить конкретное время; Дополнительные параметры позволяют добавлять уведомления о запуске.
  5. Нажмите кнопку "Создать", чтобы применить расписание.

Дополнительные сведения о выполнении заданий см. в статье "Мониторинг и наблюдаемость заданий Lakeflow ".

Дополнительные ресурсы