다음을 통해 공유


자동화된 ML을 사용하여 모델 만들기(미리 보기)

AutoML(자동화된 Machine Learning)에는 최소한의 사용자 개입으로 기계 학습 모델을 학습하고 최적화하는 프로세스를 간소화하도록 설계된 일련의 기술과 도구가 포함됩니다. AutoML의 주요 목적은 일반적으로 상당한 전문 지식과 계산 리소스를 요구하는 작업인 지정된 데이터 세트에 가장 적합한 기계 학습 모델 및 하이퍼 매개 변수의 선택을 단순화하고 가속화하는 것입니다. 패브릭 프레임워크 내에서 데이터 과학자는 flaml.AutoML 모듈을 활용하여 기계 학습 워크플로의 다양한 측면을 자동화할 수 있습니다.

이 문서에서는 Spark 데이터 세트를 사용하여 코드에서 직접 AutoML 평가판을 생성하는 프로세스를 자세히 설명합니다. 또한 이 데이터를 Pandas 데이터 프레임으로 변환하는 방법을 살펴보고 실험 시험을 병렬화하는 기술에 대해 설명합니다.

Important

이 기능은 미리 보기로 제공됩니다.

필수 조건

  • 패브릭 환경을 만들거나 Fabric 런타임 1.2(Spark 3.4 이상) 및 델타 2.4에서 실행 중인지 확인합니다.
  • 새 Notebook을 만듭니다.
  • 레이크하우스에 Notebook을 첨부합니다. Notebook 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 새 레이크하우스를 만듭니다.

데이터 로드 및 준비

이 섹션에서는 데이터에 대한 다운로드 설정을 지정한 다음, 레이크하우스에 저장합니다.

데이터 다운로드

이 코드 블록은 원격 원본에서 데이터를 다운로드하고 레이크하우스에 저장합니다.

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Spark 데이터 프레임에 데이터 로드

다음 코드 블록은 CSV 파일의 데이터를 Spark DataFrame으로 로드하고 효율적인 처리를 위해 캐시합니다.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

이 코드는 데이터 파일이 다운로드되어 지정된 경로에 있다고 가정합니다. CSV 파일을 Spark DataFrame으로 읽고, 스키마를 유추하고, 후속 작업 중에 더 빠르게 액세스할 수 있도록 캐시합니다.

데이터 준비

이 섹션에서는 데이터 세트에 대한 데이터 정리 및 기능 엔지니어링을 수행합니다.

데이터 정리

먼저 데이터가 누락된 행 삭제, 특정 열에 따라 중복 행 제거 및 불필요한 열 삭제를 포함하여 데이터를 정리하는 함수를 정의합니다.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

clean_data 함수는 불필요한 열을 제거하는 동안 데이터 세트에 누락된 값과 중복 항목이 없는지 확인하는 데 도움이 됩니다.

기능 엔지니어링

다음으로, 원 핫 인코딩을 사용하여 '지역' 및 '성별' 열에 대한 더미 열을 만들어 기능 엔지니어링을 수행합니다.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

여기서는 원 핫 인코딩으로 범주 열을 이진 더미 열로 변환하여 기계 학습 알고리즘에 적합합니다.

정리된 데이터 표시

마지막으로 디스플레이 함수를 사용하여 정리되고 기능 엔지니어링된 데이터 세트를 표시합니다.


display(df_clean)

이 단계를 사용하면 적용된 변환을 사용하여 결과 DataFrame을 검사할 수 있습니다.

레이크하우스에 저장

이제 정리되고 기능 엔지니어링된 데이터 세트를 레이크하우스에 저장합니다.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

여기서는 정리되고 변환된 PySpark DataFrame인 df_clean을 가져와 레이크하우스에서 "churn_data_clean"라는 델타 테이블로 저장합니다. 데이터 세트의 효율적인 버전 관리 및 관리를 위해 델타 형식을 사용합니다. mode("overwrite")는 같은 이름의 기존 테이블이 덮어쓰기 되는 경우 새로운 버전을 만들도록 해줍니다.

테스트 및 학습 데이터 세트 만들기

다음으로 정리되고 기능 엔지니어링된 데이터에서 테스트 및 학습 데이터 세트를 만듭니다.

제공된 코드 섹션에서는 델타 형식을 사용하여 레이크하우스에서 정리되고 기능 엔지니어링된 데이터 세트를 로드하고, 80-20 비율로 학습 및 테스트 세트로 분할하고, 기계 학습을 위해 데이터를 준비합니다. 이 준비에는 PySpark ML에서 VectorAssembler를 가져와 기능 열을 단일 "기능" 열로 결합하는 작업이 포함됩니다. 그런 다음 VectorAssembler를 사용해 학습 및 테스트 데이터 세트를 변환하여 대상 변수 "Exited"와 기능 벡터를 포함하는 train_datatest_data DataFrame을 만듭니다. 이러한 데이터 세트는 이제 기계 학습 모델을 빌드하고 평가하는 데 사용할 준비가 되었습니다.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

기준 모델 학습

주요 데이터를 사용하여 기준 기계 학습 모델을 학습시키고, 실험 추적을 위한 MLflow를 구성하고, 메트릭 계산에 대한 예측 함수를 정의하고, 마지막으로 결과 ROC AUC 점수를 보고 기록합니다.

