共用方式為


使用自動化 ML 建立模型 (預覽版)

自動化機器學習 (AutoML) 包含一組技術與工具,旨在簡化以最低限度的人工介入來訓練和最佳化機器學習模型的程序。 AutoML 的主要目標是簡化和加速為給定資料集選取最適合的機器學習模型和超參數,此工作通常需要豐富專業知識和計算資源。 在 Fabric 架構中,資料科學家可以利用 flaml.AutoML 模組,將其機器學習工作流程的各個部分自動化。

在本文中,我們將深入探討使用 Spark 資料集直接從程式碼產生 AutoML 試用的程序。 此外,我們將探索將此資料轉換成 Pandas DataFrame 的方法,並討論平行處理實驗試用的技術。

重要

這項功能處於預覽狀態

必要條件

  • 建立新的 Fabric 環境,或確保您是在 Fabric Runtime 1.2 (Spark 3.4 (或更高版本) 和 Delta 2.4) 上執行
  • 建立新的筆記本
  • 將筆記本連結至 Lakehouse。 在筆記本左側,選取 [新增],以新增現有的 Lakehouse 或建立新的 Lakehouse。

載入並準備資料

在本節中,我們將指定資料的下載設定,然後將其儲存至 Lakehouse。

下載資料

此程式碼區塊會從遠端來源下載資料,並將其儲存至 Lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

將資料載入至 Spark DataFrame

下列程式碼區塊會將 CSV 檔案中的資料載入 Spark DataFrame,並快取資料以實現有效處理。

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

此程式碼假設資料檔案已下載,且位於指定的路徑中。 其會將 CSV 檔案讀入 Spark DataFrame、推斷結構描述,並在後續作業期間快取該檔案以加快存取速度。

準備資料

在本節中,我們將對資料集執行資料清理和特徵工程。

清除資料

首先,我們會定義一個函式來清理資料,其中包括卸除遺漏資料的資料列、根據特定資料行移除重複的資料列,以及卸除不必要的資料行。

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

clean_data 函式可協助確保資料集在移除不必要的資料行時,沒有遺漏值和重複項。

功能工程

接下來,我們會使用獨熱編碼來建立 [地理] 和 [性別] 資料行的虛擬資料行,進而執行特徵工程。

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

在這裡,我們會使用獨熱編碼,將類別資料行轉換成二進位虛擬資料行,使其適用於機器學習演算法。

顯示已清理的資料

最後,我們會使用顯示函式來顯示已清理和經過特徵工程設計的資料集。


display(df_clean)

此步驟可讓您使用套用的轉換來檢查產生的 DataFrame。

儲存到 Lakehouse

現在,我們會將已清理和經過特徵工程設計資料集儲存至 Lakehouse。

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

在這裡,我們會採用已清理和已轉換的 PySpark DataFrame df_clean,並將其儲存為 Lakehouse 中名為 "churn_data_clean" 的 Delta 資料表。 我們會使用 Delta 格式,以實現資料集的有效版本設定和管理。 mode("overwrite") 可確保覆寫任何具有相同名稱的現有資料表,並建立新版本的資料表。

建立測試和訓練資料集

接下來,我們將從已清理和經過特徵工程設計的資料建立測試和訓練資料集。

在提供的程式碼區段中,我們會使用 Delta 格式從 Lakehouse 載入已清理和經過特徵工程設計的資料集,並以 80-20 的比例將其分割成訓練和測試集,並為機器學習準備資料。 此準備牽涉到從 PySpark ML 匯入 VectorAssembler,以將特徵資料行合併成單一「特徵」資料行。 接著,我們會使用 VectorAssembler 來轉換訓練和測試資料集,進而產生 train_datatest_data DataFrame,其中包含目標變數 "Exited" 和特徵向量。 這些資料集現在已準備好用於建置和評估機器學習模型。

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

訓練基準模型

我們將使用特徵化資料來訓練基準機器學習模型、設定 MLflow 以進行實驗追蹤、定義計量計算的預測函式,最後檢視和記錄產生的 ROC AUC 分數。

