Lakeflow Spark SDP(선언적 파이프라인)에는 파이프라인에서 구체화된 뷰 및 스트리밍 테이블을 정의하기 위한 몇 가지 새로운 SQL 키워드 및 함수가 도입되었습니다. 파이프라인 개발을 위한 SQL 지원은 Spark SQL의 기본 사항을 기반으로 하며 구조적 스트리밍 기능에 대한 지원을 추가합니다.
PySpark DataFrames에 익숙한 사용자는 Python을 사용하여 파이프라인 코드를 개발하는 것을 선호할 수 있습니다. Python은 메타프로그래밍 작업과 같이 SQL로 구현하기 어려운 보다 광범위한 테스트 및 작업을 지원합니다. Python을 사용하여 파이프라인 코드 개발을 참조하세요.
파이프라인 SQL 구문에 대한 전체 참조는 파이프라인 SQL 언어 참조를 참조하세요.
파이프라인 개발을 위한 SQL의 기본 사항
파이프라인 데이터 세트를 만드는 SQL 코드는 구문을 사용하여 CREATE OR REFRESH 쿼리 결과에 대해 구체화된 뷰 및 스트리밍 테이블을 정의합니다.
키워드는 STREAM 절에서 참조되는 데이터 원본을 SELECT 스트리밍 의미 체계로 읽어야 하는지를 나타냅니다.
파이프라인 구성 중에 지정된 카탈로그 및 스키마에 대한 읽기 및 쓰기 기본값입니다. 대상 카탈로그 및 스키마를 설정하기 위해을 참조하세요.
파이프라인 소스 코드는 SQL 스크립트와 매우 다릅니다. SDP는 파이프라인에 구성된 모든 소스 코드 파일에서 모든 데이터 세트 정의를 평가하고 쿼리를 실행하기 전에 데이터 흐름 그래프를 빌드합니다. 소스 파일에 표시되는 쿼리 순서는 코드 평가 순서를 정의하지만 쿼리 실행 순서는 정의하지 않습니다.
SQL을 사용하여 구체화된 뷰 만들기
다음 코드 예제에서는 SQL을 사용하여 구체화된 뷰를 만들기 위한 기본 구문을 보여 줍니다.
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
SQL을 사용하여 스트리밍 테이블 만들기
다음 코드 예제에서는 SQL을 사용하여 스트리밍 테이블을 만들기 위한 기본 구문을 보여 줍니다. 스트리밍 테이블에 대한 원본을 읽을 때 키워드는 STREAM 원본에 대한 스트리밍 의미 체계를 사용했음을 나타냅니다. 구체화된 뷰를 만들 때는 STREAM 키워드를 사용하지 마세요.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
비고
소스에서 읽기 위해 스트리밍 의미론을 사용하려면 STREAM 키워드를 사용하세요. 기존 레코드에 대한 변경이나 삭제가 발생하면 읽기 작업 중 오류가 발생합니다. 가장 안전한 방법은 정적 또는 추가 전용 소스에서 읽는 것입니다. 변경 커밋이 포함된 데이터를 수집하려면, Python과 SkipChangeCommits 옵션을 사용하여 오류를 처리할 수 있습니다.
개체 스토리지에서 데이터 로드
파이프라인은 Azure Databricks에서 지원하는 모든 형식의 데이터 로드를 지원합니다. 데이터 형식 옵션을 참조하세요.
비고
이 예시들에서는 작업 공간에 자동으로 연결된 /databricks-datasets의 데이터를 사용할 수 있습니다. Databricks는 볼륨 경로 또는 클라우드 URI를 사용하여 클라우드 개체 스토리지에 저장된 데이터를 참조하는 것이 좋습니다.
Unity 카탈로그 볼륨이란 무엇인가?를 참조하십시오.
Databricks는 클라우드 개체 스토리지에 저장된 데이터에 대해 증분 수집 워크로드를 구성할 때 자동 로더 및 스트리밍 테이블을 사용하는 것이 좋습니다. 오토 로더란 무엇인가?를 참조하세요.
SQL은 이 read_files 함수를 사용하여 자동 로더 기능을 호출합니다.
STREAM 키워드를 사용하여 read_files와 스트리밍 읽기를 구성해야 합니다.
다음은 SQL에서 read_files의 구문을 설명합니다.
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
자동 로더에 대한 옵션은 키-값 쌍입니다. 지원되는 형식 및 옵션에 대한 자세한 내용은 옵션참조하세요.
다음 예제에서는 자동 로더를 사용하여 JSON 파일에서 스트리밍 테이블을 만듭니다.
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
read_files 함수는 물질화된 뷰를 생성하기 위한 배치 의미론도 지원합니다. 다음 예제에서는 일괄 처리 의미 체계를 사용하여 JSON 디렉터리를 읽고 구체화된 뷰를 만듭니다.
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
예상 데이터 유효성 검사
기대치를 사용하여 데이터 품질 제약 조건을 설정하고 적용할 수 있습니다. 파이프라인 기대를 사용하여 데이터 품질을 관리하기를 참조하세요.
다음 코드는 데이터 수집 중에 null인 레코드를 삭제하는 명명된 valid_data 예상을 정의합니다.
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
파이프라인에 정의된 구체화된 뷰와 스트리밍 테이블을 쿼리합니다.
다음 예제에서는 4개의 데이터 세트를 정의합니다.
- JSON 데이터를 로드하는
orders스트리밍 테이블입니다. - CSV 데이터를 로드하는
customers구체화된 뷰입니다. -
customer_orders및orders데이터 세트의 레코드를 조인하고, 주문 타임스탬프를 날짜로 캐스팅하고,customers,customer_id,order_number및state필드를 선택하는order_date명명된 구체화된 뷰입니다. - 각 상태에 대한 일일 주문 수를 집계하는
daily_orders_by_state명명된 구체화된 뷰입니다.
비고
파이프라인에서 뷰 또는 테이블을 쿼리할 때 카탈로그와 스키마를 직접 지정하거나 파이프라인에 구성된 기본값을 사용할 수 있습니다. 이 예제에서는 파이프라인에 대해 구성된 기본 카탈로그 및 스키마에서 orders, customers및 customer_orders 테이블을 작성하고 읽습니다.
레거시 게시 모드는 LIVE 스키마를 사용하여 파이프라인에 정의된 다른 구체화된 뷰 및 스트리밍 테이블을 쿼리합니다. 새 파이프라인에서 LIVE 스키마 구문은 자동으로 무시됩니다.
LIVE 스키마(레거시)를 참조하세요.
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
프라이빗 테이블 정의
구체화된 뷰 또는 스트리밍 테이블을 만들 때 PRIVATE 절을 사용할 수 있습니다. 프라이빗 테이블을 만들 때 테이블을 만들지만 테이블에 대한 메타데이터는 만들지 않습니다. 이 절은 PRIVATE 파이프라인에서 사용할 수 있지만 파이프라인 외부에서 액세스해서는 안 되는 테이블을 만들도록 SDP에 지시합니다. 처리 시간을 줄이기 위해 프라이빗 테이블은 단일 업데이트가 아니라 해당 테이블을 만드는 파이프라인의 수명 동안 유지됩니다.
프라이빗 테이블은 카탈로그의 테이블과 동일한 이름을 가질 수 있습니다. 파이프라인 내의 테이블에 대해 정규화되지 않은 이름을 지정하는 경우 프라이빗 테이블과 해당 이름의 카탈로그 테이블이 모두 있는 경우 프라이빗 테이블이 사용됩니다.
프라이빗 테이블은 이전에 임시 테이블로 참조되었습니다.
구체화된 뷰 또는 스트리밍 테이블에서 레코드를 영구적으로 삭제
GDPR 규정 준수와 같이 삭제 벡터가 활성화된 스트리밍 테이블에서 레코드를 영구적으로 삭제하려면 개체의 기본 델타 테이블에서 추가 작업을 수행해야 합니다. 스트리밍 테이블에서 레코드를 삭제하려면 스트리밍 테이블에서 레코드를 영구적으로 삭제하는 방법을참조하세요.
구체화된 뷰는 새로 고칠 때 항상 기본 테이블의 데이터를 반영합니다. 구체화된 뷰에서 데이터를 삭제하려면 원본에서 데이터를 삭제하고 구체화된 뷰를 새로 고쳐야 합니다.
SQL을 사용하여 테이블 또는 뷰를 선언할 때 사용되는 값 매개 변수화
SET를 사용하여 Spark 구성을 포함한 테이블 또는 보기를 선언하는 쿼리에서 구성 값을 지정합니다.
SET 명령문 이후 소스 파일에서 정의하는 모든 테이블이나 뷰는 정의된 값에 액세스할 수 있습니다.
SET 문을 사용하여 지정된 모든 Spark 구성은 SET 문 다음에 있는 테이블 또는 뷰에 대해 Spark 쿼리를 실행할 때 사용됩니다. 쿼리의 구성 값을 읽으려면 문자열 보간 구문 ${}를 사용합니다. 다음 예제에서는 startDate라는 Spark 구성 값을 설정하고 쿼리에서 해당 값을 사용합니다.
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
여러 구성 값을 지정하려면 각 값에 대해 별도의 SET 문을 사용합니다.
제한점
PIVOT 절은 지원되지 않습니다. Spark에서 pivot 작업을 수행하려면 출력 스키마를 계산하기 위해 입력 데이터를 즉시 로드해야 합니다. 이 기능은 파이프라인에서 지원되지 않습니다.
비고
구체화된 뷰를 만드는 CREATE OR REFRESH LIVE TABLE 구문은 더 이상 사용되지 않습니다. 대신 CREATE OR REFRESH MATERIALIZED VIEW을 사용합니다.