Verwenden von SparkR

Wichtig

Microsoft Fabric befindet sich derzeit in der Vorschauversion. Diese Informationen beziehen sich auf eine Vorabversion des Produkts, an der vor der Veröffentlichung noch wesentliche Änderungen vorgenommen werden können. Microsoft übernimmt keine Garantie, weder ausdrücklich noch stillschweigend, für die hier bereitgestellten Informationen.

SparkR ist ein R-Paket, das ein schlankes Front-End für die Nutzung von Apache Spark in R bereitstellt. SparkR ermöglicht eine verteilte Datenrahmenimplementierung, die Vorgänge wie Auswählen, Filtern, Aggregieren usw. unterstützt. SparkR unterstützt außerdem verteiltes maschinelles Lernen mithilfe von MLlib.

Wichtig

Microsoft Fabric befindet sich derzeit in der Vorschauversion. Diese Informationen beziehen sich auf eine Vorabversion des Produkts, an der vor der Veröffentlichung noch wesentliche Änderungen vorgenommen werden können. Microsoft übernimmt keine Garantie, weder ausdrücklich noch stillschweigend, für die hier bereitgestellten Informationen.

Verwenden Sie SparkR über Spark-Batchauftragsdefinitionen oder mit interaktiven Microsoft Fabric-Notebooks.

R-Unterstützung ist nur in Spark3.1 oder höher verfügbar. R in Spark 2.4 wird nicht unterstützt.

Voraussetzungen

  • Ein Power BI Premium-Abonnement. Wenn Sie noch keines haben, finden Sie weitere Informationen unter So erwerben Sie Power BI Premium.

  • Ein Power BI-Arbeitsbereich mit zugewiesener Premium-Kapazität. Wenn Sie keinen Arbeitsbereich haben, führen Sie die Schritte unter Erstellen eines Arbeitsbereichs aus, um einen Arbeitsbereich zu erstellen und ihn einer Premium-Kapazität zuzuweisen.

  • Melden Sie sich bei Microsoft Fabric an.

  • Öffnen oder erstellen Sie ein Notebook. Weitere Informationen finden Sie unter Verwenden von Microsoft Fabric-Notebooks.

  • Ändern Sie die primäre Sprache, indem Sie die Sprachoption auf SparkR (R) festlegen.

  • Schließen Sie Ihr Notebook an ein Lakehouse an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.

Lesen und Schreiben von SparkR DataFrames

Lesen eines SparkR-DataFrames aus einem lokalen R data.frame

Die einfachste Möglichkeit zum Erstellen eines DataFrames besteht darin, einen lokalen R data.frame in einen Spark-DataFrame zu konvertieren.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Lesen und Schreiben von SparkR DataFrame aus Lakehouse

Daten können im lokalen Dateisystem von Clusterknoten gespeichert werden. Die allgemeinen Methoden zum Lesen und Schreiben eines SparkR-DataFrames aus Lakehouse sind read.df und write.df. Diese Methoden verwenden den Pfad für die zu ladende Datei und den Typ der Datenquelle. SparkR unterstützt nativ das Lesen von CSV-, JSON-, Text- und Parquet-Dateien.

Um ein Lakehouse zu lesen und zu schreiben, fügen Sie es zuerst Ihrer Sitzung hinzu. Wählen Sie links im Notebook Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.

Hinweis

Um mit Spark-Paketen wie read.df oder write.dfauf Lakehouse-Dateien zuzugreifen, verwenden Sie den ADFS-Pfad oder den relativen Pfad für Spark. Klicken Sie im Lakehouse-Explorer mit der rechten Maustaste auf die Dateien oder den Ordner, auf die Sie zugreifen möchten, und kopieren Sie den ADFS-Pfad oder den relativen Pfad für Spark aus dem Kontextmenü.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric ist tidyverse vorinstalliert. Sie können in Ihren vertrauten R-Paketen auf Lakehouse-Dateien zugreifen, z. B. Lesen und Schreiben von Lakehouse-Dateien mit readr::read_csv() und readr::write_csv().

Hinweis

Um mit R-Paketen auf Lakehouse-Dateien zuzugreifen, müssen Sie den Datei-API-Pfad verwenden. Klicken Sie im Lakehouse-Explorer mit der rechten Maustaste auf die Datei oder den Ordner, auf die Sie zugreifen möchten, und kopieren Sie den Datei-API-Pfad aus dem Kontextmenü.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Sie können auch einen SparkR-Datenrahmen in Ihrem Lakehouse mithilfe von SparkSQL-Abfragen lesen.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Lesen und Schreiben von SQL-Tabellen über RODBC

Verwenden Sie RODBC, um eine Verbindung mit SQL-basierten Datenbanken über eine ODBC-Schnittstelle herzustellen. Sie können beispielsweise eine Verbindung mit einem dedizierten Synapse-SQL-Pool herstellen, wie im folgenden Beispielcode gezeigt. Ersetzen Sie Ihre eigenen Verbindungsdetails durch <database>, <uid>, <password>und <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R date.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

DataFrame-Vorgänge

SparkR DataFrames unterstützen viele Funktionen zur strukturierten Datenverarbeitung. Es folgen einige einfache Beispiele. Eine vollständige Liste finden Sie in der SparkR-API-Dokumentation.

