Freigeben über


Erstellen von Modellen mit automatisiertem ML (Vorschau)

Automatisiertes maschinelles Lernen (AutoML) umfasst eine Reihe von Techniken und Tools, die entwickelt wurden, um den Schulungs- und Optimierungsprozess von Machine Learning-Modellen mit minimalem menschlichem Eingriff zu optimieren. Das Hauptziel von AutoML ist es, die Auswahl des am besten geeigneten Machine Learning-Modells und der entsprechenden Hyperparameter für ein bestimmtes DataSet zu vereinfachen und zu beschleunigen, eine Aufgabe, die in der Regel große Erfahrung und Rechenressourcen erfordert. Im Fabric-Framework können Wissenschaftliche Fachkräfte für Daten das flaml.AutoML Modul nutzen, um verschiedene Aspekte ihrer Workflows für maschinelles Lernen zu automatisieren.

In diesem Artikel befassen wir uns mit dem Prozess der Generierung von AutoML-Testversionen direkt aus Code mithilfe eines Spark-DataSets. Darüber hinaus untersuchen wir Methoden für die Konvertierung dieser Daten in einen Pandas-DataFrame und besprechen Techniken zur Parallelisierung Ihrer Experimentierversuche.

Wichtig

Dieses Feature befindet sich in der Vorschau.

Voraussetzungen

  • Erstellen Sie eine neue Fabric-Umgebung, oder stellen Sie sicher, dass Sie mit der Fabric Runtime 1.2 (Spark 3.4 (oder höher) und Delta 2.4) arbeiten
  • Erstellen Sie ein neues Notebook.
  • Verbinden Sie Ihr Notebook mit einem Lakehouse. Wählen Sie auf der linken Seite Ihres Notebooks Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein neues zu erstellen.

Laden und Vorbereiten von Daten

In diesem Abschnitt geben wir die Downloadeinstellungen für die Daten an und speichern sie dann im Lakehouse.

Herunterladen von Daten

Dieser Codeblock lädt die Daten aus einer Remotequelle herunter und speichert sie im Lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Laden von Daten in ein Spark-DataFrame

Der folgende Codeblock lädt die Daten aus der CSV-Datei in einen Spark DataFrame und speichert sie für eine effiziente Verarbeitung im Cache.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Dieser Code geht davon aus, dass die Datendatei heruntergeladen wurde und sich im angegebenen Pfad befindet. Die CSV-Datei wird in einen Spark DataFrame eingelesen, das Schema wird abgeleitet und für einen schnelleren Zugriff während der nachfolgenden Operationen zwischengespeichert.

Vorbereiten der Daten

In diesem Abschnitt führen wir eine Datenbereinigung und ein Feature-Engineering im Datensatz durch.

Bereinigen von Daten

Zunächst definieren wir eine Funktion zum Bereinigen der Daten. Dazu gehört das Löschen von Zeilen mit fehlenden Daten, das Entfernen doppelter Zeilen auf der Grundlage bestimmter Spalten und das Löschen unnötiger Spalten.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Die clean_data Funktion trägt dazu bei, dass das DataSet frei von fehlenden Werten und Duplikaten ist, während unnötige Spalten entfernt werden.

Featureentwicklung

Als nächstes führen wir ein Feature-Engineering durch, indem wir Dummy-Spalten für die Spalten 'Geographie' und 'Geschlecht' mit Hilfe von One-Hot-Codierung erstellen.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Hier verwenden wir die One-Hot-Codierung, um kategorische Spalten in binäre Dummy-Spalten umzuwandeln, damit sie für Algorithmen des maschinellen Lernens geeignet sind.

Anzeigen der bereinigten Daten

Abschließend zeigen wir den bereinigten, mit Feature Engineering versehenen Datensatz mithilfe der Anzeigefunktion an.


display(df_clean)

In diesem Schritt können Sie den sich ergebenden DataFrame mit den vorgenommenen Transformationen überprüfen.

