Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Узнайте, как создать и развернуть конвейер, который выполняет прием данных GPS, преобразует координаты в собственные пространственные типы и присоединяется к геозонам хранилища для отслеживания прибытия с помощью Lakeflow Spark Declarative Pipelines (SDP) для оркестрации данных и Auto Loader. В этом руководстве используются собственные пространственные типы (GEOMETRY, GEOGRAPHY) и встроенные пространственные функции, такие как ST_Point, ST_GeomFromWKTи ST_Contains, чтобы можно было выполнять геопространственные рабочие процессы в масштабе без внешних библиотек.
При работе с этим руководством вы сделаете следующее:
- Создайте пайплайн и сгенерируйте образцы данных GPS и геозон в разделе каталога Unity.
- Постепенно принимайте необработанные сигналы GPS с помощью Auto Loader в потоковую таблицу бронзового уровня.
- Создайте таблицу потоковой передачи silver, которая преобразует широту и долготу в собственную
GEOMETRYточку. - Создайте материализованное представление геозон хранилища из многоугольников WKT.
- Запустите пространственное соединение, чтобы создать таблицу прибытий в склад (какое устройство вошло в какую геозону).
Результатом является конвейер в стиле медальона: бронза (необработанный GPS), серебро (точки в виде геометрии) и золото (геозон и события прибытия). Дополнительные сведения см. в статье об архитектуре medallion lakehouse.
Требования
Чтобы завершить работу с этим руководством, необходимо выполнить следующие требования:
- Войдите в рабочую область Azure Databricks.
- Активируйте каталог Unity для вашей рабочей области.
- Вы можете использовать бессерверные вычислительные ресурсы в вашей рабочей области, если вы хотите использовать декларативные конвейеры Serverless Lakeflow Spark, которые включены по умолчанию в рабочих областях с Unity Catalog. Если бессерверные вычисления недоступны, шаги работают с вычислениями по умолчанию для рабочей области.
- Разрешение на создание вычислительного ресурса или доступ к вычислительному ресурсу.
- У вас есть разрешения на создание новой схемы в каталоге. Необходимые разрешения:
USE CATALOGиCREATE SCHEMA. - У вас есть разрешения на создание нового тома в существующей схеме. Необходимые разрешения:
USE SCHEMAиCREATE VOLUME. - Используйте среду выполнения, которая поддерживает собственные пространственные типы и пространственные функции.
Шаг 1. Создание конвейера
Создайте конвейер ETL и задайте каталог и схему по умолчанию для таблиц.
В рабочей области щелкните
Новое в левом верхнем углу.
Щелкните ETL-пайплайн.
Измените название потока на
Spatial pipeline tutorialили предпочитаемое вами имя.В заголовке выберите каталог и схему, для которой у вас есть разрешения на запись.
Этот каталог и схема используются по умолчанию, если в коде не указан каталог или схема. Замените
<catalog>и<schema>в следующих шагах значениями, которые вы выбрали здесь.В разделе "Дополнительные параметры" выберите "Пуск" с пустым файлом.
Выберите папку для кода. Щелкните "Обзор" , чтобы выбрать папку; для управления версиями можно использовать папку Git.
Выберите Python или SQL для языка первого файла. Вы можете добавить файлы на другом языке позже.
Щелкните "Выбрать ", чтобы создать конвейер и открыть редактор Конвейеров Lakeflow.
Теперь у вас есть пустой конвейер с каталогом и схемой по умолчанию. Затем создайте примеры данных GPS и геозон.
Шаг 2. Создание примера данных GPS и геозон
На этом шаге в определенном объеме генерируются примеры данных: необработанные пинги GPS (JSON) и геозоны для складов (JSON с многоугольниками WKT). Точки GPS создаются в ограничивающем прямоугольнике, который перекрывает два многоугольника складов, поэтому пространственное соединение в дальнейшем вернет данные о прибытии. Этот шаг можно пропустить, если у вас уже есть собственные данные в томе или таблице.
В редакторе Конвейеров Lakeflow в браузере активов щелкните
Добавить, а затем Исследование.
Задайте имя
Setup spatial data, выберите Python и оставьте папку назначения по умолчанию.Нажмите кнопку Создать.
В новой записной книжке вставьте следующий код. Замените элементы
<catalog>и<schema>на каталог и схему по умолчанию, заданные на шаге 1.Используйте следующий код в записной книжке для создания данных GPS и геозон.
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")Запустите ячейку ноутбука (Shift + Enter).
После завершения выполнения том содержит gps (необработанные pings) и geofences (многоугольники в WKT). На следующем шаге вы будете загружать данные GPS в бронзовый слой таблицы.
Шаг 3. Прием данных GPS в бронзовую потоковую таблицу
Прием необработанного JSON GPS из тома постепенно с помощью автозагрузчика и записи в бронзовую потоковую таблицу.
В браузере активов щелкните
Добавить, и затем Трансформация.
Задайте имя
gps_bronze, выберите SQL или Python и нажмите кнопку "Создать".Замените содержимое файла следующим образом (используйте вкладку, соответствующую языку). Замените
<catalog>и<schema>на ваш каталог и схему по умолчанию.SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )Щелкните
Запустите файл или запустите конвейер чтобы запустить обновление.
После обновления график конвейера отображает таблицу gps_bronze. Затем добавьте серебряную таблицу, которая преобразует координаты в собственную геометрическую точку.
Шаг 4. Добавьте таблицу класса silver для потоковой передачи с точками геометрии
Создайте потоковую таблицу, которая считывает из бронзовой таблицы и добавляет столбец GEOMETRY, используя ST_Point(longitude, latitude).
В браузере активов щелкните
Добавить, и затем Трансформация.
Задайте имя
raw_gps_silver, выберите SQL или Python и нажмите кнопку "Создать".Вставьте следующий код в новый файл.
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )Щелкните
Выполнить файл или Запуск конвейера.
График конвейера теперь показывает gps_bronze и raw_gps_silver. Затем добавьте геозоны хранилища в виде материализованного представления.
Шаг 5. Создание золотой таблицы геозон хранилища
Создайте материализованное представление, которое считывает геозоны из тома и преобразует столбец WKT в столбец GEOMETRY с помощью ST_GeomFromWKT.
В браузере активов щелкните
Добавить, и затем Трансформация.
Задайте имя
warehouse_geofences_gold, выберите SQL или Python и нажмите кнопку "Создать".Вставьте следующий код. Замените
<catalog>и<schema>на ваш каталог и схему по умолчанию.SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )Щелкните
Выполнить файл или Запуск конвейера.
Теперь конвейер включает таблицу геозон. Затем добавьте пространственное объединение для вычисления прибытий в складе данных.
Шаг 6: Создание таблицы прибытий на склад с пространственным соединением
Добавьте материализованное представление, которое присоединяет серебряные GPS точки к геозонам, используя ST_Contains(boundary_geom, point_geom) для определения того, когда устройство находится внутри многоугольника склада.
В браузере активов щелкните
Добавить, и затем Трансформация.
Задайте имя
warehouse_arrivals, выберите SQL или Python и нажмите кнопку "Создать".Вставьте следующий код.
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )Щелкните
Выполнить файл или Запуск конвейера.
После завершения обновления граф конвейера отображает все четыре набора данных: gps_bronze, raw_gps_silver, warehouse_geofences_goldи warehouse_arrivals.
Проверка пространственного соединения
Убедитесь, что пространственное соединение создало строки: точки из серебряной таблицы, которые находятся внутри геозоны и появляются в warehouse_arrivals. Выполните одно из следующих действий в записной книжке или редакторе SQL (используйте тот же каталог и схему, что и целевой объект конвейера).
Подсчет поступлений по хранилищу (SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Вы должны увидеть ненулевые значения для Warehouse_A и Warehouse_B (В данном примере данные GPS перекрывают оба многоугольника). Чтобы проверить примеры строк, выполните следующее:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Те же проверки в Python (записная книжка):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
Если вы видите строки в warehouse_arrivals, то соединение ST_Contains(boundary_geom, point_geom) работает правильно.
Шаг 7. Планирование конвейера (необязательно)
Чтобы поддерживать актуальность конвейера с появлением новых данных GPS в объеме, создайте задачу для запуска конвейера по расписанию.
- В верхней части редактора нажмите кнопку "Расписание ".
- Если появится диалоговое окно "Расписания" , нажмите кнопку "Добавить расписание".
- При необходимости присвойте заданию имя.
- По умолчанию расписание выполняется один раз в день. Вы можете принять это или задать свой собственный. Выбор Расширенные позволяет установить конкретное время; Дополнительные параметры позволяют добавлять уведомления о запуске.
- Нажмите кнопку "Создать", чтобы применить расписание.
Дополнительные сведения о выполнении заданий см. в статье "Мониторинг и наблюдаемость заданий Lakeflow ".