다음을 통해 공유


자습서: 네이티브 공간 형식을 사용하여 지리 공간 파이프라인 빌드

GPS 데이터를 수집하고, 좌표를 네이티브 공간 형식으로 변환하고, 웨어하우스 지오펜스에 조인하여 데이터 오케스트레이션 및 자동 로더에 대해 SDP(Lakeflow Spark 선언적 파이프라인)를 사용하여 도착을 추적하는 파이프라인을 만들고 배포하는 방법을 알아봅니다. 이 자습서에서는 Databricks 네이티브 공간 형식(GEOMETRY, GEOGRAPHY) 및 기본 제공 공간 함수(예: ST_Point, ST_GeomFromWKTST_Contains)를 사용하므로 외부 라이브러리 없이 지리 공간 워크플로를 대규모로 실행할 수 있습니다.

이 자습서에서는 다음을 수행합니다.

  • 파이프라인을 만들고 Unity 카탈로그 볼륨에서 샘플 GPS 및 지오펜스 데이터를 생성합니다.
  • Auto Loader를 사용하여 원시 GPS 핑을 점진적으로 브론즈 스트리밍 테이블로 수집합니다.
  • 위도 및 경도를 네이티브 GEOMETRY 지점으로 변환하는 실버 스트리밍 테이블을 빌드합니다.
  • WKT 다각형에서 웨어하우스 지오펜스의 구체화된 뷰를 만듭니다.
  • 공간 조인을 실행하여 웨어하우스 도착 테이블(어떤 디바이스가 어떤 지오펜스에 들어갔는가)을 생성합니다.

그 결과 브론즈(원시 GPS), 실버(기하학적 점), 골드(지오펜스 및 도착 이벤트)로 구성된 메달리온 스타일의 파이프라인이 생성됩니다. 자세한 내용은 메달리온 레이크하우스 아키텍처가 무엇인지에 대해 참조하세요.

요구 사항

이 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.

1단계: 파이프라인 만들기

새 ETL 파이프라인을 만들고 테이블에 대한 기본 카탈로그 및 스키마를 설정합니다.

  1. 작업 영역에서 더하기 아이콘 을 클릭합니다. 왼쪽 위 모서리에 새로 만들기

  2. ETL 파이프라인을 클릭합니다.

  3. 파이프라인 Spatial pipeline tutorial 의 제목을 원하는 이름으로 변경합니다.

  4. 제목 아래에서 쓰기 권한이 있는 카탈로그 및 스키마를 선택합니다.

    이 카탈로그 및 스키마는 코드에서 카탈로그 또는 스키마를 지정하지 않을 때 기본적으로 사용됩니다. <catalog>다음 단계에서 이곳에서 선택한 값으로 바꿉니다<schema>.

  5. 고급 옵션에서 빈 파일로 시작을 선택합니다.

  6. 코드에 대한 폴더를 선택합니다. 찾아보기를 선택하여 폴더를 선택할 수 있습니다. 버전 제어에 Git 폴더를 사용할 수 있습니다.

  7. 첫 번째 파일의 언어로 Python 또는 SQL 을 선택합니다. 나중에 다른 언어로 파일을 추가할 수 있습니다.

  8. [선택]을 클릭하여 파이프라인을 만들고 Lakeflow 파이프라인 편집기를 엽니다.

이제 기본 카탈로그 및 스키마가 있는 빈 파이프라인이 있습니다. 다음으로 샘플 GPS 및 지오펜스 데이터를 만듭니다.

2단계: 샘플 GPS 및 지오펜스 데이터 만들기

이 단계에서는 JSON(원시 GPS ping) 및 웨어하우스 지오펜스(WKT 다각형이 있는 JSON)의 샘플 데이터를 볼륨에 생성합니다. GPS 지점은 두 웨어하우스 다각형과 겹치는 경계 상자에서 생성되므로 이후 단계의 공간 조인은 도착 행을 반환합니다. 볼륨 또는 테이블에 고유한 데이터가 이미 있는 경우 이 단계를 건너뛸 수 있습니다.

  1. Lakeflow 파이프라인 편집기에서 자산 브라우저에서 더하기 아이콘을 클릭합니다.추가한 다음 탐색합니다.

  2. 이름을 설정하여 Setup spatial data선택하고 기본 대상 폴더를 그대로 둡니다.

  3. 만들기를 클릭합니다.

  4. 새 Notebook에 다음 코드를 붙여넣습니다. <catalog> 1단계에서 설정한 기본 카탈로그 및 스키마로 대체 <schema> 합니다.

    Notebook에서 다음 코드를 사용하여 GPS 및 지오펜스 데이터를 생성합니다.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Notebook 셀을 실행합니다(Shift + Enter).

실행이 완료되면 볼륨에는 gps (원시 ping)과 geofences (WKT의 다각형)이 포함됩니다. 다음 단계에서는 GPS 데이터를 브론즈 테이블로 적재합니다.

3단계: GPS 데이터를 브론즈 스트리밍 테이블에 수집