Auswählen von Zeilen und Spalten

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Gruppierung und Aggregation

SparkR-Datenrahmen unterstützen viele häufig verwendete Funktionen zum Aggregieren von Daten nach der Gruppierung. Beispielsweise können wir ein Histogramm der Wartezeit im originalgetreuen Dataset berechnen, wie unten gezeigt

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Spaltenvorgänge (Column operations)

SparkR bietet viele Funktionen, die direkt auf Spalten für die Datenverarbeitung und -aggregation angewendet werden können. Das folgende Beispiel zeigt die Verwendung grundlegender arithmetischer Funktionen.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Anwenden einer benutzerdefinierten Funktion

SparkR unterstützt verschiedene Arten von benutzerdefinierten Funktionen:

Ausführen einer Funktion für ein großes Dataset mit dapply oder dapplyCollect

dapply

Wenden Sie eine Funktion auf jede Partition eines an SparkDataFrame. Die Funktion, die auf jede Partition von SparkDataFrame und angewendet werden soll, sollte nur einen Parameter aufweisen, dem ein data.frame entspricht, der jeder Partition entspricht. Die Ausgabe der Funktion sollte eine data.framesein. Schema gibt das Zeilenformat des resultierenden ein an SparkDataFrame. Er muss mit den Datentypen des zurückgegebenen Werts übereinstimmen.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Wenden Sie wie dapply eine Funktion auf jede Partition eines an SparkDataFrame , und sammeln Sie das Ergebnis zurück. Die Ausgabe der Funktion sollte eine data.framesein. Dieses Mal muss das Schema jedoch nicht übergeben werden. Beachten Sie, dass dapplyCollect ein Fehler auftreten kann, wenn die Ausgaben der Funktion, die auf der gesamten Partition ausgeführt werden, nicht in den Treiber abgerufen werden können und in den Treiberspeicher passen.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Ausführen einer Funktion für ein großes Dataset, gruppieren nach Eingabespalten mit gapply oder gapplyCollect

gapply

Wenden Sie eine Funktion auf jede Gruppe eines an SparkDataFrame. Die Funktion soll auf jede Gruppe von SparkDataFrame angewendet werden und sollte nur zwei Parameter aufweisen: Gruppierungsschlüssel und R data.frame , die diesem Schlüssel entsprechen. Die Gruppen werden aus SparkDataFrames Spalten ausgewählt. Die Ausgabe der Funktion sollte eine data.framesein. Schema gibt das Zeilenformat des resultierenden SparkDataFramean. Sie muss das Ausgabeschema der R-Funktion aus Spark-Datentypen darstellen. Die Spaltennamen der zurückgegebenen data.frame werden vom Benutzer festgelegt.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Wie gapplywendet eine Funktion auf jede Gruppe von a an SparkDataFrame , und sammelt das Ergebnis wieder an R data.frame. Die Ausgabe der Funktion sollte eine data.framesein. Das Schema muss jedoch nicht übergeben werden. Beachten Sie, dass gapplyCollect ein Fehler auftreten kann, wenn die Ausgaben der Funktion, die auf der gesamten Partition ausgeführt werden, nicht in den Treiber abgerufen werden können und in den Treiberspeicher passen.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Ausführen lokaler R-Funktionen, verteilt mit spark.lapply

spark.lapply

lapply Führt ähnlich wie in nativer R spark.lapply eine Funktion über eine Liste von Elementen aus und verteilt die Berechnungen mit Spark. Wendet eine Funktion ähnlich wie elemente einer Liste andoParallel.lapply Die Ergebnisse aller Berechnungen sollten in einen einzelnen Computer passen. Wenn dies nicht der Fall ist, können sie so etwas wie df <- createDataFrame(list) tun und dann verwenden dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Ausführen von SQL-Abfragen über SparkR

Ein SparkR-DataFrame kann auch als temporäre Ansicht registriert werden, mit der Sie SQL-Abfragen über seine Daten ausführen können. Die sql-Funktion ermöglicht es Anwendungen, SQL-Abfragen programmgesteuert auszuführen, und gibt das Ergebnis als SparkR-DataFrame zurück.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Machine Learning

SparkR macht die meisten MLLib-Algorithmen verfügbar. Im Hintergrund verwendet SparkR die MLlib, um das Modell zu trainieren.

Im folgenden Beispiel wird gezeigt, wie ein gaußsches GLM-Modell mithilfe von SparkR erstellt wird. Legen Sie zum Ausführen einer linearen Regression „family“ auf "gaussian" fest. Legen Sie zum Ausführen einer logistischen Regression „family“ auf "binomial" fest. Bei Verwendung von SparkML GLM führt SparkR automatisch die One-Hot-Codierung kategorischer Features durch, sodass dies nicht manuell erfolgen muss. Neben String- und Double-Typfeatures ist es auch möglich, MLlib Vector-Features zur Kompatibilität mit anderen MLlib-Komponenten anzupassen.

Weitere Informationen dazu, welche Algorithmen für maschinelles Lernen unterstützt werden, finden Sie in der Dokumentation für SparkR und MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)

Nächste Schritte