로깅 수준 설정

여기서는 Synapse.ml 라이브러리에서 불필요한 출력을 표시하지 않도록 로깅 수준을 구성하여 로그를 더 깔끔하게 유지합니다.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

MLflow 구성

이 섹션에서는 실험 추적을 위해 MLflow를 구성합니다. 실험 이름을 "automl_sample"로 설정하여 실행을 구성합니다. 또한 자동 로깅을 사용하도록 설정하여 모델 매개 변수, 메트릭 및 아티팩트가 MLflow에 자동으로 기록되도록 합니다.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

모델 학습 및 평가

마지막으로 제공된 학습 데이터에 대해 LightGBMClassifier 모델을 학습시킵니다. 모델은 이진 분류 및 불균형 처리에 필요한 설정으로 구성됩니다. 그다음 이 학습 모델을 사용해 테스트 데이터에 대한 예측을 수행합니다. 테스트 데이터에서 양수 클래스 및 실제 레이블에 대해 예측된 확률을 추출합니다. 그 후 sklearn의 roc_auc_score 함수를 사용하여 ROC AUC 점수를 계산합니다.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

여기에서 결과 모델이 ROC AUC 점수 84%를 달성했음을 알 수 있습니다.

FLAML을 사용하여 AutoML 평가판 만들기

이 섹션에서는 FLAML 패키지를 사용하여 AutoML 평가판을 만들고, 평가판 설정을 구성하고, Spark 데이터 세트를 Spark 데이터 세트의 Pandas로 변환하고, AutoML 평가판을 실행하고, 결과 메트릭을 확인합니다.

AutoML 작업을 구성합니다.

여기서는 FLAML 패키지에서 필요한 클래스 및 모듈을 가져오고 기계 학습 파이프라인을 자동화하는 데 사용할 AutoML 인스턴스를 만듭니다.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

설정 구성

이 섹션에서는 AutoML 평가판에 대한 구성 설정을 정의합니다.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Spark에서 Pandas로 변환

Spark 기반 데이터 세트로 AutoML을 실행하려면 to_pandas_on_spark 함수를 사용하여 Spark 데이터 세트의 Pandas로 변환해야 합니다. 이를 통해 FLAML은 데이터를 효율적으로 사용할 수 있습니다.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

AutoML 평가판 실행

이제 AutoML 평가판을 실행합니다. 중첩된 MLflow 실행을 사용하여 기존 MLflow 실행 컨텍스트 내에서 실험을 추적합니다. AutoML 평가판은 Exited 대상 변수를 사용하여 Spark 데이터 세트(df_automl)의 Pandas에서 수행되며 정의된 설정은 구성을 위해 fit 함수에 전달됩니다.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

결과 메트릭 보기

이 마지막 섹션에서는 AutoML 평가판의 결과를 검색하고 표시합니다. 이러한 메트릭은 지정된 데이터 세트에서 AutoML 모델의 성능 및 구성에 대한 인사이트를 제공합니다.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Apache Spark를 사용하여 AutoML 평가판 병렬 처리

데이터 세트가 단일 노드에 들어갈 수 있고 여러 병렬 AutoML 평가판을 동시에 실행하기 위해 Spark의 기능을 활용하려는 시나리오에서는 다음 단계를 수행할 수 있습니다.

Pandas DataFrame으로 변환

병렬 처리를 사용하도록 설정하려면 먼저 데이터를 Pandas DataFrame으로 변환해야 합니다.

pandas_df = train_raw.toPandas()

여기서는 train_raw Spark DataFrame을 병렬 처리에 적합하도록 pandas_df로 명명된 Pandas DataFrame으로 변환합니다.

병렬 처리 설정 구성

Spark 기반 병렬 처리를 사용하도록 use_sparkTrue로 설정합니다. 기본적으로 FLAML은 실행기당 하나의 평가판을 시작합니다. n_concurrent_trials 인수를 사용하여 동시 평가판 수를 사용자 지정할 수 있습니다.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

이러한 설정에서는 use_sparkTrue로 설정하여 병렬 처리에 Spark를 활용하도록 지정합니다. 또한 동시 평가판 수를 3으로 설정했습니다. 즉, Spark에서 세 개의 평가판이 병렬로 실행됩니다.

AutoML 내역을 병렬화하는 방법에 대한 자세한 내용은 병렬 Spark 작업에 대한 FLAML 설명서를 참조하세요.

AutoML 평가판을 병렬로 실행

이제 지정된 설정과 병렬로 AutoML 평가판을 실행합니다. 중첩된 MLflow 실행을 사용하여 기존 MLflow 실행 컨텍스트 내에서 실험을 추적합니다.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

이제 병렬화를 사용하도록 설정된 AutoML 평가판을 실행합니다. dataframe 인수는 Pandas DataFrame pandas_df으로 설정되고 다른 설정은 병렬 실행을 위해 fit 함수에 전달됩니다.

메트릭 보기

병렬 AutoML 평가판을 실행한 후 최상의 하이퍼 매개 변수 구성, 유효성 검사 데이터에 대한 ROC AUC 및 최상의 실행의 학습 기간을 포함하여 결과를 검색하고 표시합니다.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))