Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Aprenda a crear e implementar una canalización que ingiere datos GPS, convierte coordenadas en tipos espaciales nativos y las cruza con geocercas de almacén para realizar un seguimiento de las llegadas mediante Lakeflow Spark Declarative Pipelines (SDP) para la orquestación de datos y Auto Loader. En este tutorial se usan los tipos espaciales nativos de Databricks (GEOMETRY, GEOGRAPHY) y las funciones espaciales integradas, como ST_Point, ST_GeomFromWKTy ST_Contains, para que pueda ejecutar flujos de trabajo geoespaciales a escala sin bibliotecas externas.
En este tutorial, aprenderá lo siguiente:
- Cree una canalización y genere datos de ejemplo de GPS y geocerca en un volumen del Catálogo de Unity.
- Ingerir incrementalmente pings de GPS sin procesar utilizando Auto Loader en una tabla de transmisión de nivel bronce.
- Cree una tabla de streaming silver que convierta la latitud y la longitud en un punto nativo
GEOMETRY. - Cree una vista materializada de geovallas de almacén a partir de polígonos WKT.
- Ejecute una combinación espacial para generar una tabla de llegadas al almacén (qué dispositivo ha entrado en la geovalla).
El resultado es una canalización de estilo medallón: bronce (GPS sin procesar), plata (puntos como geometría) y oro (geovallas y eventos de llegada). Consulte ¿Qué es la arquitectura de medallion lakehouse? para obtener más información.
Requisitos
Para completar este tutorial, debe cumplir los siguientes requisitos:
- Inicie sesión en un área de trabajo de Azure Databricks.
- Tener habilitado el catálogo de Unity para el área de trabajo.
- Tenga habilitada la computación sin servidor para su cuenta si desea usar las canalizaciones declarativas de Spark Lakeflow sin servidor. Si el proceso sin servidor no está habilitado, los pasos funcionan con el proceso predeterminado para el área de trabajo.
- Tener permiso para crear un recurso de proceso o acceso a un recurso de proceso.
- Tener permisos para crear un nuevo esquema en un catálogo. Los permisos necesarios son
USE CATALOGyCREATE SCHEMA. - Tener permisos para crear un nuevo volumen en un esquema existente. Los permisos necesarios son
USE SCHEMAyCREATE VOLUME. - Use un entorno de ejecución que admita tipos espaciales nativos y funciones espaciales.
Paso 1: crear una canalización
Cree una nueva canalización ETL y establezca el catálogo y el esquema predeterminados para las tablas.
En el área de trabajo, haga clic en
Novedad en la esquina superior izquierda.
Haga clic en Canalización de ETL.
Cambie el título de la canalización por
Spatial pipeline tutorialo un nombre que prefiera.Bajo el título, elija un catálogo y un esquema para los que tenga permisos de escritura.
Este catálogo y esquema se usan de forma predeterminada cuando no se especifica un catálogo o esquema en el código. Reemplace
<catalog>y<schema>, en los pasos siguientes, por los valores que elija aquí.En Opciones avanzadas, seleccione Iniciar con un archivo vacío.
Elija una carpeta para el código. Puede seleccionar Examinar para elegir una carpeta; puede usar una carpeta git para el control de versiones.
Elija Python o SQL para el lenguaje del primer archivo. Puede agregar archivos en el otro idioma más adelante.
Haga clic en Seleccionar para crear la canalización y abra el Editor de canalizaciones de Lakeflow.
Ahora tiene una canalización en blanco con un catálogo y un esquema predeterminados. A continuación, cree los datos GPS de ejemplo y geocerca.
Paso 2: Crear los datos GPS y geocerca de ejemplo
Este paso genera datos de ejemplo en un volumen: pings GPS sin procesar (JSON) y geocercas de almacén (JSON con polígonos WKT). Los puntos GPS se generan en una caja delimitadora que se superpone a los dos polígonos de los almacenes, por lo que la unión espacial en un paso posterior devolverá filas correspondientes a llegadas. Puede omitir este paso si ya tiene sus propios datos en un volumen o tabla.
En el Editor de canalizaciones de Lakeflow, en el explorador de recursos, haga clic en
Agregue y, a continuación, Exploración.
Establezca Nombre en
Setup spatial data, elija Python y deje la carpeta de destino predeterminada.Haga clic en Crear.
En el nuevo cuaderno, pegue el código siguiente. Reemplace
<catalog>y<schema>por el catálogo y el esquema predeterminados que establezca en el paso 1.Utiliza el siguiente código en el notebook para generar datos GPS y geocerca.
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")Ejecute la celda del notebook (Mayús + Entrar).
Una vez completada la ejecución, el volumen contiene gps (pings sin procesar) y geofences (polígonos en WKT). En el siguiente paso, ingerirá los datos GPS en una tabla de bronce.
Paso 3: Ingesta de datos GPS en una tabla de streaming bronze
Ingiere el JSON GPS sin procesar del volumen de forma incremental mediante Auto Loader y escribe en una tabla de streaming bronze.
En el explorador de recursos, haga clic en
Agregue y, a continuación, Transformación.
Establezca Nombre en
gps_bronze, elija SQL o Python y haga clic en Crear.Reemplace el contenido del archivo por lo siguiente (use la pestaña que coincide con el idioma). Reemplace
<catalog>y<schema>por el catálogo y el esquema predeterminados.SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )Haga clic en
Ejecute el archivo o ejecute la canalización para ejecutar una actualización.
Cuando se completa la actualización, el gráfico de canalización muestra la tabla gps_bronze. A continuación, agregue una tabla silver que convierta coordenadas en un punto de geometría nativo.
Paso 4: Agregar una tabla de streaming silver con puntos de geometría
Cree una tabla de transmisión que lea de la tabla bronze y agregue una columna GEOMETRY mediante ST_Point(longitude, latitude).
En el explorador de recursos, haga clic en
Agregue y, a continuación, Transformación.
Establezca Nombre en
raw_gps_silver, elija SQL o Python y haga clic en Crear.Pegue el código siguiente en el nuevo archivo.
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )Haga clic en
Ejecute el archivo o ejecute la canalización.
El gráfico de canalización ahora muestra gps_bronze y raw_gps_silver. A continuación, agregue las geocercas del almacén como una vista materializada.
Paso 5: Creación de la tabla definitiva de geovallas del almacén
Cree una vista materializada que lea las geovallas del volumen y convierta la columna WKT en una columna GEOMETRY mediante ST_GeomFromWKT.
En el explorador de recursos, haga clic en
Agregue y, a continuación, Transformación.
Establezca Nombre en
warehouse_geofences_gold, elija SQL o Python y haga clic en Crear.Pegue el código siguiente. Reemplace
<catalog>y<schema>por el catálogo y el esquema predeterminados.SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )Haga clic en
Ejecute el archivo o ejecute la canalización.
La canalización ahora incluye la tabla geofences. A continuación, agregue la unión espacial para calcular las llegadas al almacén.
Paso 6: Crear la tabla de llegadas al almacén con una combinación espacial
Agregue una vista materializada que une los puntos GPS plateados a las geovallas utilizando ST_Contains(boundary_geom, point_geom) para determinar cuándo un dispositivo está dentro de un polígono de almacenamiento.
En el explorador de recursos, haga clic en
Agregue y, a continuación, Transformación.
Establezca Nombre en
warehouse_arrivals, elija SQL o Python y haga clic en Crear.Pegue el código siguiente.
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )Haga clic en
Ejecute el archivo o ejecute la canalización.
Cuando se completa la actualización, el gráfico de canalización muestra los cuatro conjuntos de datos: gps_bronze, raw_gps_silver, warehouse_geofences_goldy warehouse_arrivals.
Verificar la unión espacial
Confirme que la combinación espacial produjo filas: los puntos de la tabla Silver que se encuentran dentro de una geovalla aparecen en warehouse_arrivals. Ejecute uno de los siguientes elementos en un cuaderno o en un editor de SQL (use el mismo catálogo y esquema que el destino de la canalización).
Recuento de llegadas por almacén (SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Deberías observar conteos no nulos para Warehouse_A y Warehouse_B (los datos de GPS de ejemplo se superponen a ambos polígonos). Para inspeccionar las filas de ejemplo:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Mismas comprobaciones en Python (cuaderno):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
Si ve filas en warehouse_arrivals, la ST_Contains(boundary_geom, point_geom) unión funciona correctamente.
Paso 7: Programar la canalización (opcional)
Para mantener actualizada la tubería a medida que llegan nuevos datos GPS dentro del volumen de datos, cree un trabajo para ejecutar la tubería según una programación.
- En la parte superior del editor, elija el botón Programar .
- Si aparece el cuadro de diálogo Programaciones , elija Agregar programación.
- Opcionalmente, asigne un nombre al trabajo.
- De forma predeterminada, la programación se ejecuta una vez al día. Puede aceptar esto o establecer el suyo propio. Elegir Opciones avanzadas le permite establecer un tiempo específico; Más opciones le permiten agregar notificaciones de ejecución.
- Seleccione Crear para aplicar la programación.
Consulte Supervisión y observabilidad de trabajos de Lakeflow para más información sobre las ejecuciones de tareas.
Recursos adicionales
- Canalizaciones declarativas de Lakeflow Spark
- Conceptos de canalizaciones declarativas de Spark de Lakeflow
- Tablas de streaming
- Carga de datos en canalizaciones
-
GEOMETRYtipo - Funciones geoespaciales ST
- ¿Qué es Auto Loader?