設定記錄層級

在這裡,我們會設定記錄層級來隱藏來自 Synapse.ml 程式庫的不必要輸出,以便保持記錄簡潔。

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

設定 MLflow

在本節中,我們會設定 MLflow 以進行實驗追蹤。 我們將實驗名稱設定為 "automl_sample",以組織執行。 此外,我們會啟用自動記錄,確保模型參數、計量和成品會自動記錄至 MLflow。

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

定型及評估模型

最後,我們會在提供的訓練資料上訓練 LightGBMClassifier 模型。 模型是使用二元分類和不平衡處理的必要設定來設定。 我們使用此訓練的模型,來對測試資料進行預測。 我們會從測試資料中擷取正類別和 true 標籤的預測機率。 之後,我們會使用 sklearn 的 roc_auc_score 函式來計算 ROC AUC 分數。

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

從這裡,我們可以看到產生的模型的 ROC AUC 分數為 84%。

使用 FLAML 建立 AutoML 試用

在本節中,我們將使用 FLAML 套件建立 AutoML 試用、設定試用設定、將 Spark 資料集轉換成 Spark 資料集上的 Pandas、執行 AutoML 試用,以及檢視產生的計量。

設定 AutoML 試用

在這裡,我們會從 FLAML 套件匯入必要的類別和模組,並建立 AutoML 的執行個體,以用來自動化機器學習管線。

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

配置設定

在本節中,我們會定義 AutoML 試用的組態設定。

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

轉換成 Spark 上的 Pandas

若要使用 Spark 型資料集執行 AutoML,我們需要使用 to_pandas_on_spark 函式將其轉換成 Spark 資料集上的 Pandas。 這可讓 FLAML 有效率地處理資料。

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

執行 AutoML 試用

現在,我們會執行 AutoML 試用。 我們使用巢狀 MLflow 執行來追蹤現有 MLflow 執行內容內的實驗。 AutoML 試用會在包含目標變數 Exited 的 Spark 資料集上的 Pandas (df_automl) 上執行,且已定義的設定會傳遞至 fit 函式以進行設定。

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

檢視產生的計量

在此最後一節中,我們會擷取並顯示 AutoML 試用的結果。 這些計量可讓您深入了解指定資料集上 AutoML 模型的效能和設定。

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

使用 Apache Spark 平行處理 AutoML 試用

如果您的資料集可以放入單一節點,而且您想要利用 Spark 的強大功能同時執行多個平行 AutoML 試用,您可以遵循下列步驟:

轉換成 Pandas DataFrame

若要啟用平行處理,您的資料必須先轉換成 Pandas DataFrame。

pandas_df = train_raw.toPandas()

在這裡,我們會將 train_raw Spark DataFrame 轉換成名為 pandas_df 的 Pandas DataFrame,使其適合平行處理。

設定平行處理設定

use_spark 設定為 True,以啟用 Spark 型平行處理原則。 依預設,FLAML 會為每個執行程式啟動一次試用。 您可以使用 n_concurrent_trials 引數來自訂同時試用的數目。

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

在這些設定中,我們會將 use_spark 設定為 True,以指定我們想要利用 Spark 進行平行處理原則。 我們也將同時試用數目設定為 3,這表示將在 Spark 上平行執行是三個試用。

若要深入了解如何平行處理 AutoML 試用,您可以瀏覽平行 Spark 工作的 FLAML 文件

平行執行 AutoML 試用

現在,我們將與指定的設定平行執行 AutoML 試用。 我們將使用巢狀 MLflow 執行來追蹤現有 MLflow 執行內容內的實驗。

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

現在會執行已啟用平行處理的 AutoML 試用。 dataframe 引數會設定為 Pandas DataFrame pandas_df,而其他設定會傳遞至 fit 函式,以進行平行執行。

檢視計量

執行平行 AutoML 試用之後,擷取並顯示結果,包括最佳超參數設定、驗證資料上的 ROC AUC,以及效能最佳的執行的訓練持續時間。

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))