Skapa modeller med automatiserad ML (förhandsversion)
Automatiserad maskininlärning (AutoML) omfattar en uppsättning tekniker och verktyg som utformats för att effektivisera träningsprocessen och optimera maskininlärningsmodeller med minimal mänsklig inblandning. Det primära målet med AutoML är att förenkla och påskynda valet av den lämpligaste maskininlärningsmodellen och hyperparametrar för en viss datamängd, en uppgift som vanligtvis kräver betydande expertis och beräkningsresurser. Inom ramverket Infrastruktur kan dataexperter använda modulen flaml.AutoML
för att automatisera olika aspekter av sina arbetsflöden för maskininlärning.
I den här artikeln går vi in i processen att generera AutoML-utvärderingar direkt från kod med hjälp av en Spark-datauppsättning. Dessutom kommer vi att utforska metoder för att konvertera dessa data till en Pandas-dataram och diskutera tekniker för parallellisering av dina experimenteringsförsök.
Viktigt!
Den här funktionen är i förhandsversion.
Förutsättningar
Skaffa en Microsoft Fabric-prenumeration. Eller registrera dig för en kostnadsfri utvärderingsversion av Microsoft Fabric.
Logga in på Microsoft Fabric.
Använd upplevelseväxlaren till vänster på startsidan för att växla till Synapse Datavetenskap upplevelse.
- Skapa en ny Infrastrukturmiljö eller se till att du körs på Fabric Runtime 1.2 (Spark 3.4 (eller senare) och Delta 2.4)
- Skapa en ny notebook-fil.
- Bifoga anteckningsboken till ett sjöhus. Till vänster i anteckningsboken väljer du Lägg till för att lägga till ett befintligt lakehouse eller skapa ett nytt.
Läsa in och förbereda data
I det här avsnittet anger vi nedladdningsinställningarna för data och sparar dem sedan i lakehouse.
Nedladdning av data
Det här kodblocket laddar ned data från en fjärrkälla och sparar dem i lakehouse
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Läsa in data i en Spark-dataram
Följande kodblock läser in data från CSV-filen till en Spark DataFrame och cachelagrar dem för effektiv bearbetning.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Den här koden förutsätter att datafilen har laddats ned och finns i den angivna sökvägen. Den läser CSV-filen i en Spark DataFrame, härleder schemat och cachelagrar det för snabbare åtkomst under efterföljande åtgärder.
Förbereda data
I det här avsnittet ska vi utföra datarensning och funktionsframställning på datamängden.
Rensa data
Först definierar vi en funktion för att rensa data, vilket inkluderar att ta bort rader med saknade data, ta bort dubblettrader baserat på specifika kolumner och släppa onödiga kolumner.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
Funktionen clean_data
hjälper till att säkerställa att datamängden är fri från saknade värden och dubbletter samtidigt som onödiga kolumner tas bort.
Funktionsframställning
Därefter utför vi funktionsframställning genom att skapa dummykolumner för kolumnerna Geografi och Kön med hjälp av kodning med en frekvent kodning.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
Här använder vi kodning med en frekvent kodning för att konvertera kategoriska kolumner till binära dummy-kolumner, vilket gör dem lämpliga för maskininlärningsalgoritmer.
Visa rensade data
Slutligen visar vi den rensade och funktionsutvecklade datamängden med hjälp av visningsfunktionen.
display(df_clean)
Med det här steget kan du inspektera den resulterande DataFrame med de tillämpade transformeringarna.
Spara till lakehouse
Nu ska vi spara den rensade och funktionsutvecklade datamängden i lakehouse.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
Här tar vi den rensade och omvandlade PySpark DataFrame df_clean
och sparar den som en Delta-tabell med namnet "churn_data_clean" i lakehouse. Vi använder Delta-formatet för effektiv versionshantering och hantering av datamängden. Säkerställer mode("overwrite")
att alla befintliga tabeller med samma namn skrivs över och att en ny version av tabellen skapas.
Skapa test- och träningsdatauppsättningar
Därefter skapar vi test- och träningsdatauppsättningarna från de rensade och funktionsutvecklade data.
I det angivna kodavsnittet läser vi in en rensad och funktionskonstruerad datauppsättning från lakehouse med hjälp av Delta-format, delar upp den i tränings- och testuppsättningar med förhållandet 80–20 och förbereder data för maskininlärning. Den här förberedelsen omfattar import VectorAssembler
från PySpark ML för att kombinera funktionskolumner till en enda "funktionskolumn". Därefter använder VectorAssembler
vi för att transformera tränings- och testningsdatauppsättningarna, vilket resulterar i train_data
och test_data
DataFrames som innehåller målvariabeln "Exited" och funktionsvektorerna. Dessa datauppsättningar är nu redo att användas för att skapa och utvärdera maskininlärningsmodeller.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Träna baslinjemodell
Med hjälp av funktionaliserade data tränar vi en maskininlärningsmodell för baslinje, konfigurerar MLflow för experimentspårning, definierar en förutsägelsefunktion för måttberäkning och visar och loggar slutligen den resulterande ROC AUC-poängen.
Ange loggningsnivå
Här konfigurerar vi loggningsnivån för att förhindra onödiga utdata från Synapse.ml-biblioteket och hålla loggarna renare.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
Konfigurera MLflow
I det här avsnittet konfigurerar vi MLflow för experimentspårning. Vi ställer in experimentnamnet på "automl_sample" för att organisera körningarna. Dessutom aktiverar vi automatisk loggning, vilket säkerställer att modellparametrar, mått och artefakter loggas automatiskt till MLflow.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
Träna och utvärdera modellen
Slutligen tränar vi en LightGBMClassifier-modell på de angivna träningsdata. Modellen konfigureras med nödvändiga inställningar för binär klassificering och obalanshantering. Sedan använder vi den här tränade modellen för att göra förutsägelser på testdata. Vi extraherar de förväntade sannolikheterna för den positiva klassen och de sanna etiketterna från testdata. Efteråt beräknar vi ROC AUC-poängen med sklearns roc_auc_score
funktion.
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
Härifrån kan vi se att vår resulterande modell uppnår en ROC AUC-poäng på 84 %.
Skapa en AutoML-utvärderingsversion med FLAML
I det här avsnittet skapar vi en AutoML-utvärderingsversion med hjälp av FLAML-paketet, konfigurerar utvärderingsinställningarna, konverterar Spark-datamängden till en Pandas på Spark-datauppsättning, kör AutoML-utvärderingsversionen och visar de resulterande måtten.
Konfigurera AutoML-utvärderingsversionen
Här importerar vi nödvändiga klasser och moduler från FLAML-paketet och skapar en instans av AutoML, som används för att automatisera maskininlärningspipelinen.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Konfigurera inställningar
I det här avsnittet definierar vi konfigurationsinställningarna för AutoML-utvärderingsversionen.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Konvertera till Pandas på Spark
Om du vill köra AutoML med en Spark-baserad datauppsättning måste vi konvertera den till en Pandas på Spark-datauppsättning med to_pandas_on_spark
hjälp av funktionen. Detta gör att FLAML kan arbeta effektivt med data.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Kör AutoML-utvärderingsversionen
Nu kör vi AutoML-utvärderingsversionen. Vi använder en kapslad MLflow-körning för att spåra experimentet i den befintliga MLflow-körningskontexten. AutoML-utvärderingsversionen utförs på Pandas på Spark-datamängden (df_automl
) med målvariabeln "Exited
och de definierade inställningarna skickas till fit
funktionen för konfiguration.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Visa resulterande mått
I det här sista avsnittet hämtar och visar vi resultatet av AutoML-utvärderingsversionen. Dessa mått ger insikter om prestanda och konfiguration av AutoML-modellen på den angivna datamängden.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
Parallellisera din AutoML-utvärderingsversion med Apache Spark
I scenarier där din datauppsättning får plats i en enda nod och du vill utnyttja kraften i Spark för att köra flera parallella AutoML-utvärderingar samtidigt kan du följa dessa steg:
Konvertera till Pandas-dataram
Om du vill aktivera parallellisering måste dina data först konverteras till en Pandas DataFrame.
pandas_df = train_raw.toPandas()
Här konverterar train_raw
vi Spark DataFrame till en Pandas DataFrame med namnet pandas_df
för att göra den lämplig för parallell bearbetning.
Konfigurera parallelliseringsinställningar
Ange use_spark
till True
för att aktivera Spark-baserad parallellitet. Som standard startar FLAML en utvärderingsversion per köre. Du kan anpassa antalet samtidiga utvärderingsversioner med hjälp n_concurrent_trials
av argumentet .
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
I de här inställningarna anger vi att vi vill använda Spark för parallellitet genom att ange use_spark
till True
. Vi anger också antalet samtidiga utvärderingsversioner till 3, vilket innebär att tre utvärderingsversioner körs parallellt på Spark.
Om du vill veta mer om hur du parallelliserar dina AutoML-spår kan du gå till FLAML-dokumentationen för parallella Spark-jobb.
Kör AutoML-utvärderingsversionen parallellt
Nu ska vi köra AutoML-utvärderingsversionen parallellt med de angivna inställningarna. Vi använder en kapslad MLflow-körning för att spåra experimentet i den befintliga MLflow-körningskontexten.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Nu körs AutoML-utvärderingsversionen med parallellisering aktiverat. Argumentet dataframe
är inställt på Pandas DataFrame pandas_df
och andra inställningar skickas till fit
funktionen för parallell körning.
Visa mått
När du har kört den parallella AutoML-utvärderingsversionen hämtar och visar du resultatet, inklusive den bästa hyperparameterkonfigurationen, ROC AUC på valideringsdata och träningsvaraktigheten för den bäst presterande körningen.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))