Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
V tomto článku se dozvíte, jak pomocí knihovny Apache Spark MLlib vytvořit aplikaci strojového učení, která v otevřené datové sadě Azure provede jednoduchou prediktivní analýzu. Spark poskytuje integrované knihovny strojového učení. Tento příklad používá klasifikaci prostřednictvím logistické regrese.
SparkML a MLlib jsou základní knihovny Sparku, které poskytují mnoho nástrojů, které jsou užitečné pro úlohy strojového učení, včetně nástrojů vhodných pro:
- Klasifikace
- Regrese
- Klastrování
- Modelování témat
- Rozklad hodnoty s jednotným číslem (SVD) a analýza hlavních komponent (PCA)
- Testování hypotéz a výpočet vzorové statistiky
Vysvětlení klasifikace a logistické regrese
Klasifikace, oblíbená úloha strojového učení, je proces řazení vstupních dat do kategorií. Je úlohou klasifikačního algoritmu zjistit, jak přiřadit popisky vstupním datům, která zadáte. Můžete si například představit algoritmus strojového učení, který přijímá informace o akciích jako vstup a rozděluje akcie do dvou kategorií: akcie, které byste měli prodávat, a akcie, které byste měli zachovat.
Logistická regrese je algoritmus, který můžete použít pro klasifikaci. Rozhraní API logistické regrese Sparku je užitečné pro binární klasifikaci nebo klasifikaci vstupních dat do jedné ze dvou skupin. Další informace o logistické regresi najdete na Wikipedii.
V souhrnu proces logistické regrese vytváří logistickou funkci , pomocí které můžete předpovědět pravděpodobnost, že vstupní vektor patří do jedné skupiny nebo druhé.
Příklad prediktivní analýzy dat taxislužby NYC
V tomto příkladu použijete Spark k provedení prediktivní analýzy spropitného za jízdy taxíkem z New Yorku. Data jsou k dispozici prostřednictvím Azure Open Datasets. Tato podmnožina datové sady obsahuje informace o žlutých cestách taxi, včetně informací o jednotlivých cestách, počátečním a koncovém čase a umístění, nákladech a dalších zajímavých atributech.
Důležité
Za získání těchto dat z úložného prostoru mohou být účtovány další poplatky.
V následujících krocích vytvoříte model, který předpovídá, jestli konkrétní cesta zahrnuje tip, nebo ne.
Vytvoření modelu strojového učení Apache Sparku
Vytvořte poznámkový blok pomocí jádra PySpark. Pokyny najdete v tématu Vytvoření poznámkového bloku.
Importujte typy požadované pro tuto aplikaci. Zkopírujte a vložte následující kód do prázdné buňky a stiskněte Shift+Enter. Nebo buňku spusťte pomocí modré ikony přehrávání vlevo od kódu
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluatorKvůli jádru PySpark nemusíte explicitně vytvářet žádné kontexty. Kontext Sparku se automaticky vytvoří za vás při spuštění první buňky kódu.
Vytvoření vstupního datového rámce
Vzhledem k tomu, že nezpracovaná data jsou ve formátu Parquet, můžete použít kontext Sparku k přímému načtení souboru do paměti jako datového rámce. I když kód v následujících krocích používá výchozí možnosti, je možné vynutit mapování datových typů a dalších atributů schématu v případě potřeby.
Spuštěním následujících řádků vytvořte datový rámec Sparku vložením kódu do nové buňky. Tento krok načte data prostřednictvím rozhraní API Open Datasets. Stažení všech těchto dat vygeneruje přibližně 1,5 miliardy řádků.
V závislosti na velikosti bezserverového fondu Apache Spark mohou být surová data příliš velká nebo jejich zpracování může trvat příliš dlouho. Tato data můžete filtrovat až na něco menšího. Následující příklad kódu používá
start_dateaend_datepoužije filtr, který vrátí jeden měsíc dat.from azureml.opendatasets import NycTlcYellow from datetime import datetime from dateutil import parser end_date = parser.parse('2018-05-08 00:00:00') start_date = parser.parse('2018-05-01 00:00:00') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())Nevýhodou jednoduchého filtrování je, že z statistického hlediska může do dat zavádět předsudky. Dalším přístupem je použití vzorkování integrovaného do Sparku.
Následující kód zmenšuje datovou sadu na přibližně 2 000 řádků, pokud se použije po předchozím kódu. Tento krok vzorkování můžete použít místo jednoduchého filtru nebo ve spojení s jednoduchým filtrem.
# To make development easier, faster, and less expensive, downsample for now sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)Teď se můžete podívat na data, abyste viděli, co bylo přečteno. V závislosti na velikosti datové sady je obvykle lepší zkontrolovat data s podmnožinou, nikoli úplnou sadou.
Následující kód nabízí dva způsoby zobrazení dat. První způsob je základní. Druhý způsob poskytuje mnohem bohatší možnosti mřížky spolu s možností graficky vizualizovat data.
#sampled_taxi_df.show(5) display(sampled_taxi_df)V závislosti na velikosti vygenerované datové sady a často potřebujete experimentovat nebo spouštět poznámkový blok, můžete datovou sadu ukládat do mezipaměti místně v pracovním prostoru. Existují tři způsoby, jak provádět explicitní ukládání do mezipaměti:
- Datový rámec uložte místně jako soubor.
- Uložte datový rámec jako dočasnou tabulku nebo zobrazení.
- Uložte datový rámec jako trvalou tabulku.
První dva z těchto přístupů jsou zahrnuty v následujících příkladech kódu.
Vytvoření dočasné tabulky nebo zobrazení poskytuje různé přístupové cesty k datům, ale trvá pouze po dobu trvání relace instance Sparku.
sampled_taxi_df.createOrReplaceTempView("nytaxi")
Příprava dat
Data v nezpracované podobě často nejsou vhodná pro přímé předávání modelu. Abyste je získali do stavu, ve kterém ho model může využívat, musíte u dat provést řadu akcí.
V následujícím kódu provedete čtyři třídy operací:
- Odebrání odlehlých hodnot nebo nesprávných hodnot prostřednictvím filtrování.
- Odebrání sloupců, které nejsou potřeba.
- Vytvoření nových sloupců odvozených z nezpracovaných dat, aby model fungoval efektivněji. Tato operace se někdy označuje jako featurizace.
- Značení. Vzhledem k tomu, že provádíte binární klasifikaci (bude na dané cestě tip nebo ne), je potřeba převést částku tipu na hodnotu 0 nebo 1.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Potom provedete druhé předání dat a přidáte konečné funkce.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Vytvoření logistického regresního modelu
Posledním úkolem je převést označená data do formátu, který je možné analyzovat prostřednictvím logistické regrese. Vstup do logistického regresního algoritmu musí být sada dvojic vektorů popisku/funkce, kde vektor funkce je vektorem čísel, který představuje vstupní bod.
Proto je potřeba převést sloupce kategorií na čísla. Konkrétně je potřeba převést trafficTimeBins sloupce weekdayString na celočíselné reprezentace. Převod je možné provést několika způsoby. Následující příklad používá OneHotEncoder přístup, který je běžný.
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Výsledkem této akce je nový datový rámec se všemi sloupci ve správném formátu pro trénování modelu.
Trénování logistického regresního modelu
Prvním úkolem je rozdělit datovou sadu na trénovací sadu a testovací nebo ověřovací sadu. Rozdělení tady je libovolné. Experimentujte s různými rozdělenými nastaveními a zjistěte, jestli mají vliv na model.
# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Teď, když existují dva datové rámce, dalším úkolem je vytvořit vzorec modelu a spustit ho proti trénovacímu datovému rámci. Pak můžete ověřit testovací datový rámec. Experimentujte s různými verzemi vzorce modelu, abyste viděli dopad různých kombinací.
Poznámka:
Pokud chcete model uložit, přiřaďte roli Přispěvatel dat objektů blob služby Storage k oboru prostředků serveru Azure SQL Database. Podrobný postup najdete v tématu Přiřazování rolí Azure s využitím webu Azure Portal. Tento krok můžou provádět pouze členové s oprávněními vlastníka.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Výstupem z této buňky je:
Area under ROC = 0.9779470729751403
Vytvoření vizuální reprezentace předpovědi
Teď můžete vytvořit konečnou vizualizaci, která vám pomůže zdůvodnění výsledků tohoto testu. Křivka ROC je jedním ze způsobů, jak zkontrolovat výsledek.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Vypnutí instance Sparku
Po skončení běhu aplikace vypněte notebook a uvolněte prostředky zavřením záložky. Nebo vyberte Ukončit relaci ve stavovém panelu v dolní části notebooku.
Viz také
Další kroky
Poznámka:
Některá oficiální dokumentace k Apache Sparku závisí na používání konzoly Spark, která není dostupná v Apache Sparku ve službě Azure Synapse Analytics. Místo toho použijte prostředí poznámkového bloku nebo IntelliJ .