자습서 4부: 일괄 처리 채점 수행 및 레이크하우스에 예측 저장

이 자습서에서는 Microsoft Fabric MLflow 모델 레지스트리를 사용하여 3부에서 학습된 등록된 LightGBMClassifier 모델을 가져오고 Lakehouse에서 로드된 테스트 데이터 세트에 대한 일괄 처리 예측을 수행하는 방법을 알아봅니다.

Microsoft Fabric을 사용하면 모든 컴퓨팅 엔진에서 일괄 처리 채점을 지원하는 PREDICT라는 확장성 있는 함수를 사용하여 기계 학습 모델을 운영할 수 있습니다. Microsoft Fabric Notebook 또는 지정된 모델의 항목 페이지에서 직접 일괄 처리 예측을 생성할 수 있습니다. PREDICT에 대해 알아봅니다.

테스트 데이터 세트에서 일괄 처리 예측을 생성하려면 학습된 모든 기계 학습 모델 중에서 최상의 성능을 보여 주는 학습된 LightGBM 모델 버전 1을 사용합니다. 테스트 데이터 세트를 spark DataFrame에 로드하고 MLFlowTransformer 개체를 만들어 일괄 처리 예측을 생성합니다. 그런 다음 다음 세 가지 방법 중 하나를 사용하여 PREDICT 함수를 호출할 수 있습니다.

  • SynapseML의 변환기 API
  • Spark SQL API
  • PySpark UDF(사용자 정의 함수)

필수 조건

  • Microsoft Fabric 구독가져옵니다. 또는 무료 Microsoft Fabric 평가판에 등록합니다.

  • Microsoft Fabric에 로그인합니다.

  • 홈페이지 왼쪽의 환경 전환기를 사용하여 Synapse 데이터 과학 환경으로 전환합니다.

    Screenshot of the experience switcher menu, showing where to select Data Science.

자습서 시리즈의 5부 중 4부입니다. 이 자습서를 완료하려면 먼저 다음을 완료합니다.

Notebook에서 팔로우

4-predict.ipynb 는 이 자습서와 함께 제공되는 Notebook입니다.

이 자습서에 대해 함께 제공되는 Notebook을 열려면 데이터 과학 자습서를 위해 시스템 준비 자습서의 지침에 따라 Notebook을 작업 영역으로 가져옵니다.

이 페이지에서 코드를 복사하여 붙여 넣으면 새 Notebook을 만들 수 있습니다.

코드 실행을 시작하기 전에 Lakehouse를 Notebook에 연결해야 합니다.

Important

이 시리즈의 다른 부분에서 사용한 것과 동일한 레이크하우스를 연결합니다.

테스트 데이터 로드

3부에 저장한 테스트 데이터를 로드합니다.

df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)

변환기 API를 사용하여 예측

SynapseML에서 Transformer API를 사용하려면 먼저 MLFlowTransformer 개체를 만들어야 합니다.

MLFlowTransformer 개체 인스턴스화

MLFlowTransformer 개체는 3부에 등록한 MLFlow 모델 주위의 래퍼입니다. 지정된 DataFrame에서 일괄 처리 예측을 생성할 수 있습니다. MLFlowTransformer 개체를 인스턴스화하려면 다음 매개 변수를 제공해야 합니다.

  • 모델에 대한 입력으로 필요한 테스트 DataFrame의 열입니다(이 경우 모두 필요).
  • 새 출력 열의 이름입니다(이 경우 예측).
  • 예측을 생성하는 올바른 모델 이름 및 모델 버전입니다(이 경우 lgbm_sm 버전 1).
from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)

이제 MLFlowTransformer 개체가 있으므로 이를 사용하여 일괄 처리 예측을 생성할 수 있습니다.

import pandas

predictions = model.transform(df_test)
display(predictions)

Spark SQL API를 사용하여 예측

다음 코드는 Spark SQL API를 사용하여 PREDICT 함수를 호출합니다.

from pyspark.ml.feature import SQLTransformer 

# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns

sqlt = SQLTransformer().setStatement( 
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))

UDF(사용자 정의 함수)를 사용하여 PREDICT

다음 코드는 PySpark UDF를 사용하여 PREDICT 함수를 호출합니다.

from pyspark.sql.functions import col, pandas_udf, udf, lit

# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns

display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))

모델의 항목 페이지에서 PREDICT 코드를 생성할 수도 있습니다. PREDICT에 대해 알아봅니다.

레이크하우스에 모델 예측 결과 작성

일괄 처리 예측을 생성한 후에는 모델 예측 결과를 레이크하우스에 다시 씁니다.

# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

다음 단계

계속 진행: