Поделиться через


Создание моделей с помощью автоматизированного машинного обучения (предварительная версия)

Автоматизированная Машинное обучение (AutoML) включает набор методов и инструментов, предназначенных для упрощения процесса обучения и оптимизации моделей машинного обучения с минимальным вмешательством человека. Основной целью AutoML является упрощение и ускорение выбора наиболее подходящей модели машинного обучения и гиперпараметров для заданного набора данных, задача, которая обычно требует значительных знаний и вычислительных ресурсов. В платформе Fabric специалисты по обработке и анализу данных могут использовать flaml.AutoML модуль для автоматизации различных аспектов рабочих процессов машинного обучения.

В этой статье мы рассмотрим процесс создания пробных версий AutoML непосредственно из кода с помощью набора данных Spark. Кроме того, мы рассмотрим методы преобразования этих данных в кадр данных Pandas и обсудим методы параллелизации проб экспериментов.

Внимание

Эта функция доступна в предварительной версии.

Необходимые компоненты

  • Получение подписки Microsoft Fabric. Или зарегистрируйте бесплатную пробную версию Microsoft Fabric.

  • Войдите в Microsoft Fabric.

  • Используйте переключатель интерфейса в левой части домашней страницы, чтобы перейти на интерфейс Synapse Обработка и анализ данных.

    Снимок экрана: меню переключателя интерфейса, в котором показано, где выбрать Обработка и анализ данных.

Загрузка и подготовка данных

В этом разделе мы укажем параметры скачивания для данных, а затем сохраните его в lakehouse.

Загрузка данных

Этот блок кода загружает данные из удаленного источника и сохраняет его в lakehouse.

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Загрузка данных в кадр данных Spark

Следующий блок кода загружает данные из CSV-файла в кадр данных Spark и кэширует его для эффективной обработки.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Этот код предполагает, что файл данных скачан и расположен в указанном пути. Он считывает CSV-файл в кадр данных Spark, выводит схему и кэширует его для ускорения доступа во время последующих операций.

Подготовка данных

В этом разделе мы будем выполнять очистку данных и проектирование функций в наборе данных.

Очистка данных.

Во-первых, мы определим функцию очистки данных, которая включает удаление строк с отсутствующими данными, удаление повторяющихся строк на основе определенных столбцов и удаление ненужных столбцов.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Функция clean_data помогает убедиться, что набор данных не содержит отсутствующих значений и дубликатов при удалении ненужных столбцов.

Проектирование признаков

Затем мы выполняем проектирование функций, создавая фиктивные столбцы для столбцов Geography и Gender с помощью одно горячей кодировки.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Здесь мы используем одно-горячую кодировку для преобразования категориальных столбцов в двоичные фиктивные столбцы, что делает их подходящими для алгоритмов машинного обучения.

Отображение чистых данных

Наконец, мы отображаем чистый и встроенный набор данных с помощью функции отображения.


display(df_clean)

Этот шаг позволяет проверить результирующий кадр данных с примененными преобразованиями.

Сохранение в Lakehouse

Теперь мы сохраним очищенный и встроенный набор данных в lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Здесь мы берем чистый и преобразованный Кадр df_cleanданных PySpark, и сохраните его в виде таблицы Delta с именем "churn_data_clean" в лейкхаусе. Мы используем разностный формат для эффективного управления версиями и управления набором данных. Гарантирует mode("overwrite") , что любая существующая таблица с тем же именем перезаписывается и создается новая версия таблицы.

Создание тестовых и обучающих наборов данных

Затем мы создадим тестовые и обучающие наборы данных из очищенных и встроенных данных.

В предоставленном разделе кода мы загружаем чистый и встроенный набор данных из lakehouse с помощью разностного формата, разделяем его на наборы обучения и тестирования с коэффициентом 80-20 и подготавливаем данные для машинного обучения. Эта подготовка включает импорт VectorAssembler из PySpark ML для объединения столбцов признаков в один столбец "функции". Впоследствии мы используем VectorAssembler для преобразования наборов данных для обучения и тестирования, что приводит к train_datatest_data тому, что кадры данных содержат целевую переменную "Exited" и векторы признаков. Эти наборы данных теперь готовы к использованию в создании и оценке моделей машинного обучения.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Обучение базовой модели

С помощью признаков данных мы обучим базовую модель машинного обучения, настройте MLflow для отслеживания экспериментов, определите функцию прогнозирования для вычисления метрик и, наконец, просмотрите и зайдите в журнал итоговую оценку ROC AUC.

