GPS 데이터를 수집하고, 좌표를 네이티브 공간 형식으로 변환하고, 웨어하우스 지오펜스에 조인하여 데이터 오케스트레이션 및 자동 로더에 대해 SDP(Lakeflow Spark 선언적 파이프라인)를 사용하여 도착을 추적하는 파이프라인을 만들고 배포하는 방법을 알아봅니다. 이 자습서에서는 Databricks 네이티브 공간 형식(GEOMETRY, GEOGRAPHY) 및 기본 제공 공간 함수(예: ST_Point, ST_GeomFromWKT및 ST_Contains)를 사용하므로 외부 라이브러리 없이 지리 공간 워크플로를 대규모로 실행할 수 있습니다.
이 자습서에서는 다음을 수행합니다.
- 파이프라인을 만들고 Unity 카탈로그 볼륨에서 샘플 GPS 및 지오펜스 데이터를 생성합니다.
- Auto Loader를 사용하여 원시 GPS 핑을 점진적으로 브론즈 스트리밍 테이블로 수집합니다.
- 위도 및 경도를 네이티브
GEOMETRY지점으로 변환하는 실버 스트리밍 테이블을 빌드합니다. - WKT 다각형에서 웨어하우스 지오펜스의 구체화된 뷰를 만듭니다.
- 공간 조인을 실행하여 웨어하우스 도착 테이블(어떤 디바이스가 어떤 지오펜스에 들어갔는가)을 생성합니다.
그 결과 브론즈(원시 GPS), 실버(기하학적 점), 골드(지오펜스 및 도착 이벤트)로 구성된 메달리온 스타일의 파이프라인이 생성됩니다. 자세한 내용은 메달리온 레이크하우스 아키텍처가 무엇인지에 대해 참조하세요.
요구 사항
이 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.
- Azure Databricks 작업 영역에 로그인합니다.
- 작업 영역에 Unity 카탈로그 를 사용하도록 설정합니다.
- 서버리스 Lakeflow Spark 선언적 파이프라인을 사용하려는 경우 계정에 대해 서버 리스 컴퓨팅 을 사용하도록 설정해야 합니다. 서버리스 컴퓨팅을 사용하도록 설정하지 않은 경우 단계는 작업 영역에 대한 기본 컴퓨팅과 함께 작동합니다.
- 컴퓨팅 리소스를 만들거나 컴퓨팅 리소스에 액세스할 수 있는 권한이 있습니다.
-
카탈로그에 새 스키마를 만들 수 있는 권한이 있습니다. 필요한 권한은 다음과
USE CATALOG같습니다CREATE SCHEMA. -
기존 스키마에 새 볼륨을 만들 수 있는 권한이 있습니다. 필요한 권한은 다음과
USE SCHEMA같습니다CREATE VOLUME. - 네이티브 공간 형식 및 공간 함수를 지원하는 런타임을 사용합니다.
1단계: 파이프라인 만들기
새 ETL 파이프라인을 만들고 테이블에 대한 기본 카탈로그 및 스키마를 설정합니다.
작업 영역에서
을 클릭합니다. 왼쪽 위 모서리에 새로 만들기
ETL 파이프라인을 클릭합니다.
파이프라인
Spatial pipeline tutorial의 제목을 원하는 이름으로 변경합니다.제목 아래에서 쓰기 권한이 있는 카탈로그 및 스키마를 선택합니다.
이 카탈로그 및 스키마는 코드에서 카탈로그 또는 스키마를 지정하지 않을 때 기본적으로 사용됩니다.
<catalog>다음 단계에서 이곳에서 선택한 값으로 바꿉니다<schema>.고급 옵션에서 빈 파일로 시작을 선택합니다.
코드에 대한 폴더를 선택합니다. 찾아보기를 선택하여 폴더를 선택할 수 있습니다. 버전 제어에 Git 폴더를 사용할 수 있습니다.
첫 번째 파일의 언어로 Python 또는 SQL 을 선택합니다. 나중에 다른 언어로 파일을 추가할 수 있습니다.
[선택]을 클릭하여 파이프라인을 만들고 Lakeflow 파이프라인 편집기를 엽니다.
이제 기본 카탈로그 및 스키마가 있는 빈 파이프라인이 있습니다. 다음으로 샘플 GPS 및 지오펜스 데이터를 만듭니다.
2단계: 샘플 GPS 및 지오펜스 데이터 만들기
이 단계에서는 JSON(원시 GPS ping) 및 웨어하우스 지오펜스(WKT 다각형이 있는 JSON)의 샘플 데이터를 볼륨에 생성합니다. GPS 지점은 두 웨어하우스 다각형과 겹치는 경계 상자에서 생성되므로 이후 단계의 공간 조인은 도착 행을 반환합니다. 볼륨 또는 테이블에 고유한 데이터가 이미 있는 경우 이 단계를 건너뛸 수 있습니다.
Lakeflow 파이프라인 편집기에서 자산 브라우저에서
을 클릭합니다.추가한 다음 탐색합니다.
이름을 설정하여
Setup spatial data을 선택하고 기본 대상 폴더를 그대로 둡니다.만들기를 클릭합니다.
새 Notebook에 다음 코드를 붙여넣습니다.
<catalog>1단계에서 설정한 기본 카탈로그 및 스키마로 대체<schema>합니다.Notebook에서 다음 코드를 사용하여 GPS 및 지오펜스 데이터를 생성합니다.
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")Notebook 셀을 실행합니다(Shift + Enter).
실행이 완료되면 볼륨에는 gps (원시 ping)과 geofences (WKT의 다각형)이 포함됩니다. 다음 단계에서는 GPS 데이터를 브론즈 테이블로 적재합니다.
3단계: GPS 데이터를 브론즈 스트리밍 테이블에 수집
자동 로더를 사용하여 볼륨에서 원시 GPS JSON을 증분 방식으로 수집하고 브론즈 스트리밍 테이블에 씁니다.
자산 브라우저에서
을 클릭합니다.추가한 다음 변환합니다.
이름을 설정하여
gps_bronzeSQL 또는 Python을 선택하고 만들기를 클릭합니다.파일 내용을 다음으로 바꿉니다(언어와 일치하는 탭 사용).
<catalog>및<schema>을 사용자의 기본 카탈로그와 스키마로 대체하세요.SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )파이썬
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )을 클릭합니다.파일을 실행하거나 파이프라인을 실행하여 업데이트를 실행합니다.
업데이트가 완료되면 파이프라인 그래프에 테이블이 gps_bronze 표시됩니다. 다음으로 좌표를 네이티브 기하 도형 지점으로 변환하는 실버 테이블을 추가합니다.
4단계: 지오메트리 포인트가 있는 실버 데이터 스트리밍 테이블 추가
브론즈 테이블에서 읽고 ST_Point(longitude, latitude)를 사용하여 GEOMETRY 열을 추가하는 스트리밍 테이블을 만듭니다.
자산 브라우저에서
을 클릭합니다.추가한 다음 변환합니다.
이름을 설정하여
raw_gps_silverSQL 또는 Python을 선택하고 만들기를 클릭합니다.다음 코드를 새 파일에 붙여넣습니다.
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)파이썬
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )을 클릭합니다.파일을 실행하거나 파이프라인을 실행합니다.
이제 파이프라인 그래프가 gps_bronze 및 raw_gps_silver를 보여줍니다. 다음으로, 웨어하우스 지오펜스를 구체화된 뷰로 추가합니다.
5단계: 웨어하우스 지오펜스 골드 테이블 만들기
볼륨에서 지오펜스를 읽고 WKT 열을 GEOMETRY 열로 변환하는 구체화된 뷰를 ST_GeomFromWKT을 사용하여 만듭니다.
자산 브라우저에서
을 클릭합니다.추가한 다음 변환합니다.
이름을 설정하여
warehouse_geofences_goldSQL 또는 Python을 선택하고 만들기를 클릭합니다.다음 코드를 붙여넣으세요.
<catalog>및<schema>을 사용자의 기본 카탈로그와 스키마로 대체하세요.SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )파이썬
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )을 클릭합니다.파일을 실행하거나 파이프라인을 실행합니다.
이제 파이프라인에 지오펜스 테이블이 포함됩니다. 다음으로, 창고 도착 계산을 위해 공간 조인을 추가합니다.
6단계: 공간 조인을 사용하여 웨어하우스 도착 테이블 만들기
디바이스가 창고 다각형 내에 있는지를 결정하기 위해 ST_Contains(boundary_geom, point_geom)을 사용하여 실버 GPS 지점을 지오펜스와 조인하는 구체화된 뷰를 추가합니다.
자산 브라우저에서
을 클릭합니다.추가한 다음 변환합니다.
이름을 설정하여
warehouse_arrivalsSQL 또는 Python을 선택하고 만들기를 클릭합니다.다음 코드를 붙여넣으세요.
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)파이썬
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )을 클릭합니다.파일을 실행하거나 파이프라인을 실행합니다.
업데이트가 완료되면 파이프라인 그래프에 네 개의 데이터 세트gps_bronze( , , raw_gps_silverwarehouse_geofences_gold및 warehouse_arrivals)가 모두 표시됩니다.
공간 조인 확인
공간 병합이 행을 생성했는지 확인하십시오: 실버 테이블의 점이 지오펜스 내에 있을 경우 warehouse_arrivals에 나타납니다. Notebook 또는 SQL 편집기에서 다음 중 하나를 실행합니다(파이프라인 대상과 동일한 카탈로그 및 스키마 사용).
웨어하우스별 도착 횟수(SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Warehouse_A 및 Warehouse_B에 대해 0이 아닌 개수가 표시되어야 합니다 (샘플 GPS 데이터가 두 다각형과 겹치기 때문입니다). 샘플 행을 검사하려면 다음을 수행합니다.
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Python에서 동일한 검사(Notebook):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
warehouse_arrivals에 행이 표시되면, ST_Contains(boundary_geom, point_geom) 결합이 제대로 작동하고 있습니다.
7단계: 파이프라인 예약(선택 사항)
새 GPS 데이터가 볼륨에 배치됨에 따라 파이프라인을 최신 상태로 유지하려면 일정에 따라 파이프라인을 실행하는 작업을 만듭니다.
- 편집기의 맨 위에서 일정 단추를 선택합니다.
- 일정 대화 상자 가 나타나면 일정 추가를 선택합니다.
- 필요에 따라 작업에 이름을 지정합니다.
- 기본적으로 일정은 하루에 한 번 실행됩니다. 이를 수락하거나 직접 설정할 수 있습니다. 고급을 선택하면 특정 시간을 설정할 수 있습니다. 추가 옵션을 사용하면 실행 알림을 추가할 수 있습니다.
- 만들기를 선택하여 일정을 적용합니다.
작업 실행에 대한 자세한 내용은 Lakeflow 작업의 모니터링 및 관찰 가능성을 참조하세요.