Share via


使用自動化 ML 建立模型 (預覽)

自動化 機器學習 (AutoML) 包含一組技術與工具,旨在簡化訓練和優化機器學習模型的程式,只需最少人為介入。 AutoML 的主要目標是簡化和加速為給定數據集選取最適合的機器學習模型和超參數,此工作通常需要相當長的專業知識和計算資源。 在 Fabric 架構中,數據科學家可以利用模組 flaml.AutoML ,將機器學習工作流程的各個層面自動化。

在本文中,我們將深入探討使用Spark數據集直接從程式代碼產生 AutoML 試用版的程式。 此外,我們將探索將這項數據轉換成 Pandas 數據框架的方法,並討論平行處理實驗試驗的技術。

重要

這項功能處於預覽狀態

必要條件

  • 建立新的 Fabric 環境 ,或確保您是在 Fabric 運行時間 1.2 上執行 (Spark 3.4 (或更新版本) 和 Delta 2.4)
  • 建立 新的筆記本
  • 將筆記本附加至 Lakehouse。 在筆記本左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立新的湖屋。

載入和準備數據

在本節中,我們將指定數據的下載設定,然後將它儲存至 Lakehouse。

下載資料

此程式代碼區塊會從遠端來源下載數據,並將其儲存至 Lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

將數據載入 Spark 資料框架

下列程式代碼區塊會將 CSV 檔案中的數據載入 Spark DataFrame,並快取數據以有效率地處理。

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

此程式代碼假設資料檔已下載,且位於指定的路徑中。 它會將 CSV 檔案讀入 Spark DataFrame、推斷架構,並在後續作業期間快取該檔案以加快存取速度。

準備資料

在本節中,我們將對數據集執行數據清理和特徵工程。

清除資料

首先,我們會定義一個函式來清除數據,其中包括卸除遺漏數據的數據列、根據特定數據行移除重複的數據列,以及卸除不必要的數據行。

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

clean_data 式可協助確保數據集在移除不必要的數據行時,沒有遺漏的值和重複專案。

功能工程

接下來,我們會使用單熱編碼來建立 『Geography』 和 『Gender』 資料行的虛擬數據行來執行特徵工程。

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

在這裡,我們會使用單熱編碼,將類別數據行轉換成二進位虛擬數據行,使其適用於機器學習演算法。

顯示已清除的數據

最後,我們會使用顯示函式來顯示已清除和特徵設計的數據集。


display(df_clean)

此步驟可讓您使用套用的轉換來檢查產生的 DataFrame。

儲存到湖屋

現在,我們會將清理和功能工程數據集儲存至 Lakehouse。

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

在這裡,我們會採用已清理和轉換的 PySpark DataFrame, df_clean並將它儲存為 Lakehouse 中名為 “churn_data_clean” 的 Delta 數據表。 我們會使用 Delta 格式,以有效率地進行數據集的版本控制和管理。 mode("overwrite")可確保覆寫任何具有相同名稱的現有數據表,並建立新版本的數據表。

建立測試和定型數據集

接下來,我們將從已清理和特徵工程的數據建立測試和定型數據集。

在提供的程式代碼區段中,我們會使用 Delta 格式從 Lakehouse 載入已清理和功能工程的數據集,並以 80-20 的比例將其分割成定型和測試集,併為機器學習準備數據。 此準備牽涉到從 PySpark ML 匯 VectorAssembler 入 ,以將功能數據行合併成單一「特徵」數據行。 接著,我們會使用 VectorAssembler 來轉換定型和測試數據集,進而產生 train_datatest_data DataFrame,其中包含目標變數 「Exited」 和特徵向量。 這些數據集現在已準備好用於建置和評估機器學習模型。

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

定型基準模型

我們將使用特徵化數據來定型基準機器學習模型、設定 MLflow 以進行實驗追蹤、定義計量計算的預測函式,最後檢視和記錄產生的 ROC AUC 分數。

設定記錄層級