Speichern in Lakehouse

Jetzt speichern wir den bereinigten, mit Feature-Engineering versehenen Datensatz in Lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Hier nehmen wir den bereinigten und transformierten PySpark DataFrame, df_clean, und speichern ihn als Delta-Tabelle mit dem Namen "churn_data_clean" in Lakehouse. Wir verwenden das Delta-Format für eine effiziente Versionsverwaltung und -pflege des DataSet. mode("overwrite") gewährleistet, dass jede bestehende Tabelle mit demselben Namen überschrieben wird und eine neue Version der Tabelle erstellt wird.

Erstellen von Test- und Trainings-DataSets

Als Nächstes erstellen wir die Test- und Trainings-DataSets aus den bereinigten und mit Feature-Engineering versehenen Daten.

Im mitgelieferten Code-Abschnitt laden wir einen bereinigten und mit Feature-Engineering aufbereiteten Datensatz aus dem Lakehouse im Delta-Format, teilen ihn in Trainings- und Testsets im Verhältnis 80-20 auf und bereiten die Daten für das maschinelle Lernen vor. Bei dieser Vorbereitung wird der Import von VectorAssembler von PySpark ML zum Kombinieren von Featurespalten in eine einzelne "Features"-Spalte verwendet. Anschließend verwenden wir die VectorAssembler für die Transformation der Trainings- und Test-DataSets, was zu train_data und test_data DataFrames führt, die die Zielvariable "Exited" und die Featurevektoren enthalten. Diese DataSets sind jetzt für die Erstellung und Auswertung von Machine Learning-Modellen bereit.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Trainieren eines Baselinemodells

Anhand der featurisierten Daten werden wir ein grundlegendes Modell für maschinelles Lernen trainieren, MLflow für die Verfolgung von Experimenten konfigurieren, eine Vorhersagefunktion für die Berechnung von Metriken definieren und schließlich den resultierenden ROC AUC-Wert anzeigen und protokollieren.

Festlegen des Protokolliergrads

Hier konfigurieren wir die Protokollierungsstufe, um unnötige Ausgaben aus der Synapse.ml-Bibliothek zu unterdrücken und die Protokolle sauberer zu halten.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Konfigurieren von MLflow

In diesem Abschnitt konfigurieren wir MLflow für die Nachverfolgung von Experimenten. Wir legen den Experimentnamen auf "automl_sample" fest, um die Ausführung zu organisieren. Darüber hinaus aktivieren wir die automatische Protokollierung, um sicherzustellen, dass Modellparameter, Metriken und Artefakte automatisch bei MLflow protokolliert werden.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Trainieren und Bewerten des Modells

Abschließend trainieren wir ein LightGBMClassifier-Modell auf den bereitgestellten Trainingsdaten. Das Modell wird mit den erforderlichen Einstellungen für die Binärklassifizierung und die Ungleichgewichtbehandlung konfiguriert. Dann verwenden wir das Training-Modell für Vorhersagen mit einem Test-Datensatz. Wir extrahieren die vorhergesagten Wahrscheinlichkeiten für die positive Klasse und die tatsächlichen Bezeichnungen aus den Testdaten. Anschließend berechnen wir die ROC-AUC-Bewertung mithilfe der Sklearn-Funktion roc_auc_score.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Von hier aus können wir sehen, dass unser resultierendes Modell einen ROC AUC-Wert von 84% erreicht.

Erstellen einer AutoML-Testversion mit FLAML

In diesem Abschnitt erstellen wir eine AutoML-Testversion mithilfe des FLAML-Pakets, konfigurieren die Testeinstellungen, konvertieren das Spark-DataSet in ein Pandas auf Spark-DataSet, führen die AutoML-Testversion aus und zeigen die resultierenden Metriken an.

Konfigurieren der AutoML-Testversion