Настройка уровня ведения журнала

Здесь мы настраиваем уровень ведения журнала для подавления ненужных выходных данных из библиотеки Synapse.ml, сохраняя журналы более чистыми.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Настройка MLflow

В этом разделе мы настроим MLflow для отслеживания экспериментов. Присвойте имени эксперимента значение "automl_sample", чтобы упорядочить запуски. Кроме того, мы включите автоматическое ведение журнала, обеспечивая автоматический вход параметров модели, метрик и артефактов в MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Обучение и оценка модели

Наконец, мы обучаем модель LightGBMClassifier на предоставленных обучающих данных. Модель настроена с необходимыми параметрами для двоичной классификации и обработки дисбаланса. Затем мы используем обученную модель для прогнозирования на тестовых данных. Мы извлекаем прогнозируемые вероятности для положительного класса и истинные метки из тестовых данных. Затем мы вычислим оценку AUC ROC с помощью функции sklearn roc_auc_score .

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Здесь мы видим, что полученная модель достигает оценки AUC ROC на 84%.

Создание пробной версии AutoML с помощью FLAML

В этом разделе мы создадим пробную версию AutoML с помощью пакета FLAML, настройте параметры пробной версии, преобразуем набор данных Spark в набор данных Pandas в Spark, запустите пробную версию AutoML и просмотрите полученные метрики.

Настройка пробной версии AutoML

Здесь мы импортируем необходимые классы и модули из пакета FLAML и создадим экземпляр AutoML, который будет использоваться для автоматизации конвейера машинного обучения.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Настройка параметров

В этом разделе мы определим параметры конфигурации для пробной версии AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Преобразование в Pandas в Spark

Чтобы запустить AutoML с набором данных на основе Spark, необходимо преобразовать его в набор данных Pandas в набор данных Spark с помощью to_pandas_on_spark функции. Это позволяет FLAML эффективно работать с данными.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Запуск пробной версии AutoML

Теперь мы выполняем пробную версию AutoML. Мы используем вложенный запуск MLflow для отслеживания эксперимента в существующем контексте выполнения MLflow. Пробная версия AutoML выполняется на Pandas в наборе данных Spark (df_automl) с целевой переменной "Exited и определенные параметры передаются fit в функцию для настройки.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Просмотр результирующей метрики

В этом заключительном разделе мы извлекаем и отображаем результаты пробной версии AutoML. Эти метрики предоставляют аналитические сведения о производительности и конфигурации модели AutoML в заданном наборе данных.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Параллелизация пробной версии AutoML с помощью Apache Spark

В сценариях, когда набор данных может входить в один узел, и вы хотите использовать возможности Spark для одновременного запуска нескольких параллельных пробных версий AutoML, выполните следующие действия.

Преобразование в кадр данных Pandas

Чтобы включить параллелизацию, данные необходимо сначала преобразовать в кадр данных Pandas.

pandas_df = train_raw.toPandas()

Здесь мы преобразуем train_raw кадр данных Spark в кадр данных Pandas DataFrame с именем pandas_df , чтобы сделать его подходящим для параллельной обработки.

Настройка параметров параллелизации

Установите для use_sparkTrue включения параллелизма на основе Spark. По умолчанию FLAML запускает одну пробную версию для каждого исполнителя. Число одновременных пробных версий можно настроить с помощью аргумента n_concurrent_trials .

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

В этих параметрах мы указываем, что мы хотим использовать Spark для параллелизма, задав для этого use_spark значение True. Мы также задали число параллельных пробных версий 3, что означает, что три пробные версии будут выполняться параллельно в Spark.

Дополнительные сведения о параллелизации маршрутов AutoML см. в документации FLAML для параллельных заданий Spark.

Параллельное выполнение пробной версии AutoML

Теперь мы будем запускать пробную версию AutoML параллельно с указанными параметрами. Мы будем использовать вложенный запуск MLflow для отслеживания эксперимента в существующем контексте выполнения MLflow.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Теперь будет выполнена пробная версия AutoML с включенной параллелизацией. Аргумент dataframe имеет значение Pandas DataFrame pandas_df, а другие параметры передаются функции для параллельного fit выполнения.

Просмотр метрик

После выполнения параллельной пробной версии AutoML извлеките и отобразите результаты, включая лучшую конфигурацию гиперпараметров, ROC AUC для данных проверки и продолжительность обучения наилучшего выполнения.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))

Следующие шаги