Создание модели машинного обучения с помощью Apache Spark MLlib

В этой статье описано, как с помощью Apache Spark MLlib создать приложение машинного обучения для проведения простого прогнозного анализа на основе открытого набора данных Azure. Apache Spark содержит встроенные библиотеки машинного обучения. В этом примере используется классификация с помощью логистической регрессии.

SparkML и MLlib — это основные библиотеки Spark, содержащие множество служебных программ, которые подходят для выполнения задач машинного обучения, в частности для таких:

  • Классификация
  • Регрессия
  • Кластеризация
  • тематического моделирования;
  • сингулярного разложения и анализа по методу главных компонент;
  • проверки гипотез и статистической выборки.

Общие сведения о классификации и логистической регрессии

Классификация — это распространенная задача машинного обучения, которая представляет собой процесс сортировки входных данных по категориям. Алгоритм классификации определяет принцип назначения меток предоставленным входным данным. Например, вы можете создать алгоритм машинного обучения, который принимает в качестве входных данных информацию об акциях и делит их на две категории: акции, которые следует продать, и акции, которые стоит оставить.

Логистическая регрессия — один из алгоритмов, который можно использовать для классификации. API Spark для логистической регрессии подходит для задач двоичной классификации или разделения входных данных на две группы. Дополнительные сведения о логистической регрессии см. на соответствующей вики-странице (на английском языке).

В общих словах, в основе логистической регрессии лежит некая логистическая функция, используемая для прогнозирования вероятности принадлежности вектора входных данных к одной или другой группе.

Пример прогнозного анализа на основе данных такси Нью-Йорка

Чтобы приступить к работе, установите azureml-opendatasets. Данные доступны через открытые наборы данных Azure. Этот подмножество набора данных содержит сведения о желтых поездках на такси, включая время начала и окончания, стоимость и другие атрибуты.

%pip install azureml-opendatasets

В остальной части этой статьи мы будем использовать Apache Spark для выполнения некоторого анализа данных о чаевых такси Нью-Йорка, а затем для разработки модели для прогнозирования того, включает ли конкретная поездка совет или нет.

Создание модели машинного обучения Apache Spark

  1. Создайте записную книжку PySpark. Инструкции см. в разделе Создание записной книжки.

  2. Импортируйте типы, необходимые для этой записной книжки.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Мы будем использовать MLflow для отслеживания экспериментов машинного обучения и соответствующих запусков. Если включена автоматическая запись Microsoft Fabric, соответствующие метрики и параметры автоматически записываются.

    import mlflow
    

Создание входного кадра данных

В этом примере мы загрузим данные в кадр данных Pandas, а затем преобразуем его в кадр данных Apache Spark. С помощью этого формата можно применить другие операции Apache Spark для очистки и фильтрации набора данных.

  1. Выполните приведенные ниже строки, чтобы создать кадр данных Spark, вставив код в новую ячейку. При этом данные извлекаются через API Открытых наборов данных. Мы можем отфильтровать эти данные, чтобы просмотреть определенное окно данных. В следующем примере кода с помощью параметров start_date и end_date применяется фильтр, который возвращает данные за один месяц.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Следующий код сокращает набор данных примерно до 10 000 строк. Чтобы ускорить разработку и обучение, мы рассмотрим наш набор данных.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Далее мы хотим ознакомиться с нашими данными с помощью встроенной display() команды. Это позволяет легко просматривать образец данных или изучать тенденции в данных графически.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Подготовка данных

Подготовка данных является важным шагом в процессе машинного обучения. Он включает очистку, преобразование и организацию необработанных данных, чтобы сделать его подходящим для анализа и моделирования. В следующем коде выполняется несколько шагов подготовки данных:

  • Удаление излителей и неправильных значений путем фильтрации набора данных
  • Удаление столбцов, которые не требуются для обучения модели
  • Создание новых столбцов из необработанных данных
  • Создайте метку, чтобы определить, будет ли совет или нет для данной поездки на такси
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Затем мы добавим вторую передачу данных, чтобы добавить окончательные функции.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Создание модели машинного обучения с помощью R

И последняя задача — преобразование данных с метками в формат, удобный для анализа методом логистической регрессии. Входные данные для алгоритма логистической регрессии должны представлять собой набор пар "метка — вектор признаков", где вектор признаков содержит числа, представляющие точку входных данных.

Поэтому необходимо преобразовать столбцы категорий в числа. А именно, столбцы trafficTimeBins и weekdayString нужно преобразовать в целочисленные представления. Есть несколько подходов к выполнению преобразования. В следующем примере используется OneHotEncoder подход.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Это действие позволяет создать новый кадр данных со всеми столбцами в правильном формате для обучения модели.

Обучение модели логистической регрессии

Первая задача — разделить набор данных на обучающий набор и тестовый или проверочный набор.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Теперь, когда есть два кадра данных, необходимо создать формулу модели и выполнить ее для обучающего кадра данных, Затем можно проверить наличие тестового кадра данных. Поэкспериментируйте с различными версиями формулы модели, чтобы оценить влияние разных сочетаний.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Выходные данные этой ячейки выглядят так:

Area under ROC = 0.9749430523917996

Создание визуального представления прогноза

Теперь можно создать окончательную визуализацию для интерпретации результатов модели. Кривая ROC является одним из способов проверки результата.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.