Hier importieren wir die erforderlichen Klassen und Module aus dem FLAML-Paket und erstellen eine Instanz von AutoML, die zum Automatisieren der Machine-Learning-Pipeline verwendet wird.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Konfigurieren von Einstellungen

In diesem Abschnitt definieren wir die Konfigurationseinstellungen für die AutoML-Testversion.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

In Spark auf Pandas konvertieren

Um AutoML mit einem Spark-basierten Dataset auszuführen, müssen wir es mithilfe der to_pandas_on_spark Funktion in einen Pandas auf Spark-DataSet konvertieren. Dadurch kann FLAML effizient mit den Daten arbeiten.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Ausführen der AutoML-Testversion

Jetzt führen wir die AutoML-Testversion aus. Wir verwenden einen geschachtelten MLflow-Lauf, um das Experiment im vorhandenen MLflow-Ausführungskontext nachzuverfolgen. Der AutoML-Testversion wird auf dem Pandas auf Spark-Datensatz (df_automl) mit der Zielvariablen "Exited durchgeführt und die definierten Einstellungen werden zur Konfiguration an die fit Funktion übergeben.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Anzeigen der resultierenden Metriken

In diesem letzten Abschnitt werden die Ergebnisse der AutoML-Testversion abgerufen und angezeigt. Diese Metriken liefern Einblicke in die Leistung und Konfiguration des AutoML-Modells für das angegebene DataSet.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Parallelisieren Ihrer AutoML-Testversion mit Apache Spark

In Szenarien, in denen Ihr DataSet in einen Single Node passen kann und Sie die Leistungsfähigkeit von Spark für die gleichzeitige Ausführung mehrerer paralleler AutoML-Testversionen nutzen möchten, können Sie die folgenden Schritte ausführen:

In Pandas-DataFrame konvertieren

Um die Parallelisierung zu ermöglichen, müssen Ihre Daten zuerst in einen Pandas DataFrame konvertiert werden.

pandas_df = train_raw.toPandas()

Hier konvertieren wir den train_raw Spark DataFrame in einen Pandas DataFrame mit dem Namen pandas_df, um ihn für Parallelverarbeitung nutzbar zu machen.

Konfigurieren von Parallelisierungs-Einstellungen

Setzen Sie use_spark auf True fest, um die Spark-basierte Parallelität zu aktivieren. Standardmäßig startet FLAML eine Testversion pro Executor. Sie können die Anzahl der gleichzeitigen Testversionen mit dem Argument n_concurrent_trials anpassen.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

In diesen Einstellungen geben wir an, dass wir Spark für die Parallelität nutzen möchten, indem wir use_spark auf True setzen. Wir legen auch die Anzahl der gleichzeitigen Testversionen auf 3 fest, was bedeutet, dass drei Testversionen parallel auf Spark ausgeführt werden.

Weitere Informationen zum Parallelisieren Ihrer AutoML-Testversionen finden Sie in der FLAML-Dokumentation für parallele Spark-Aufträge.

Parallele Ausführung der AutoML-Testversion

Jetzt führen wir die AutoML-Testversion parallel zu den angegebenen Einstellungen aus. Wir verwenden einen geschachtelten MLflow-Lauf, um das Experiment im vorhandenen MLflow-Ausführungs-Kontext nachzuverfolgen.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Dadurch wird nun die AutoML-Testversion mit aktivierter Parallelisierung ausgeführt. Das dataframe Argument wird auf dem Pandas DataFrame pandas_dffestgelegt, und andere Einstellungen werden zur parallelen Ausführung an die fit Funktion übergeben.

Anzeigen von Metriken

Nachdem Sie die parallele AutoML-Testversion ausgeführt haben, rufen Sie die Ergebnisse ab und zeigen sie an, einschließlich der besten Hyperparameter-Konfiguration, des ROC AUC für die Validierungsdaten und der Trainingsdauer des besten Laufs.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))