Partilhar via


Tutorial: Constrói um pipeline geoespacial com tipos espaciais nativos

Aprenda a criar e implementar um pipeline que incorpore dados GPS, converta coordenadas em tipos espaciais nativos e realize junções com geofences de armazéns para rastrear chegadas, usando Lakeflow Spark Declarative Pipelines (SDP) para orquestração de dados e Auto Loader. Este tutorial utiliza os tipos espaciais nativos do Databrick (GEOMETRY, GEOGRAPHY) e funções espaciais incorporadas como ST_Point, ST_GeomFromWKT, e ST_Contains, para que possa executar fluxos de trabalho geoespaciais em escala sem bibliotecas externas.

Neste tutorial, você irá:

  • Crie um pipeline e gere dados de amostra de GPS e geofence num volume do Unity Catalog.
  • Ingerir pings GPS brutos incrementalmente com o Auto Loader numa tabela de streaming de bronze.
  • Constrói uma tabela de streaming prateada que converta latitude e longitude para um ponto nativo GEOMETRY .
  • Crie uma visão materializada das cercas geográficas de armazéns a partir dos polígonos WKT.
  • Executar uma associação espacial para produzir uma tabela das chegadas ao armazém (qual dispositivo entrou em qual geocerca).

O resultado é um pipeline ao estilo medalhão: bronze (GPS bruto), prata (pontos como geometria) e ouro (geofences e eventos de chegada). Consulte O que é a arquitetura de medallion lakehouse? para obter mais informações.

Requisitos

Para concluir este tutorial, você deve atender aos seguintes requisitos:

Etapa 1: Criar um pipeline

Crie um novo pipeline ETL e defina o catálogo e esquema predefinidos para as suas tabelas.

  1. No seu espaço de trabalho, clique no ícone Mais. Novo no canto superior esquerdo.

  2. Clique em Pipeline ETL.

  3. Altere o título do pipeline para Spatial pipeline tutorial ou um nome de sua preferência.

  4. Sob o título, escolha um catálogo e esquema para o qual tenhas permissões de escrita.

    Este catálogo e esquema são utilizados por defeito quando não especifica um catálogo ou esquema no seu código. Substitua <catalog> e <schema> nos passos seguintes pelos valores que escolher aqui.

  5. Em Opções avançadas, selecione Iniciar com um arquivo vazio.

  6. Escolha uma pasta para o seu código. Pode selecionar Explorar para escolher uma pasta; podes usar uma pasta Git para controlo de versões.

  7. Escolha Python ou SQL como linguagem do seu primeiro ficheiro. Podes adicionar ficheiros na outra língua mais tarde.

  8. Clique em Select para criar o oleoduto e abra o Editor de Pipelines Lakeflow.

Agora você tem um pipeline em branco com um catálogo e esquema padrão. De seguida, crie os dados de GPS e geofence de exemplo.

Passo 2: Criar os dados de amostra de GPS e geofence

Esta etapa gera dados de amostra num volume: pings GPS brutos (JSON) e geofences de armazém (JSON com polígonos WKT). Os pontos GPS são gerados numa caixa delimitadora que sobrepõe os dois polígonos do armazém, pelo que a junção espacial numa etapa posterior devolverá as linhas de chegada. Pode saltar este passo se já tiver os seus próprios dados num volume ou tabela.

  1. No Editor de Pipelines Lakeflow, no gestor de ativos, clique no ícone Mais, Adicionar, depois Exploração.

  2. Defina o Nome para Setup spatial data, escolha Python e saia da pasta de destino predefinida.

  3. Clique em Criar.

  4. No novo caderno, cole o seguinte código. Substitui <catalog> e <schema> pelo catálogo e esquema padrão que definiste no Passo 1.

    Use o código seguinte no caderno para gerar dados de GPS e geofence.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Executa a célula do caderno (Shift + Enter).

Após a execução terminar, o volume contém gps (pings brutos) e geofences (polígonos em WKT). No passo seguinte, ingere os dados GPS numa tabela de bronze.

Passo 3: Importar dados GPS numa tabela de streaming bronze