在這裡,我們會設定記錄層級來隱藏來自 Synapse.ml 連結庫的不必要輸出,讓記錄保持簡潔。

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

設定 MLflow

在本節中,我們會設定 MLflow 以進行實驗追蹤。 我們將實驗名稱設定為 「automl_sample」,以組織執行。 此外,我們會啟用自動記錄,確保模型參數、計量和成品會自動記錄至 MLflow。

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

定型及評估模型

最後,我們會在提供的訓練數據上定型 LightGBMClassifier 模型。 模型是使用二元分類和不平衡處理的必要設定來設定。 然後,我們會使用此定型的模型,對測試數據進行預測。 我們會從測試數據中擷取正類別和 true 標籤的預測機率。 之後,我們會使用 sklearn 的 roc_auc_score 函式來計算 ROC AUC 分數。

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

從這裡,我們可以看到產生的模型達到ROC AUC分數84%。

使用 FLAML 建立 AutoML 試用版

在本節中,我們將使用 FLAML 套件建立 AutoML 試用版、設定試用版設定、將 Spark 數據集轉換成 Spark 數據集上的 Pandas、執行 AutoML 試用版,以及檢視產生的計量。

設定 AutoML 試用版

在這裡,我們會從 FLAML 套件匯入必要的類別和模組,並建立 AutoML 的實例,以用來自動化機器學習管線。

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

配置設定

在本節中,我們會定義 AutoML 試用版的組態設定。

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

轉換成 Spark 上的 Pandas

若要使用 Spark 型數據集執行 AutoML,我們需要使用 to_pandas_on_spark 函式將它轉換成 Spark 數據集上的 Pandas。 這可讓 FLAML 有效率地使用數據。

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

執行 AutoML 試用版

現在,我們會執行 AutoML 試用版。 我們使用巢狀 MLflow 執行來追蹤現有 MLflow 執行內容內的實驗。 AutoML 試用版會在具有目標變數的 Pandas on Spark 數據集上df_automl執行,Exited 且已定義的設定會傳遞至函式以進行 fit 組態。

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

檢視產生的計量

在此最後一節中,我們會擷取並顯示 AutoML 試用版的結果。 這些計量可讓您深入瞭解指定數據集上 AutoML 模型的效能和設定。

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

使用 Apache Spark 平行處理 AutoML 試用版

如果您的數據集可以放入單一節點,而且您想要利用 Spark 的強大功能同時執行多個平行 AutoML 試用版,您可以遵循下列步驟:

轉換成 Pandas 數據框架

若要啟用平行處理,您的數據必須先轉換成 Pandas DataFrame。

pandas_df = train_raw.toPandas()

在這裡,我們會將 train_raw Spark DataFrame轉換成名為 pandas_df 的 Pandas DataFrame,使其適合平行處理。

設定平行處理設定

設定 use_sparkTrue 以啟用 Spark 型平行處理原則。 根據預設,FLAML 會針對每個執行程序啟動一個試用版。 您可以使用 自變數來自定義並行試用版 n_concurrent_trials 的數目。

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

在這些設定中,我們會將 設定use_sparkTrue為 ,以指定我們想要利用Spark進行平行處理原則。 我們也將並行試用版數目設定為 3,這表示三個試用版將在 Spark 上平行執行。

若要深入瞭解如何平行處理 AutoML 追蹤,您可以瀏覽平行 Spark 作業FLAML 檔。

平行執行 AutoML 試用版

現在,我們將與指定的設定平行執行 AutoML 試用版。 我們將使用巢狀 MLflow 執行來追蹤現有 MLflow 執行內容內的實驗。

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

現在,這會執行已啟用平行處理的 AutoML 試用版。 自 dataframe 變數會設定為 Pandas DataFrame pandas_df,而其他設定會傳遞至函 fit 式以進行平行執行。

檢視計量

執行平行 AutoML 試用版之後,擷取並顯示結果,包括驗證數據上的最佳超參數設定、ROC AUC,以及最佳執行執行的定型持續時間。

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))

下一步