Dela via


Skapa modeller med automatiserad ML (förhandsversion)

Automatiserad maskininlärning (AutoML) omfattar en uppsättning tekniker och verktyg som utformats för att effektivisera träningsprocessen och optimera maskininlärningsmodeller med minimal mänsklig inblandning. Det primära målet med AutoML är att förenkla och påskynda valet av den lämpligaste maskininlärningsmodellen och hyperparametrar för en viss datamängd, en uppgift som vanligtvis kräver betydande expertis och beräkningsresurser. Inom ramverket Infrastruktur kan dataexperter använda modulen flaml.AutoML för att automatisera olika aspekter av sina arbetsflöden för maskininlärning.

I den här artikeln går vi in i processen att generera AutoML-utvärderingar direkt från kod med hjälp av en Spark-datauppsättning. Dessutom kommer vi att utforska metoder för att konvertera dessa data till en Pandas-dataram och diskutera tekniker för parallellisering av dina experimenteringsförsök.

Viktigt!

Den här funktionen är i förhandsversion.

Förutsättningar

  • Skapa en ny Infrastrukturmiljö eller se till att du körs på Fabric Runtime 1.2 (Spark 3.4 (eller senare) och Delta 2.4)
  • Skapa en ny notebook-fil.
  • Bifoga anteckningsboken till ett sjöhus. Till vänster i anteckningsboken väljer du Lägg till för att lägga till ett befintligt lakehouse eller skapa ett nytt.

Läsa in och förbereda data

I det här avsnittet anger vi nedladdningsinställningarna för data och sparar dem sedan i lakehouse.

Nedladdning av data

Det här kodblocket laddar ned data från en fjärrkälla och sparar dem i lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Läsa in data i en Spark-dataram

Följande kodblock läser in data från CSV-filen till en Spark DataFrame och cachelagrar dem för effektiv bearbetning.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Den här koden förutsätter att datafilen har laddats ned och finns i den angivna sökvägen. Den läser CSV-filen i en Spark DataFrame, härleder schemat och cachelagrar det för snabbare åtkomst under efterföljande åtgärder.

Förbereda data

I det här avsnittet ska vi utföra datarensning och funktionsframställning på datamängden.

Rensa data

Först definierar vi en funktion för att rensa data, vilket inkluderar att ta bort rader med saknade data, ta bort dubblettrader baserat på specifika kolumner och släppa onödiga kolumner.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Funktionen clean_data hjälper till att säkerställa att datamängden är fri från saknade värden och dubbletter samtidigt som onödiga kolumner tas bort.

Funktionsframställning

Därefter utför vi funktionsframställning genom att skapa dummykolumner för kolumnerna Geografi och Kön med hjälp av kodning med en frekvent kodning.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Här använder vi kodning med en frekvent kodning för att konvertera kategoriska kolumner till binära dummy-kolumner, vilket gör dem lämpliga för maskininlärningsalgoritmer.

Visa rensade data

Slutligen visar vi den rensade och funktionsutvecklade datamängden med hjälp av visningsfunktionen.


display(df_clean)

Med det här steget kan du inspektera den resulterande DataFrame med de tillämpade transformeringarna.

Spara till lakehouse

Nu ska vi spara den rensade och funktionsutvecklade datamängden i lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Här tar vi den rensade och omvandlade PySpark DataFrame df_cleanoch sparar den som en Delta-tabell med namnet "churn_data_clean" i lakehouse. Vi använder Delta-formatet för effektiv versionshantering och hantering av datamängden. Säkerställer mode("overwrite") att alla befintliga tabeller med samma namn skrivs över och att en ny version av tabellen skapas.

Skapa test- och träningsdatauppsättningar

Därefter skapar vi test- och träningsdatauppsättningarna från de rensade och funktionsutvecklade data.

I det angivna kodavsnittet läser vi in en rensad och funktionskonstruerad datauppsättning från lakehouse med hjälp av Delta-format, delar upp den i tränings- och testuppsättningar med förhållandet 80–20 och förbereder data för maskininlärning. Den här förberedelsen omfattar import VectorAssembler från PySpark ML för att kombinera funktionskolumner till en enda "funktionskolumn". Därefter använder VectorAssembler vi för att transformera tränings- och testningsdatauppsättningarna, vilket resulterar i train_data och test_data DataFrames som innehåller målvariabeln "Exited" och funktionsvektorerna. Dessa datauppsättningar är nu redo att användas för att skapa och utvärdera maskininlärningsmodeller.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Träna baslinjemodell

Med hjälp av funktionaliserade data tränar vi en maskininlärningsmodell för baslinje, konfigurerar MLflow för experimentspårning, definierar en förutsägelsefunktion för måttberäkning och visar och loggar slutligen den resulterande ROC AUC-poängen.

