Vytvoření modelu strojového učení pomocí knihovny Apache Spark MLlib

V tomto článku se dozvíte, jak pomocí knihovny Apache Spark MLlib vytvořit aplikaci strojového učení, která v otevřené datové sadě Azure provede jednoduchou prediktivní analýzu. Spark poskytuje integrované knihovny strojového učení. Tento příklad používá klasifikaci prostřednictvím logistické regrese.

SparkML a MLlib jsou základní knihovny Sparku, které poskytují mnoho nástrojů, které jsou užitečné pro úlohy strojového učení, včetně nástrojů vhodných pro:

  • Klasifikace
  • Regrese
  • Clustering
  • Modelování témat
  • Rozklad hodnoty s jednotným číslem (SVD) a analýza hlavních komponent (PCA)
  • Testování hypotéz a výpočet vzorové statistiky

Vysvětlení klasifikace a logistické regrese

Klasifikace, oblíbená úloha strojového učení, je proces řazení vstupních dat do kategorií. Je úlohou klasifikačního algoritmu zjistit, jak přiřadit popisky vstupním datům, která zadáte. Můžete si například představit algoritmus strojového učení, který přijímá informace o akciích jako vstup a rozděluje akcie do dvou kategorií: akcie, které byste měli prodávat, a akcie, které byste měli zachovat.

Logistická regrese je algoritmus, který můžete použít pro klasifikaci. Rozhraní API logistické regrese Sparku je užitečné pro binární klasifikaci nebo klasifikaci vstupních dat do jedné ze dvou skupin. Další informace o logistické regresi najdete na Wikipedii.

V souhrnu proces logistické regrese vytváří logistickou funkci , pomocí které můžete předpovědět pravděpodobnost, že vstupní vektor patří do jedné skupiny nebo druhé.

Příklad prediktivní analýzy dat taxislužby NYC

Začněte instalací azureml-opendatasets. Data jsou k dispozici prostřednictvím Azure Open Datasets. Tato podmnožina datové sady obsahuje informace o žlutých cestách taxíkem, včetně počátečního a koncového času a umístění, nákladů a dalších atributů.

%pip install azureml-opendatasets

Ve zbývající části tohoto článku použijeme Apache Spark k provedení určité analýzy dat tipu taxislužby NYC a pak vytvoříme model, který předpovídá, jestli konkrétní cesta obsahuje tip nebo ne.

Vytvoření modelu strojového učení Apache Sparku

  1. Vytvořte poznámkový blok PySpark. Pokyny najdete v tématu Vytvoření poznámkového bloku.

  2. Importujte typy požadované pro tento poznámkový blok.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. MLflow použijeme ke sledování experimentů strojového učení a odpovídajících spuštění. Pokud je povolené automatické protokolování Microsoft Fabric, zachytí se automaticky odpovídající metriky a parametry.

    import mlflow
    

Vytvoření vstupního datového rámce

V tomto příkladu načteme data do datového rámce Pandas a pak je převedeme na datový rámec Apache Spark. Pomocí tohoto formátu můžeme použít další operace Apache Sparku k vyčištění a filtrování datové sady.

  1. Spuštěním následujících řádků vytvořte datový rámec Sparku vložením kódu do nové buňky. Tento krok načte data prostřednictvím rozhraní API Open Datasets. Tato data můžeme filtrovat, abychom se podívali na konkrétní okno dat. Následující příklad kódu používá start_date a end_date použije filtr, který vrátí jeden měsíc dat.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Následující kód zmenšuje datovou sadu na přibližně 10 000 řádků. Abychom urychlili vývoj a trénování, vypíšeme teď naši datovou sadu.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. V dalším kroku se chceme podívat na naše data pomocí integrovaného display() příkazu. To nám umožňuje snadno zobrazit ukázku dat nebo prozkoumat trendy v datech graficky.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Příprava dat

Příprava dat je zásadním krokem v procesu strojového učení. Zahrnuje čištění, transformaci a uspořádání nezpracovaných dat, aby byla vhodná pro analýzu a modelování. V následujícím kódu provedete několik kroků přípravy dat:

  • Odebrání odlehlých hodnot a nesprávných hodnot filtrováním datové sady
  • Odebrání sloupců, které nejsou potřeba pro trénování modelu
  • Vytvoření nových sloupců z nezpracovaných dat
  • Vygenerujte popisek, který určí, jestli bude pro danou cestu taxíkem tip nebo ne.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Potom provedeme druhou předáním dat, abychom přidali konečné funkce.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Vytvoření logistického regresního modelu

Posledním úkolem je převést označená data do formátu, který je možné analyzovat prostřednictvím logistické regrese. Vstup do logistického regresního algoritmu musí být sada dvojic vektorů popisku/funkce, kde vektor funkce je vektorem čísel, který představuje vstupní bod.

Proto je potřeba převést sloupce kategorií na čísla. Konkrétně je potřeba převést trafficTimeBins sloupce weekdayString na celočíselné reprezentace. Převod je možné provést několika způsoby. Následující příklad používá OneHotEncoder přístup.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Výsledkem této akce je nový datový rámec se všemi sloupci ve správném formátu pro trénování modelu.

Trénování logistického regresního modelu

Prvním úkolem je rozdělit datovou sadu na trénovací sadu a testovací nebo ověřovací sadu.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Teď, když existují dva datové rámce, dalším úkolem je vytvořit vzorec modelu a spustit ho proti trénovacímu datovému rámci. Pak můžete ověřit testovací datový rámec. Experimentujte s různými verzemi vzorce modelu, abyste viděli dopad různých kombinací.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Výstupem z této buňky je:

Area under ROC = 0.9749430523917996

Vytvoření vizuální reprezentace předpovědi

Teď můžete vytvořit konečnou vizualizaci, která interpretuje výsledky modelu. Křivka ROC je jedním ze způsobů, jak zkontrolovat výsledek.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.