Udostępnij za pośrednictwem


Tworzenie potoku uczenia maszynowego Apache Spark

Skalowalna biblioteka uczenia maszynowego (MLlib) platformy Apache Spark oferuje możliwości modelowania w środowisku rozproszonym. Pakiet spark.ml Spark to zestaw interfejsów API wysokiego poziomu opartych na ramkach danych. Te interfejsy API ułatwiają tworzenie i dostrajanie praktycznych potoków uczenia maszynowego. Uczenie maszynowe platformy Spark odnosi się do tego interfejsu API opartego na DataFrame MLlib, a nie starszego interfejsu API potoku opartego na RDD.

Potok uczenia maszynowego to kompletny przepływ pracy łączący wiele algorytmów uczenia maszynowego. Przetwarzanie i uczenie się na podstawie danych może wymagać wielu kroków wymagających sekwencji algorytmów. Potoki definiują etapy i kolejność procesu uczenia maszynowego. W MLlib etapy przetwarzania są reprezentowane przez określoną sekwencję PipelineStages, gdzie Transformer i Estimator wykonują zadania.

Transformer to algorytm, który przekształca jedną ramkę danych na drugą, używając metody transform(). Na przykład funkcja przekształcania funkcji może odczytać jedną kolumnę ramki danych, zmapować ją na inną kolumnę i wyprowadzić nową ramkę danych z dołączonym do niej zamapowaną kolumną.

Narzędzie do szacowania to abstrakcja algorytmów uczenia i jest odpowiedzialna za dopasowywanie lub trenowanie zestawu danych w celu utworzenia transformatora. Narzędzie do szacowania implementuje metodę o nazwie fit(), która akceptuje ramkę danych i tworzy ramkę danych, która jest transformatorem.

Każde bezstanowe wystąpienie klasy Transformer lub Estimator ma własny unikatowy identyfikator, który jest używany podczas określania parametrów. Oba używają jednolitego interfejsu API do określania tych parametrów.

Przykład potoku

Aby zademonstrować praktyczne użycie potoku uczenia maszynowego, w tym przykładzie użyto przykładowego HVAC.csv pliku danych, który jest wstępnie załadowany do domyślnego magazynu dla klastra HDInsight, Azure Storage lub Data Lake Storage. Aby wyświetlić zawartość pliku, przejdź do /HdiSamples/HdiSamples/SensorSampleData/hvac katalogu. HVAC.csv zawiera zestaw czasów zarówno docelowych, jak i rzeczywistych temperatur dla systemów ogrzewania, wentylacji i klimatyzacji w różnych budynkach. Celem jest wytrenowanie modelu na danych i wygenerowanie prognozy temperatury dla danego budynku.

Następujący kod:

  1. Definiuje element LabeledDocument, który przechowuje BuildingID, SystemInfo (identyfikator systemu i wiek) oraz label (przyjmuje wartość 1,0, jeśli budynek jest zbyt gorący, w przeciwnym razie 0,0).
  2. Tworzy niestandardową funkcję parsera parseDocument, która przyjmuje linię (wiersz) danych i określa, czy budynek jest "gorący", porównując temperaturę docelową z rzeczywistą temperaturą.
  3. Stosuje analizator podczas wyodrębniania danych źródłowych.
  4. Tworzy dane szkoleniowe.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID

LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument


def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    if (values[3] > values[2]):
        hot = 1.0
    else:
        hot = 0.0

    textValue = str(values[4]) + " " + str(values[5])

    return LabeledDocument((values[6]), textValue, hot)


# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
    "wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()

Ten przykładowy potok ma trzy etapy: Tokenizer oraz HashingTF (oba jako Transformers) oraz Logistic Regression (Estimator). Wyodrębnione i przeanalizowane dane w ramce danych training przepływają przez potok, gdy wywołane jest pipeline.fit(training).

  1. Pierwszy etap , Tokenizerdzieli kolumnę wejściową SystemInfo (składającą się z identyfikatora systemu i wartości wieku) na kolumnę wyjściową words . Ta nowa words kolumna zostanie dodana do ramki danych.
  2. Drugi etap , HashingTFkonwertuje nową words kolumnę na wektory cech. Ta nowa features kolumna zostanie dodana do ramki danych. Te dwa pierwsze etapy to Transformatory.
  3. Trzeci etap, LogisticRegression, jest Estymatorem, a więc potok wywołuje metodę LogisticRegression.fit() w celu utworzenia LogisticRegressionModel.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

Aby zobaczyć nowe kolumny words i features dodane przez transformatory Tokenizer i HashingTF, a także przykładowe działanie estymatora LogisticRegression, uruchom metodę PipelineModel.transform() na oryginalnej ramce danych. W kodzie produkcyjnym następnym krokiem będzie przekazanie testowego DataFrame w celu zweryfikowania procesu trenowania.

peek = model.transform(training)
peek.show()

# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label|   words|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|         4|     13 20|  0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...|       0.0|
|        17|      3 20|  0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...|       0.0|
|        18|     17 20|  1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...|       0.0|
|        15|      2 23|  0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...|       1.0|
|         3|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|         4|     13 28|  0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...|       0.0|
|         2|     12 24|  0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...|       1.0|
|        16|     20 26|  1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...|       0.0|
|         9|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|        12|       6 5|  0.0|  [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...|       0.0|
|        15|     10 17|  1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...|       1.0|
|         7|      2 11|  0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...|       0.0|
|        15|      14 2|  1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...|       0.0|
|         6|       3 2|  0.0|  [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...|       0.0|
|        20|     19 22|  0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...|       1.0|
|         8|     19 11|  0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...|       0.0|
|         6|      15 7|  0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...|       0.0|
|        13|      12 5|  0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...|       1.0|
|         4|      8 22|  0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...|       1.0|
|         7|      17 5|  0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...|       1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+

only showing top 20 rows

Obiekt model może teraz służyć do przewidywania. Aby zapoznać się z pełnym przykładem tej aplikacji uczenia maszynowego i instrukcjami krok po kroku dotyczącymi jej uruchamiania, zobacz Tworzenie aplikacji uczenia maszynowego platformy Apache Spark w usłudze Azure HDInsight.

Zobacz też