Поделиться через


Создание конвейера машинного обучения Apache Spark

Масштабируемая библиотека машинного обучения Apache Spark (MLlib) предоставляет возможности моделирования в распределенную среду. Пакет spark.ml Spark — это набор высокоуровневых API, созданных на основе кадров данных. Эти API помогают создавать и настраивать практические конвейеры машинного обучения. Машинное обучение Spark относится к API MLlib, основанному на DataFrame, а не к старому API конвейера, основанному на RDD.

Конвейер машинного обучения — это полный рабочий процесс, объединяющий несколько алгоритмов машинного обучения. Для обработки и изучения данных может потребоваться много шагов, требующих последовательности алгоритмов. Конвейеры определяют этапы и порядок процесса машинного обучения. В MLlib этапы конвейера представлены определенной последовательностью PipelineStages, где преобразователь и оценщик каждый выполняет задачи.

Преобразователь — это алгоритм, который преобразует один кадр данных в другой с помощью transform() метода. Например, преобразователь признаков может считывать один столбец кадра данных, сопоставлять его с другим столбцом и выводить новый кадр данных с сопоставленным столбцом, добавленным к нему.

Оценщик — это абстракция алгоритмов обучения, и он отвечает за подгонку или обучение на наборе данных для создания преобразователя. Оценщик реализует метод с именем fit(), который принимает кадр данных и создает кадр данных, который является преобразователем.

Каждый экземпляр не отслеживающего состояния преобразователя или оценщика имеет собственный уникальный идентификатор, который используется при задании параметров. Оба используют универсальный API для указания этих параметров.

Пример конвейера

Для демонстрации практического использования конвейера машинного обучения в этом примере используется образец HVAC.csv файла данных, который уже загружен в хранилище по умолчанию для вашего кластера HDInsight, — либо в Azure Storage, либо в Data Lake Storage. Чтобы просмотреть содержимое файла, перейдите в /HdiSamples/HdiSamples/SensorSampleData/hvac каталог. HVAC.csv содержит набор времен с целевыми и фактическими температурами для систем ОВК (отопления, вентиляции и кондиционирования воздуха) в различных зданиях. Цель состоит в том, чтобы обучить модель на основе данных и создать прогнозную температуру для данного здания.

Следующий код:

  1. Определяет LabeledDocument, который сохраняет BuildingIDидентификатор SystemInfo системы и возраст системы, а также значение label (1.0, если здание слишком горячее, 0,0 в противном случае).
  2. Создает пользовательскую функцию parseDocument синтаксического анализа, которая принимает строку (строку) данных и определяет, является ли здание "горячим", сравнивая целевую температуру с фактической температурой.
  3. Применяет средство синтаксического анализа при извлечении исходных данных.
  4. Создает обучающие данные.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID

LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument


def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    if (values[3] > values[2]):
        hot = 1.0
    else:
        hot = 0.0

    textValue = str(values[4]) + " " + str(values[5])

    return LabeledDocument((values[6]), textValue, hot)


# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
    "wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()

В этом примере конвейера имеется три этапа: Tokenizer и HashingTF (как преобразователи), так и Logistic Regression (оценка). Извлеченные и проанализированные данные в training DataFrame передаются через конвейер, когда вызывается pipeline.fit(training).

  1. Первый этап Tokenizerразбивает SystemInfo входной столбец (состоящий из системного идентификатора и возрастных значений) на выходной words столбец. Этот новый words столбец добавляется в таблицу данных.
  2. Второй этап HashingTFпреобразует новый words столбец в векторы признаков. Этот новый features столбец добавляется в таблицу данных. Эти первые два этапа являются Трансформерами.
  3. Третий этап LogisticRegression, является оценивателем, и поэтому конвейер вызывает метод LogisticRegression.fit() для создания LogisticRegressionModel.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

Чтобы увидеть новые столбцы words и features, добавленные преобразователями Tokenizer и HashingTF, а также пример оценивателя LogisticRegression, выполните метод PipelineModel.transform() на исходном DataFrame. В рабочем коде следующий шаг — передать тестовый кадр данных для проверки обучения.

peek = model.transform(training)
peek.show()

# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label|   words|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|         4|     13 20|  0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...|       0.0|
|        17|      3 20|  0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...|       0.0|
|        18|     17 20|  1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...|       0.0|
|        15|      2 23|  0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...|       1.0|
|         3|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|         4|     13 28|  0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...|       0.0|
|         2|     12 24|  0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...|       1.0|
|        16|     20 26|  1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...|       0.0|
|         9|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|        12|       6 5|  0.0|  [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...|       0.0|
|        15|     10 17|  1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...|       1.0|
|         7|      2 11|  0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...|       0.0|
|        15|      14 2|  1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...|       0.0|
|         6|       3 2|  0.0|  [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...|       0.0|
|        20|     19 22|  0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...|       1.0|
|         8|     19 11|  0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...|       0.0|
|         6|      15 7|  0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...|       0.0|
|        13|      12 5|  0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...|       1.0|
|         4|      8 22|  0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...|       1.0|
|         7|      17 5|  0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...|       1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+

only showing top 20 rows

Теперь model объект можно использовать для прогнозирования. Полный пример этого приложения машинного обучения и пошаговые инструкции по его выполнению см. в статье "Создание приложений машинного обучения Apache Spark в Azure HDInsight".

См. также