다음을 통해 공유


파이프라인을 사용하여 데이터 변환

이 문서에서는 파이프라인을 사용하여 데이터 세트에 대한 변환을 선언하고 쿼리 논리를 통해 레코드를 처리하는 방법을 지정하는 방법을 설명합니다. 또한 파이프라인을 빌드하기 위한 일반적인 변환 패턴의 예도 포함되어 있습니다.

DataFrame을 반환하는 모든 쿼리에 대해 데이터 세트를 정의할 수 있습니다. Apache Spark 기본 제공 작업, UDF, 사용자 지정 논리 및 MLflow 모델을 Lakeflow Spark 선언적 파이프라인의 변환으로 사용할 수 있습니다. 파이프라인에 데이터를 수집한 후에는 업스트림 데이터 소스에 대해 새 데이터 세트를 정의하여 새 스트리밍 테이블, 구체화 뷰 및 일반 뷰를 만들 수 있습니다.

파이프라인에서 상태 저장 처리를 효과적으로 수행하는 방법을 알아보려면 워터마크를 사용하여 상태 저장 처리 최적화를 참조하세요.

뷰, 물리적 뷰 및 실시간 데이터 테이블의 사용 시기

파이프라인 쿼리를 구현할 때 최상의 데이터 세트 형식을 선택하여 효율적이고 유지 관리가 가능한지 확인합니다.

뷰를 사용해 다음 작업을 수행하세요.

  • 원하는 크거나 복잡한 쿼리를 관리하기 쉬운 쿼리로 분할합니다.
  • 기대치를 사용하여 중간 결과의 유효성을 검사합니다.
  • 유지할 필요가 없는 결과에 대한 스토리지 및 컴퓨팅 비용을 줄입니다. 테이블이 구체화되므로 추가 계산 및 스토리지 리소스가 필요합니다.

다음과 같은 경우 구체화된 뷰를 사용하는 것이 좋습니다.

  • 여러 다운스트림 쿼리는 테이블을 사용합니다. 뷰는 요청 시 계산되므로 뷰를 쿼리할 때마다 뷰가 다시 계산됩니다.
  • 다른 파이프라인, 작업 또는 쿼리는 테이블을 사용합니다. 뷰는 구체화되지 않으므로 동일한 파이프라인에서만 사용할 수 있습니다.
  • 개발 중에 쿼리 결과를 보려고 합니다. 테이블은 구체화되고 파이프라인 외부에서 보고 쿼리할 수 있으므로 개발 중에 테이블을 사용하면 계산의 정확성을 확인하는 데 도움이 될 수 있습니다. 유효성을 검사한 후 구체화할 필요가 없는 쿼리를 뷰로 변환합니다.

다음과 같은 경우 스트리밍 테이블을 사용하는 것이 좋습니다.

  • 쿼리는 지속적으로 또는 증분적으로 증가하는 데이터 원본에 대해 정의됩니다.
  • 쿼리 결과는 증분 방식으로 계산되어야 합니다.
  • 파이프라인에는 높은 처리량과 짧은 대기 시간이 필요합니다.

비고

스트리밍 테이블은 항상 스트리밍 원본에 대해 정의됩니다. AUTO CDC ... INTO 스트리밍 원본을 사용하여 CDC 피드의 업데이트를 적용할 수도 있습니다. AUTO CDC API: 파이프라인을 사용하여 변경 데이터 캡처 간소화를 참조하세요.

대상 스키마에서 테이블 제외

외부 사용을 위한 것이 아닌 중간 테이블을 계산해야 하는 경우 TEMPORARY 키워드를 사용하여 스키마에 게시되지 않도록 할 수 있습니다. 임시 테이블은 여전히 Lakeflow Spark 선언적 파이프라인 의미 체계에 따라 데이터를 저장하고 처리하지만 현재 파이프라인 외부에서 액세스해서는 안 됩니다. 임시 테이블은 생성되는 파이프라인의 수명 동안 유지됩니다. 다음 구문을 사용하여 임시 테이블을 선언합니다.

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

파이썬

@dp.table(
  temporary=True)
def temp_table():
  return ("...")

단일 파이프라인에서 스트리밍 테이블 및 구체화된 뷰 결합