Ingere o JSON GPS bruto do volume de forma incremental usando o Auto Loader e escreva numa tabela bronze de streaming.

  1. No browser de ativos, clique no ícone Plus.Soma, depois Transformação.

  2. Definagps_bronze para , escolha SQL ou Python, e clique em Criar.

  3. Substitua os conteúdos do ficheiro pelo seguinte (utilize o separador de idioma que corresponder à sua língua). Substitui <catalog> e <schema> pelo teu catálogo e esquema padrão.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Clica no ícone Play.Executa ficheiro ou Executa pipeline para correr uma atualização.

Quando a atualização termina, o gráfico do pipeline mostra a gps_bronze tabela. De seguida, adiciona uma tabela prateada que converte coordenadas para um ponto geométrico nativo.

Passo 4: Adicione uma tabela de transmissão de dados prateada com pontos geométricos

Crie uma tabela de streaming que leia a partir da tabela bronze e adicione uma GEOMETRY coluna usando ST_Point(longitude, latitude).

  1. No browser de ativos, clique no ícone Plus.Soma, depois Transformação.

  2. Definaraw_gps_silver para , escolha SQL ou Python, e clique em Criar.

  3. Cole o código a seguir no novo arquivo.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Clique no ícone Play.Executar ficheiro ou Executar pipeline.

Agora o gráfico do pipeline mostra gps_bronze e raw_gps_silver. Em seguida, adicione as delimitações geográficas do armazém como uma vista materializada.

Passo 5: Criar a tabela mestre de geodelimitações de armazém

Crie uma vista materializada que leia as geofences a partir do volume e converta a coluna WKT numa GEOMETRY coluna usando ST_GeomFromWKT.

  1. No browser de ativos, clique no ícone Plus.Soma, depois Transformação.

  2. Definawarehouse_geofences_gold para , escolha SQL ou Python, e clique em Criar.

  3. Cole o código a seguir. Substitui <catalog> e <schema> pelo teu catálogo e esquema padrão.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Clique no ícone Play.Executar ficheiro ou Executar pipeline.

O pipeline inclui agora a tabela de geofences. De seguida, adiciona a junção espacial para calcular as chegadas ao armazém.

Passo 6: Crie a tabela de chegadas do armazém com uma junção espacial

Adicione uma vista materializada que une os pontos GPS Silver às geofences, utilizando ST_Contains(boundary_geom, point_geom) para determinar quando um dispositivo está dentro de um polígono de armazém.

  1. No browser de ativos, clique no ícone Plus.Soma, depois Transformação.

  2. Definawarehouse_arrivals para , escolha SQL ou Python, e clique em Criar.

  3. Cole o código a seguir.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Clique no ícone Play.Executar ficheiro ou Executar pipeline.

Quando a atualização termina, o grafo do pipeline mostra os quatro conjuntos de dados: gps_bronze, raw_gps_silver, warehouse_geofences_gold, e warehouse_arrivals.

Verificar a junção espacial

Confirme que a junção espacial produziu linhas: pontos da tabela prateada que caem dentro de uma geocerca aparecem em warehouse_arrivals. Executa um dos seguintes num caderno ou editor SQL (usa o mesmo catálogo e esquema que o teu destino do pipeline).

Contar as chegadas por armazém (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Deverá ver contagens não nulas para Warehouse_A e Warehouse_B (os dados GPS de amostra sobrepõem-se a ambos os polígonos). Para inspecionar as filas de amostras:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Mesmas verificações em Python (notebook):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Se vires linhas em warehouse_arrivals, a ST_Contains(boundary_geom, point_geom) junção está a funcionar corretamente.

Passo 7: Programar o pipeline (opcional)

Para manter o pipeline atualizado à medida que novos dados GPS chegam ao volume, crie uma tarefa para executar o pipeline de forma programada.

  1. Na parte superior do editor, escolha o botão Agendar .
  2. Se a caixa de diálogo Agendas for exibida, escolha Adicionar agenda.
  3. Opcionalmente, dê um nome ao trabalho.
  4. Por padrão, a programação é executada uma vez por dia. Podes aceitar isto ou definir o teu próprio. Escolher Avançado permite-te definir um tempo específico; Mais opções permitem adicionar notificações de corrida.
  5. Selecione Criar para aplicar o horário.

Consulte Monitorização e observabilidade de tarefas do Lakeflow para obter mais informações sobre execuções de tarefas.

Recursos adicionais