Ange loggningsnivå

Här konfigurerar vi loggningsnivån för att förhindra onödiga utdata från Synapse.ml-biblioteket och hålla loggarna renare.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Konfigurera MLflow

I det här avsnittet konfigurerar vi MLflow för experimentspårning. Vi ställer in experimentnamnet på "automl_sample" för att organisera körningarna. Dessutom aktiverar vi automatisk loggning, vilket säkerställer att modellparametrar, mått och artefakter loggas automatiskt till MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Träna och utvärdera modellen

Slutligen tränar vi en LightGBMClassifier-modell på de angivna träningsdata. Modellen konfigureras med nödvändiga inställningar för binär klassificering och obalanshantering. Sedan använder vi den här tränade modellen för att göra förutsägelser på testdata. Vi extraherar de förväntade sannolikheterna för den positiva klassen och de sanna etiketterna från testdata. Efteråt beräknar vi ROC AUC-poängen med sklearns roc_auc_score funktion.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Härifrån kan vi se att vår resulterande modell uppnår en ROC AUC-poäng på 84 %.

Skapa en AutoML-utvärderingsversion med FLAML

I det här avsnittet skapar vi en AutoML-utvärderingsversion med hjälp av FLAML-paketet, konfigurerar utvärderingsinställningarna, konverterar Spark-datamängden till en Pandas på Spark-datauppsättning, kör AutoML-utvärderingsversionen och visar de resulterande måtten.

Konfigurera AutoML-utvärderingsversionen

Här importerar vi nödvändiga klasser och moduler från FLAML-paketet och skapar en instans av AutoML, som används för att automatisera maskininlärningspipelinen.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Konfigurera inställningar

I det här avsnittet definierar vi konfigurationsinställningarna för AutoML-utvärderingsversionen.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Konvertera till Pandas på Spark

Om du vill köra AutoML med en Spark-baserad datauppsättning måste vi konvertera den till en Pandas på Spark-datauppsättning med to_pandas_on_spark hjälp av funktionen. Detta gör att FLAML kan arbeta effektivt med data.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Kör AutoML-utvärderingsversionen

Nu kör vi AutoML-utvärderingsversionen. Vi använder en kapslad MLflow-körning för att spåra experimentet i den befintliga MLflow-körningskontexten. AutoML-utvärderingsversionen utförs på Pandas på Spark-datamängden (df_automl) med målvariabeln "Exited och de definierade inställningarna skickas till fit funktionen för konfiguration.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Visa resulterande mått

I det här sista avsnittet hämtar och visar vi resultatet av AutoML-utvärderingsversionen. Dessa mått ger insikter om prestanda och konfiguration av AutoML-modellen på den angivna datamängden.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Parallellisera din AutoML-utvärderingsversion med Apache Spark

I scenarier där din datauppsättning får plats i en enda nod och du vill utnyttja kraften i Spark för att köra flera parallella AutoML-utvärderingar samtidigt kan du följa dessa steg:

Konvertera till Pandas-dataram

Om du vill aktivera parallellisering måste dina data först konverteras till en Pandas DataFrame.

pandas_df = train_raw.toPandas()

Här konverterar train_raw vi Spark DataFrame till en Pandas DataFrame med namnet pandas_df för att göra den lämplig för parallell bearbetning.

Konfigurera parallelliseringsinställningar

Ange use_spark till True för att aktivera Spark-baserad parallellitet. Som standard startar FLAML en utvärderingsversion per köre. Du kan anpassa antalet samtidiga utvärderingsversioner med hjälp n_concurrent_trials av argumentet .

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

I de här inställningarna anger vi att vi vill använda Spark för parallellitet genom att ange use_spark till True. Vi anger också antalet samtidiga utvärderingsversioner till 3, vilket innebär att tre utvärderingsversioner körs parallellt på Spark.

Om du vill veta mer om hur du parallelliserar dina AutoML-spår kan du gå till FLAML-dokumentationen för parallella Spark-jobb.

Kör AutoML-utvärderingsversionen parallellt

Nu ska vi köra AutoML-utvärderingsversionen parallellt med de angivna inställningarna. Vi använder en kapslad MLflow-körning för att spåra experimentet i den befintliga MLflow-körningskontexten.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Nu körs AutoML-utvärderingsversionen med parallellisering aktiverat. Argumentet dataframe är inställt på Pandas DataFrame pandas_dfoch andra inställningar skickas till fit funktionen för parallell körning.

Visa mått

När du har kört den parallella AutoML-utvärderingsversionen hämtar och visar du resultatet, inklusive den bästa hyperparameterkonfigurationen, ROC AUC på valideringsdata och träningsvaraktigheten för den bäst presterande körningen.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))