스트리밍 테이블은 Apache Spark 구조적 스트리밍의 처리 보장을 상속하며 추가 전용 데이터 원본에서 쿼리를 처리하도록 구성됩니다. 여기서 새 행은 항상 수정되지 않고 원본 테이블에 삽입됩니다.

비고

기본적으로 스트리밍 테이블은 추가 전용 데이터 소스를 요구하지만, 스트리밍 소스가 수정 또는 삭제를 필요로 하는 다른 스트리밍 테이블인 경우 skipChangeCommits flag로 이 동작을 무시할 수 있습니다.

일반적인 스트리밍 패턴에는 원본 데이터를 수집하여 파이프라인에서 초기 데이터 세트를 만드는 작업이 포함됩니다. 이러한 초기 데이터 세트는 일반적으로 브론즈 테이블이라고 하며 간단한 변환을 수행하는 경우가 많습니다.

반면, 일반적으로 골드 테이블이라고 하는 파이프라인의 최종 테이블에는 복잡한 집계 또는 AUTO CDC ... INTO 작업의 대상에서 읽기가 필요한 경우가 많습니다. 이러한 작업은 기본적으로 추가가 아닌 업데이트를 만들기 때문에 스트리밍 테이블에 대한 입력으로 지원되지 않습니다. 이러한 변환은 구체화된 뷰에 더 적합합니다.

스트리밍 테이블과 구체화된 뷰를 단일 파이프라인으로 혼합하여 파이프라인을 간소화하고, 비용이 많이 드는 원시 데이터 수집 또는 다시 처리를 방지하고, SQL의 모든 기능을 통해 효율적으로 인코딩되고 필터링된 데이터 세트를 통해 복잡한 집계를 계산할 수 있습니다. 다음 예제에서는 이러한 유형의 혼합 처리를 보여 줍니다.

비고

이러한 예제에서는 자동 로더를 사용하여 클라우드 스토리지에서 파일을 로드합니다. Unity 카탈로그 사용 파이프라인에서 자동 로더를 사용하여 파일을 로드하려면 외부 위치를 사용해야 합니다. 파이프라인과 함께 Unity 카탈로그를 사용하는 방법에 대한 자세한 내용은 파이프라인에서 Unity 카탈로그 사용을 참조하세요.

파이썬

@dp.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dp.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dp.materialized_view
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.read.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
  "abfss://path/to/raw/data",
  format => "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

자동 로더 사용하여 Azure Storage에서 JSON 파일을 증분 방식으로 수집하는 방법에 대해 자세히 알아봅니다.

스트림-정적 조인

스트림-정적 조인은 주로 정적 차원 테이블을 사용하여 추가 전용 데이터의 연속 스트림을 비정규화할 때 적합합니다.

각 파이프라인 업데이트에서 스트림의 새 레코드는 정적 테이블의 최신 스냅샷과 조인됩니다. 스트리밍 테이블의 해당 데이터를 처리한 후 정적 테이블에서 레코드를 추가하거나 업데이트하는 경우 전체 새로 고침이 수행되지 않는 한 결과 레코드는 다시 계산되지 않습니다.

트리거된 실행을 위해 구성된 파이프라인에서 정적 테이블은 업데이트가 시작된 시점의 결과를 반환합니다. 연속 실행을 위해 구성된 파이프라인에서 테이블이 업데이트를 처리할 때마다 최신 버전의 정적 테이블이 쿼리됩니다.

다음은 스트림 정적 조인의 예입니다.

파이썬

@dp.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

집계를 효율적으로 계산하세요

스트리밍 테이블을 사용하여 개수, 최소, 최대 또는 합계와 같은 간단한 분산 집계와 평균 또는 표준 편차와 같은 대수 집계를 증분 방식으로 계산할 수 있습니다. Databricks는 GROUP BY country 절이 있는 쿼리와 같이 그룹 수가 제한된 쿼리에 대해 증분 집계를 권장합니다. 업데이트할 때마다 새 입력 데이터만 읽습니다.

증분 집계를 수행하는 Lakeflow Spark 선언적 파이프라인 쿼리를 작성하는 방법에 대한 자세한 내용은 워터마크를 사용하여 창에 표시된 집계 수행을 참조하세요.

Lakeflow Spark 선언적 파이프라인에서 MLflow 모델 사용

비고

