สร้างแบบจําลองด้วย ML อัตโนมัติ (ตัวอย่าง)
การเรียนรู้ของเครื่องอัตโนมัติ (AutoML) ครอบคลุมชุดของเทคนิคและเครื่องมือที่ออกแบบมาเพื่อปรับปรุงกระบวนการของการฝึกอบรมและปรับแบบจําลองการเรียนรู้ของเครื่องให้เหมาะสมด้วยการแทรกแซงของมนุษย์น้อยที่สุด วัตถุประสงค์หลักของ AutoML คือเพื่อลดความซับซ้อนและเร่งการเลือกแบบจําลองการเรียนรู้ของเครื่องที่เหมาะสมที่สุดและ hyperparameters สําหรับชุดข้อมูลที่กําหนดงานที่โดยทั่วไปต้องการความเชี่ยวชาญและทรัพยากรการคํานวณจํานวนมาก ภายในเฟรมเวิร์ก Fabric นักวิทยาศาสตร์ข้อมูลสามารถใช้ประโยชน์จาก flaml.AutoML
โมดูลเพื่อปรับแง่มุมต่าง ๆ ของเวิร์กโฟลว์การเรียนรู้ของเครื่องให้เป็นอัตโนมัติ
ในบทความนี้ เราจะเจาะลึกกระบวนการสร้างการทดลองใช้ AutoML โดยตรงจากโค้ดโดยใช้ชุดข้อมูล Spark นอกจากนี้ เราจะสํารวจวิธีการสําหรับการแปลงข้อมูลนี้เป็นกรอบข้อมูล Pandas และพูดคุยเกี่ยวกับเทคนิคสําหรับการทดลองใช้งานการทดลองของคุณแบบขนานกัน
สำคัญ
คุณลักษณะนี้อยู่ในตัวอย่าง
ข้อกำหนดเบื้องต้น
รับการสมัครใช้งาน Microsoft Fabric หรือลงทะเบียนเพื่อทดลองใช้งาน Microsoft Fabric ฟรี
ลงชื่อเข้าใช้ Microsoft Fabric
ใช้ตัวสลับประสบการณ์ทางด้านซ้ายของโฮมเพจของคุณเพื่อสลับไปยังประสบการณ์วิทยาศาสตร์ข้อมูล Synapse
- สร้างสภาพแวดล้อม Fabric ใหม่ หรือตรวจสอบว่าคุณกําลังทํางานบน Fabric Runtime 1.2 (Spark 3.4 (หรือสูงกว่า) และ Delta 2.4)
- สร้าง สมุดบันทึกใหม่
- แนบสมุดบันทึกของคุณเข้ากับเลคเฮ้าส์ ทางด้านซ้ายของสมุดบันทึกของคุณ ให้เลือก เพิ่ม เพื่อเพิ่มเลคเฮ้าส์ที่มีอยู่แล้ว หรือสร้างขึ้นใหม่
โหลดและเตรียมข้อมูล
ในส่วนนี้ เราจะระบุการตั้งค่าการดาวน์โหลดสําหรับข้อมูล และจากนั้นบันทึกไปยังเลคเฮ้าส์
ดาวน์โหลดข้อมูล
บล็อกรหัสนี้จะดาวน์โหลดข้อมูลจากแหล่งข้อมูลระยะไกลและบันทึกไปยังเลคเฮ้าส์
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
โหลดข้อมูลลงในกรอบข้อมูล Spark
บล็อกรหัสต่อไปนี้โหลดข้อมูลจากไฟล์ CSV ลงใน Spark DataFrame และแคชเพื่อการประมวลผลที่มีประสิทธิภาพ
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
รหัสนี้ถือว่ามีการดาวน์โหลดไฟล์ข้อมูลแล้วและอยู่ในเส้นทางที่ระบุ ซึ่งจะอ่านไฟล์ CSV ลงใน Spark DataFrame อนุมาน schema และแคชสําหรับการเข้าถึงได้เร็วขึ้นในระหว่างการดําเนินการที่ตามมา
เตรียมข้อมูล
ในส่วนนี้ เราจะดําเนินการทําความสะอาดข้อมูลและวิศวกรรมคุณลักษณะบนชุดข้อมูล
ทำความสะอาดข้อมูล
ก่อนอื่น เรากําหนดฟังก์ชันเพื่อทําความสะอาดข้อมูล ซึ่งรวมถึงการทิ้งแถวที่มีข้อมูลที่ขาดหายไป ลบแถวที่ซ้ํากันโดยยึดตามคอลัมน์ที่ระบุ และลบคอลัมน์ที่ไม่จําเป็นออก
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
ฟังก์ชัน clean_data
นี้ช่วยให้มั่นใจว่าชุดข้อมูลไม่มีค่าและรายการซ้ําที่ขาดหายไปในขณะที่ลบคอลัมน์ที่ไม่จําเป็นออก
วิศวกรรมคุณลักษณะ
ถัดไป เราจะดําเนินการวิศวกรรมคุณลักษณะโดยการสร้างคอลัมน์ตัวอย่างสําหรับคอลัมน์ 'ภูมิศาสตร์' และ 'เพศ' โดยใช้การเข้ารหัสที่ร้อนเพียงตัวเดียว
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
ที่นี่เราใช้การเข้ารหัสแบบหนึ่งร้อนเพื่อแปลงคอลัมน์ตามประเภทเป็นคอลัมน์ตัวอย่างไบนารีทําให้เหมาะสําหรับอัลกอริทึมการเรียนรู้ของเครื่อง
แสดงข้อมูลที่ได้รับการทําความสะอาดแล้ว
ในตอนท้าย เราแสดงชุดข้อมูลที่ได้รับการทําความสะอาดและออกแบบด้วยคุณลักษณะโดยใช้ฟังก์ชันแสดงผล
display(df_clean)
ขั้นตอนนี้ช่วยให้คุณสามารถตรวจสอบ DataFrame ที่เกิดขึ้นด้วยการแปลงที่ใช้
บันทึกไปยังเลคเฮาส์
ในตอนนี้ เราจะบันทึกชุดข้อมูลที่ได้รับการทําความสะอาดและออกแบบมาสําหรับวิศกรไปยังเลคเฮ้าส์
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
ที่นี่ เรานําการทําความสะอาดและการแปลง PySpark DataFrame และ df_clean
บันทึกเป็นตาราง Delta ที่ชื่อ "churn_data_clean" ในเลคเฮ้าส์ เราใช้รูปแบบ Delta สําหรับการกําหนดรุ่นและการจัดการที่มีประสิทธิภาพของชุดข้อมูล mode("overwrite")
ทําให้แน่ใจว่าตารางใด ๆ ที่มีอยู่ที่มีชื่อเดียวกันถูกเขียนทับ และเวอร์ชันใหม่ของตารางจะถูกสร้างขึ้น
สร้างชุดข้อมูลการทดสอบและการฝึกอบรม
ถัดไป เราจะสร้างชุดข้อมูลการทดสอบและการฝึกอบรมจากข้อมูลที่ได้รับการทําความสะอาดและออกแบบโดยคุณลักษณะ
ในส่วนโค้ดที่ให้มา เราโหลดชุดข้อมูลที่ได้รับการทําความสะอาดและออกแบบคุณลักษณะจาก lakehouse โดยใช้รูปแบบ Delta แยกชุดข้อมูลลงในการฝึกอบรมและชุดการทดสอบด้วยอัตราส่วน 80-20 และเตรียมข้อมูลสําหรับการเรียนรู้ของเครื่อง การเตรียมการนี้เกี่ยวข้องกับการนําเข้า VectorAssembler
จาก PySpark ML เพื่อรวมคอลัมน์คุณลักษณะลงในคอลัมน์ "คุณลักษณะ" เดียว ต่อมา เราใช้ VectorAssembler
เพื่อแปลงชุดข้อมูลการฝึกอบรมและการทดสอบ ส่งผลให้ train_data
test_data
DataFrame ที่มีตัวแปรเป้าหมาย "Exited" และเวกเตอร์คุณลักษณะ ชุดข้อมูลเหล่านี้พร้อมสําหรับการใช้งานในการสร้างและประเมินแบบจําลองการเรียนรู้ของเครื่องแล้ว
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
ฝึกแบบจําลองข้อมูลพื้นฐาน
ด้วยการใช้ข้อมูลที่ถูกแนะนํา เราจะฝึกแบบจําลองการเรียนรู้ของเครื่องพื้นฐาน กําหนดค่า MLflow สําหรับการติดตามการทดลอง กําหนดฟังก์ชันการคาดการณ์สําหรับการคํานวณเมตริก และสุดท้าย ดูและบันทึกคะแนน ROC AUC ที่เกิดขึ้น
ตั้งค่าระดับการบันทึก
ที่นี่ เรากําหนดค่าระดับการบันทึกเพื่อระงับเอาต์พุตที่ไม่จําเป็นจากไลบรารี Synapse.ml รักษาตัวล้างบันทึก
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
กําหนดค่า MLflow
ในส่วนนี้ เรากําหนดค่า MLflow สําหรับการติดตามการทดลอง เราตั้งชื่อการทดลองเป็น "automl_sample" เพื่อจัดระเบียบการทํางาน นอกจากนี้ เรายังเปิดใช้งานการบันทึกอัตโนมัติ เพื่อให้แน่ใจว่าพารามิเตอร์แบบจําลอง เมตริก และวัตถุจะถูกบันทึกไปยัง MLflow โดยอัตโนมัติ
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
ฝึกและประเมินแบบจําลอง
ในตอนท้าย เราฝึกแบบจําลอง LightGBMClassifier บนข้อมูลการฝึกที่ให้มา แบบจําลองได้รับการกําหนดค่าด้วยการตั้งค่าที่จําเป็นสําหรับการจัดประเภทไบนารีและการจัดการความไม่สมดุล จากนั้นเราใช้แบบจําลองที่ได้รับการฝึกนี้เพื่อทําการคาดการณ์ข้อมูลทดสอบ เราแยกความน่าจะเป็นที่คาดการณ์สําหรับคลาสบวกและป้ายชื่อจริงจากข้อมูลทดสอบ หลังจากนั้นเราคํานวณคะแนน ROC AUC โดยใช้ฟังก์ชันของ roc_auc_score
sklearn
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
จากที่นี่ เราจะเห็นว่าแบบจําลองผลลัพธ์ของเราให้คะแนน ROC AUC ที่ 84%
สร้างการทดลองใช้ AutoML ด้วย FLAML
ในส่วนนี้ เราจะสร้างการทดลองใช้ AutoML โดยใช้แพคเกจ FLAML กําหนดค่าการตั้งค่ารุ่นทดลองใช้ แปลงชุดข้อมูล Spark เป็น Pandas บนชุดข้อมูล Spark เรียกใช้การทดลองใช้ AutoML และดูเมตริกที่เป็นผลลัพธ์
กําหนดค่าการทดลองใช้ AutoML
ที่นี่ เรานําเข้าคลาสและโมดูลที่จําเป็นจากแพคเกจ FLAML และสร้างอินสแตนซ์ของ AutoML ซึ่งจะถูกใช้เพื่อทําให้ไปป์ไลน์การเรียนรู้ของเครื่องเป็นอัตโนมัติ
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
กําหนดค่าการตั้งค่า
ในส่วนนี้ เรากําหนดการตั้งค่าการกําหนดค่าสําหรับการทดลองใช้ AutoML
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
แปลงเป็น Pandas บน Spark
เมื่อต้องการเรียกใช้ AutoML ด้วยชุดข้อมูลที่ใช้ Spark เราจําเป็นต้องแปลงเป็น Pandas บนชุดข้อมูล Spark โดยใช้ to_pandas_on_spark
ฟังก์ชัน ซึ่งทําให้ FLAML ทํางานกับข้อมูลได้อย่างมีประสิทธิภาพ
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
เรียกใช้การทดลองใช้ AutoML
ในตอนนี้ เราดําเนินการทดลองใช้ AutoML เราใช้ MLflow ที่ซ้อนกันเพื่อติดตามการทดลองภายในบริบทการเรียกใช้ MLflow ที่มีอยู่ การทดลองใช้ AutoML จะดําเนินการบน Pandas บนชุดข้อมูล Spark (df_automl
) ที่มีตัวแปรเป้าหมาย "Exited
และการตั้งค่าที่กําหนดไว้จะถูกส่งผ่านไปยัง fit
ฟังก์ชันสําหรับการกําหนดค่า
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
ดูเมตริกที่เป็นผลลัพธ์
ในส่วนสุดท้ายนี้ เราเรียกใช้และแสดงผลลัพธ์ของการทดลองใช้ AutoML เมตริกเหล่านี้ให้ข้อมูลเชิงลึกเกี่ยวกับประสิทธิภาพการทํางานและการกําหนดค่าของแบบจําลอง AutoML บนชุดข้อมูลที่ระบุ
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
รวมการทดลองใช้ AutoML ของคุณแบบคู่ขนานกับ Apache Spark
ในสถานการณ์ที่ชุดข้อมูลของคุณสามารถใส่ได้พอดีกับโหนดเดียวและคุณต้องการใช้ประโยชน์จากพลังของ Spark สําหรับการเรียกใช้การทดลองใช้ AutoML แบบขนานหลายรายการพร้อมกัน คุณสามารถทําตามขั้นตอนเหล่านี้ได้:
แปลงเป็นดาต้าเฟรมของ Pandas
เมื่อต้องการเปิดใช้งานแบบขนาน ข้อมูลของคุณต้องถูกแปลงเป็น DataFrame ของ Pandas ก่อน
pandas_df = train_raw.toPandas()
ที่นี่ เราแปลง train_raw
Spark DataFrame เป็น Pandas DataFrame ที่มี pandas_df
ชื่อว่าเพื่อให้เหมาะสําหรับการประมวลผลแบบขนาน
กําหนดค่าการตั้งค่าแบบขนาน
ตั้งค่า use_spark
เป็น True
เพื่อเปิดใช้งานการทํางานแบบขนานตาม Spark ตามค่าเริ่มต้น FLAML จะเปิดใช้หนึ่งรุ่นทดลองใช้ต่อเครื่องปฏิบัติการ คุณสามารถกําหนดจํานวนการทดลองใช้พร้อมกันโดยใช้ n_concurrent_trials
อาร์กิวเมนต์ได้
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
ในการตั้งค่าเหล่านี้ เราระบุว่าเราต้องการใช้ Spark สําหรับความขนานโดยการตั้งค่าuse_spark
เป็นTrue
นอกจากนี้เรายังตั้งค่าจํานวนการทดลองใช้พร้อมกันเป็น 3 ซึ่งหมายความว่าการทดลองใช้สามตัวจะทํางานคู่ขนานบน Spark
เมื่อต้องการเรียนรู้เพิ่มเติมเกี่ยวกับวิธีการขนานกับเส้นทาง AutoML ของคุณ คุณสามารถเยี่ยมชม เอกสาร FLAML สําหรับงาน Spark ขนาน
เรียกใช้การทดลองใช้ AutoML แบบขนาน
ในตอนนี้ เราจะเรียกใช้การทดลองใช้ AutoML ควบคู่ไปกับการตั้งค่าที่ระบุ เราจะใช้การเรียกใช้ MLflow ที่ซ้อนกันเพื่อติดตามการทดลองภายในบริบทการเรียกใช้ MLflow ที่มีอยู่
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
การดําเนินการนี้จะเป็นการดําเนินการทดลองใช้ AutoML ด้วยการเปิดใช้งานแบบขนาน อาร์กิวเมนต์ dataframe
ถูกตั้งค่าเป็น Pandas DataFrame pandas_df
และการตั้งค่าอื่น ๆ จะถูกส่งผ่านไปยัง fit
ฟังก์ชันสําหรับการดําเนินการแบบขนาน
ดูเมตริก
หลังจากเรียกใช้การทดลองใช้ AutoML แบบขนาน แล้วให้เรียกใช้และแสดงผลลัพธ์รวมถึงการกําหนดค่า hyperparameter ที่ดีที่สุด ROC AUC ในข้อมูลการตรวจสอบความถูกต้องและระยะเวลาการฝึกอบรมของการเรียกใช้ที่มีประสิทธิภาพที่ดีที่สุด
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))