Dela via


Skapa en Apache Spark-maskininlärningspipeline

Apache Sparks skalbara maskininlärningsbibliotek (MLlib) ger modelleringsfunktioner till en distribuerad miljö. Spark-paketet spark.ml är en uppsättning högnivå-API:er som bygger på DataFrames. Dessa API:er hjälper dig att skapa och finjustera praktiska maskininlärningspipelines. Spark-maskininlärning refererar till det här MLlib DataFrame-baserade API:et, inte det äldre RDD-baserade pipeline-API:et.

En pipeline för maskininlärning (ML) är ett komplett arbetsflöde som kombinerar flera maskininlärningsalgoritmer tillsammans. Det kan krävas många steg för att bearbeta och lära av data, vilket kräver en sekvens med algoritmer. Pipelines definierar faser och ordning för en maskininlärningsprocess. I MLlib representeras faserna i en pipeline av en specifik sekvens med PipelineStages, där en transformerare och en beräknare utför uppgifter.

En Transformerare är en algoritm som transformerar en DataFrame till en annan med hjälp transform() av metoden . En funktionstransformator kan till exempel läsa en kolumn i en DataFrame, mappa den till en annan kolumn och mata ut en ny DataFrame med den mappade kolumnen tillagd.

En beräknare är en abstraktion av inlärningsalgoritmer och ansvarar för att anpassa eller träna på en datamängd för att skapa en transformerare. En beräknare implementerar en metod med namnet fit(), som accepterar en DataFrame och skapar en DataFrame, som är en transformerare.

Varje tillståndslös instans av en transformerare eller en beräknare har sin egen unika identifierare, som används när parametrar anges. Båda använder ett enhetligt API för att ange dessa parametrar.

Pipelineexempel

För att demonstrera en praktisk användning av en ML-pipeline använder det här exemplet exempeldatafilen HVAC.csv som är förinstallerad på standardlagringen för ditt HDInsight-kluster, antingen Azure Storage eller Data Lake Storage. Om du vill visa innehållet i filen går du till /HdiSamples/HdiSamples/SensorSampleData/hvac katalogen . HVAC.csv innehåller en uppsättning tider med både mål- och faktiska temperaturer för HVAC-system (uppvärmning, ventilation och luftkonditionering) i olika byggnader. Målet är att träna modellen på data och skapa en prognostemperatur för en viss byggnad.

Följande kod:

  1. Definierar en LabeledDocument, som lagrar BuildingID, SystemInfo (ett systems identifierare och ålder) och en label (1,0 om byggnaden är för varm, 0,0 annars).
  2. Skapar en anpassad parserfunktion parseDocument som tar en rad med data och avgör om byggnaden är "varm" genom att jämföra måltemperaturen med den faktiska temperaturen.
  3. Använder parsern när källdata extraheras.
  4. Skapar träningsdata.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID

LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument


def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    if (values[3] > values[2]):
        hot = 1.0
    else:
        hot = 0.0

    textValue = str(values[4]) + " " + str(values[5])

    return LabeledDocument((values[6]), textValue, hot)


# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
    "wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()

Den här exempelpipelinen har tre steg: Tokenizer och HashingTF (båda Transformatorer) och Logistic Regression (en beräknare). Extraherade och parsade data i training DataFrame flödar genom pipelinen när pipeline.fit(training) anropas.

  1. Den första fasen, Tokenizer, delar upp indatakolumnen SystemInfo (bestående av systemidentifieraren och åldersvärdena) i en words utdatakolumn. Den här nya words kolumnen läggs till i DataFrame.
  2. Det andra steget, HashingTF, konverterar den nya words kolumnen till funktionsvektorer. Den här nya features kolumnen läggs till i DataFrame. Dessa två första steg är Transformatorer.
  3. Den tredje fasen, LogisticRegression, är en beräknare och därför anropar pipelinen LogisticRegression.fit() metoden för att skapa en LogisticRegressionModel.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

Om du vill se de nya words kolumnerna Tokenizer och features som lagts till av transformatorerna och HashingTF och ett exempel på LogisticRegression beräknaren kör du en PipelineModel.transform() metod på den ursprungliga DataFrame. I produktionskoden är nästa steg att skicka in en testdataram för att verifiera träningen.

peek = model.transform(training)
peek.show()

# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label|   words|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|         4|     13 20|  0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...|       0.0|
|        17|      3 20|  0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...|       0.0|
|        18|     17 20|  1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...|       0.0|
|        15|      2 23|  0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...|       1.0|
|         3|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|         4|     13 28|  0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...|       0.0|
|         2|     12 24|  0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...|       1.0|
|        16|     20 26|  1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...|       0.0|
|         9|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|        12|       6 5|  0.0|  [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...|       0.0|
|        15|     10 17|  1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...|       1.0|
|         7|      2 11|  0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...|       0.0|
|        15|      14 2|  1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...|       0.0|
|         6|       3 2|  0.0|  [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...|       0.0|
|        20|     19 22|  0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...|       1.0|
|         8|     19 11|  0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...|       0.0|
|         6|      15 7|  0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...|       0.0|
|        13|      12 5|  0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...|       1.0|
|         4|      8 22|  0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...|       1.0|
|         7|      17 5|  0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...|       1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+

only showing top 20 rows

Objektet model kan nu användas för att göra förutsägelser. Det fullständiga exemplet på det här maskininlärningsprogrammet och stegvisa instruktioner för att köra det finns i Skapa Apache Spark-maskininlärningsprogram i Azure HDInsight.

Se även