Unity 카탈로그 사용 파이프라인에서 MLflow 모델을 사용하려면 preview 채널을 사용하도록 파이프라인을 구성해야 합니다. current 채널을 사용하려면 Hive 메타스토어에 게시하도록 파이프라인을 구성해야 합니다.

파이프라인에서 MLflow 학습 모델을 사용할 수 있습니다. MLflow 모델은 Azure Databricks에서 변환으로 처리됩니다. 즉, Spark DataFrame 입력에 따라 작동하고 결과를 Spark DataFrame으로 반환합니다. Lakeflow Spark 선언적 파이프라인은 DataFrames에 대한 데이터 세트를 정의하므로 MLflow를 사용하는 Apache Spark 워크로드를 몇 줄의 코드만 있는 파이프라인으로 변환할 수 있습니다. MLflow에 대한 자세한 내용은 ML 모델 수명 주기에 대한 MLflow를 참조하세요.

MLflow 모델을 호출하는 Python 스크립트가 이미 있는 경우 또는 @dp.table 데코레이터를 사용하여 @dp.materialized_view 변환 결과를 반환하도록 함수가 정의되도록 하여 이 코드를 파이프라인에 적용할 수 있습니다. Lakeflow Spark 선언적 파이프라인은 기본적으로 MLflow를 설치하지 않으므로, `%pip install mlflow`을 사용하여 MLFlow 라이브러리가 설치되었는지 확인하고, 원본의 맨 위에 `mlflow`과 `dp`가 임포트되었는지 확인하세요. 파이프라인 구문에 대한 소개는 Python을 사용하여 파이프라인 코드 개발을 참조하세요.

파이프라인에서 MLflow 모델을 사용하려면 다음 단계를 완료합니다.

  1. MLflow 모델의 실행 ID 및 모델 이름을 가져옵니다. 실행 ID 및 모델 이름은 MLflow 모델의 URI를 생성하는 데 사용됩니다.
  2. URI를 사용하여 MLflow 모델을 로드하는 Spark UDF를 정의합니다.
  3. 테이블 정의에서 UDF를 호출하여 MLflow 모델을 사용합니다.

다음 예제에서는 이 패턴의 기본 구문을 보여 줍니다.

%pip install mlflow

from pyspark import pipelines as dp
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dp.materialized_view
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

전체 예제로, 다음 코드는 대출 위험 데이터에 대해 학습된 MLflow 모델을 로드하는 loaded_model_udf로 명명된 Spark UDF를 정의합니다. 예측을 만드는 데 사용되는 데이터 열은 UDF에 인수로 전달됩니다. 테이블 loan_risk_predictionsloan_risk_input_data각 행에 대한 예측을 계산합니다.

%pip install mlflow

from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dp.materialized_view(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

수동 삭제 또는 업데이트 유지

Lakeflow Spark 선언적 파이프라인을 사용하면 테이블에서 레코드를 수동으로 삭제하거나 업데이트하고 새로 고침 작업을 수행하여 다운스트림 테이블을 다시 계산할 수 있습니다.

기본적으로 파이프라인은 업데이트될 때마다 입력 데이터를 기반으로 테이블 결과를 다시 계산하므로 삭제된 레코드가 원본 데이터에서 다시 로드되지 않도록 해야 합니다. pipelines.reset.allowed 테이블 속성을 false 설정하면 테이블에 대한 새로 고침이 방지되지만 테이블에 대한 증분 쓰기 또는 새 데이터가 테이블로 흐르는 것을 방지하지는 않습니다.

다음 다이어그램에서는 두 개의 스트리밍 테이블을 사용하는 예제를 보여 줍니다.

  • raw_user_table 원본에서 원시 사용자 데이터를 수집합니다.
  • bmi_table raw_user_table체중과 높이를 사용하여 BMI 점수를 증분 계산합니다.

raw_user_table 사용자 레코드를 수동으로 삭제하거나 업데이트하고 bmi_table다시 계산하려고 합니다.

데이터 다이어그램 유지

다음 코드에서는 pipelines.reset.allowed 테이블 속성을 false 설정하여 의도한 변경 내용이 시간에 따라 유지되지만 파이프라인 업데이트가 실행될 때 다운스트림 테이블이 다시 계산되도록 raw_user_table 전체 새로 고침을 사용하지 않도록 설정하는 방법을 보여 줍니다.

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);