Apache Spark Machine Learning-folyamat létrehozása
Az Apache Spark skálázható gépi tanulási kódtára (MLlib) modellezési képességeket biztosít egy elosztott környezethez. A Spark-csomag spark.ml
a DataFrame-ekre épülő, magas szintű API-k készlete. Ezek az API-k segítenek gyakorlati gépi tanulási folyamatok létrehozásában és finomhangolásában. A Spark machine learning erre az MLlib DataFrame-alapú API-ra utal, nem a régebbi RDD-alapú folyamat API-ra.
A gépi tanulási (ML) folyamat egy teljes munkafolyamat, amely több gépi tanulási algoritmust kombinál. Az adatok feldolgozásához és az adatokból való tanuláshoz számos lépés szükséges, amelyekhez algoritmusok sorozata szükséges. A folyamatok határozzák meg a gépi tanulási folyamatok fázisait és sorrendjét. Az MLlibben a folyamat szakaszait a PipelineStages egy adott sorozata képviseli, ahol egy transzformátor és egy becslő végzi a feladatokat.
A transzformátor olyan algoritmus, amely a metódussal átalakítja az egyik DataFrame-et egy transform()
másikra. Egy funkcióátalakító például beolvashatja egy DataFrame egyik oszlopát, leképezheti azt egy másik oszlopba, és ki tud adni egy új DataFrame-et, amelyhez hozzá van fűzve a megfeleltetett oszlop.
A Becslő a tanulási algoritmusok absztrakciója, és az adathalmazok átalakításhoz való illesztéséért vagy betanításáért felelős. A Becslő implementál egy nevű metódust fit()
, amely egy DataFrame-et fogad el, és létrehoz egy DataFrame-et, amely egy transzformátor.
A transzformátor vagy a becslő állapot nélküli példányai saját egyedi azonosítóval rendelkeznek, amelyet paraméterek megadásakor használnak. Mindkettő egységes API-t használ ezeknek a paramétereknek a megadásához.
Példa folyamatra
Az ML-folyamat gyakorlati használatának bemutatásához ez a példa a HDInsight-fürt alapértelmezett tárolójára előre betöltött mintaadatfájlt HVAC.csv
használja, az Azure Storage vagy Data Lake Storage. A fájl tartalmának megtekintéséhez lépjen a /HdiSamples/HdiSamples/SensorSampleData/hvac
könyvtárba. HVAC.csv
különböző épületek fűtési, szellőztetési és légkondicionálási rendszereinek cél- és tényleges hőmérsékletét is tartalmazza. A cél a modell betanítása az adatokon, és egy adott épület előrejelzési hőmérsékletének előállítása.
A következő kód:
- Definiál egy
LabeledDocument
, amely tárolja aBuildingID
,SystemInfo
(a rendszer azonosítója és kora) és egylabel
(1,0, ha az épület túl meleg, 0,0 egyébként). - Létrehoz egy egyéni elemzőfüggvényt
parseDocument
, amely egy sornyi adatot vesz fel, és a célhőmérséklet és a tényleges hőmérséklet összehasonlításával meghatározza, hogy az épület "meleg"-e. - Alkalmazza az elemzőt a forrásadatok kinyerésekor.
- Betanítási adatokat hoz létre.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID
LabeledDocument = Row("BuildingID", "SystemInfo", "label")
# Define a function that parses the raw CSV file and returns an object of type LabeledDocument
def parseDocument(line):
values = [str(x) for x in line.split(',')]
if (values[3] > values[2]):
hot = 1.0
else:
hot = 0.0
textValue = str(values[4]) + " " + str(values[5])
return LabeledDocument((values[6]), textValue, hot)
# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
"wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()
Ez a példafolyamat három fázisból áll: Tokenizer
és HashingTF
(mind transzformátorok), mind Logistic Regression
(becslő). A DataFrame-ben training
kinyert és elemzett adatok a meghívásakor pipeline.fit(training)
áthaladnak a folyamaton.
- Az első szakasz ()
Tokenizer
aSystemInfo
bemeneti oszlopot (amely a rendszerazonosítóból és az életkorértékekből áll) egywords
kimeneti oszlopra osztja. Ez az újwords
oszlop hozzá lesz adva a DataFrame-hez. - A második fázis,
HashingTF
a funkcióvektorokká alakítja az újwords
oszlopot. Ez az újfeatures
oszlop hozzá lesz adva a DataFrame-hez. Ez az első két fázis a Transformers. - A harmadik fázis( )
LogisticRegression
egy becslő, ezért a folyamat meghívja aLogisticRegression.fit()
metódust egyLogisticRegressionModel
előállításához.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
A és transzformátorok Tokenizer
által hozzáadott új words
és features
oszlopok, valamint a LogisticRegression
becslő mintájának megtekintéséhez futtasson egy metódust PipelineModel.transform()
az eredeti DataFrame-en.HashingTF
Az éles kódban a következő lépés egy teszt DataFrame átadása a betanítás ellenőrzéséhez.
peek = model.transform(training)
peek.show()
# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label| words| features| rawPrediction| probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
| 4| 13 20| 0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...| 0.0|
| 17| 3 20| 0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...| 0.0|
| 18| 17 20| 1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...| 0.0|
| 15| 2 23| 0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...| 1.0|
| 3| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 4| 13 28| 0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...| 0.0|
| 2| 12 24| 0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...| 1.0|
| 16| 20 26| 1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...| 0.0|
| 9| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 12| 6 5| 0.0| [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...| 0.0|
| 15| 10 17| 1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...| 1.0|
| 7| 2 11| 0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...| 0.0|
| 15| 14 2| 1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...| 0.0|
| 6| 3 2| 0.0| [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...| 0.0|
| 20| 19 22| 0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...| 1.0|
| 8| 19 11| 0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...| 0.0|
| 6| 15 7| 0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...| 0.0|
| 13| 12 5| 0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...| 1.0|
| 4| 8 22| 0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...| 1.0|
| 7| 17 5| 0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...| 1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
Az model
objektum mostantól előrejelzések készítésére is használható. Ennek a gépi tanulási alkalmazásnak a teljes mintájáért és a futtatásához szükséges részletes utasításokért lásd: Apache Spark gépi tanulási alkalmazások létrehozása az Azure HDInsighton.