자동 로더를 사용하여 볼륨에서 원시 GPS JSON을 증분 방식으로 수집하고 브론즈 스트리밍 테이블에 씁니다.

  1. 자산 브라우저에서 더하기 아이콘을 클릭합니다.추가한 다음 변환합니다.

  2. 이름을 설정하여 gps_bronzeSQL 또는 Python을 선택하고 만들기를 클릭합니다.

  3. 파일 내용을 다음으로 바꿉니다(언어와 일치하는 탭 사용). <catalog><schema>을 사용자의 기본 카탈로그와 스키마로 대체하세요.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    파이썬

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. 재생 아이콘 을 클릭합니다.파일을 실행하거나 파이프라인을 실행하여 업데이트를 실행합니다.

업데이트가 완료되면 파이프라인 그래프에 테이블이 gps_bronze 표시됩니다. 다음으로 좌표를 네이티브 기하 도형 지점으로 변환하는 실버 테이블을 추가합니다.

4단계: 지오메트리 포인트가 있는 실버 데이터 스트리밍 테이블 추가

브론즈 테이블에서 읽고 ST_Point(longitude, latitude)를 사용하여 GEOMETRY 열을 추가하는 스트리밍 테이블을 만듭니다.

  1. 자산 브라우저에서 더하기 아이콘을 클릭합니다.추가한 다음 변환합니다.

  2. 이름을 설정하여 raw_gps_silverSQL 또는 Python을 선택하고 만들기를 클릭합니다.

  3. 다음 코드를 새 파일에 붙여넣습니다.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    파이썬

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. 재생 아이콘 을 클릭합니다.파일을 실행하거나 파이프라인을 실행합니다.

이제 파이프라인 그래프가 gps_bronzeraw_gps_silver를 보여줍니다. 다음으로, 웨어하우스 지오펜스를 구체화된 뷰로 추가합니다.

5단계: 웨어하우스 지오펜스 골드 테이블 만들기

볼륨에서 지오펜스를 읽고 WKT 열을 GEOMETRY 열로 변환하는 구체화된 뷰를 ST_GeomFromWKT을 사용하여 만듭니다.

  1. 자산 브라우저에서 더하기 아이콘을 클릭합니다.추가한 다음 변환합니다.

  2. 이름을 설정하여 warehouse_geofences_goldSQL 또는 Python을 선택하고 만들기를 클릭합니다.

  3. 다음 코드를 붙여넣으세요. <catalog><schema>을 사용자의 기본 카탈로그와 스키마로 대체하세요.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    파이썬

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. 재생 아이콘 을 클릭합니다.파일을 실행하거나 파이프라인을 실행합니다.

이제 파이프라인에 지오펜스 테이블이 포함됩니다. 다음으로, 창고 도착 계산을 위해 공간 조인을 추가합니다.

6단계: 공간 조인을 사용하여 웨어하우스 도착 테이블 만들기

디바이스가 창고 다각형 내에 있는지를 결정하기 위해 ST_Contains(boundary_geom, point_geom)을 사용하여 실버 GPS 지점을 지오펜스와 조인하는 구체화된 뷰를 추가합니다.

  1. 자산 브라우저에서 더하기 아이콘을 클릭합니다.추가한 다음 변환합니다.

  2. 이름을 설정하여 warehouse_arrivalsSQL 또는 Python을 선택하고 만들기를 클릭합니다.

  3. 다음 코드를 붙여넣으세요.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    파이썬

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. 재생 아이콘 을 클릭합니다.파일을 실행하거나 파이프라인을 실행합니다.

업데이트가 완료되면 파이프라인 그래프에 네 개의 데이터 세트gps_bronze( , , raw_gps_silverwarehouse_geofences_goldwarehouse_arrivals)가 모두 표시됩니다.

공간 조인 확인

공간 병합이 행을 생성했는지 확인하십시오: 실버 테이블의 점이 지오펜스 내에 있을 경우 warehouse_arrivals에 나타납니다. Notebook 또는 SQL 편집기에서 다음 중 하나를 실행합니다(파이프라인 대상과 동일한 카탈로그 및 스키마 사용).

웨어하우스별 도착 횟수(SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Warehouse_AWarehouse_B에 대해 0이 아닌 개수가 표시되어야 합니다 (샘플 GPS 데이터가 두 다각형과 겹치기 때문입니다). 샘플 행을 검사하려면 다음을 수행합니다.

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Python에서 동일한 검사(Notebook):

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

warehouse_arrivals에 행이 표시되면, ST_Contains(boundary_geom, point_geom) 결합이 제대로 작동하고 있습니다.

7단계: 파이프라인 예약(선택 사항)

새 GPS 데이터가 볼륨에 배치됨에 따라 파이프라인을 최신 상태로 유지하려면 일정에 따라 파이프라인을 실행하는 작업을 만듭니다.

  1. 편집기의 맨 위에서 일정 단추를 선택합니다.
  2. 일정 대화 상자 나타나면 일정 추가를 선택합니다.
  3. 필요에 따라 작업에 이름을 지정합니다.
  4. 기본적으로 일정은 하루에 한 번 실행됩니다. 이를 수락하거나 직접 설정할 수 있습니다. 고급을 선택하면 특정 시간을 설정할 수 있습니다. 추가 옵션을 사용하면 실행 알림을 추가할 수 있습니다.
  5. 만들기를 선택하여 일정을 적용합니다.

작업 실행에 대한 자세한 내용은 Lakeflow 작업의 모니터링 및 관찰 가능성을 참조하세요.

추가 리소스