Verwenden von SparkR

SparkR ist ein R-Paket, das ein schlankes Front-End für die Nutzung von Apache Spark in R bereitstellt. SparkR ermöglicht eine verteilte Datenrahmenimplementierung, die Vorgänge wie Auswählen, Filtern, Aggregieren usw. unterstützt. SparkR unterstützt außerdem verteiltes maschinelles Lernen mithilfe von MLlib.

Verwenden Sie SparkR über Spark-Batchauftragsdefinitionen oder mit interaktiven Microsoft Fabric-Notebooks.

R-Unterstützung ist nur in Spark3.1 oder höher verfügbar. R in Spark 2.4 wird nicht unterstützt.

Voraussetzungen

  • Öffnen oder erstellen Sie ein Notebook. Informationen dazu finden Sie unter Verwenden von Microsoft Fabric-Notebooks.

  • Legen Sie zum Ändern der primären Sprache die Sprachoption auf SparkR (R) fest.

  • Fügen Sie Ihr Notebook an ein Lakehouse an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.

Lesen und Schreiben von SparkR-Dataframes

Erstellen eines SparkR-Dataframe aus einem lokalen R-Dataframe (data.frame)

Die einfachste Möglichkeit zum Erstellen eines Dataframe besteht in der Konvertierung eines lokalen R-Dataframe (data.frame) in SparkDataFrame.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Lesen und Schreiben von SparkR Dataframes aus Lakehouse

Daten können im lokalen Dateisystem von Clusterknoten gespeichert werden. Die allgemeinen Methoden zum Lesen und Schreiben eines SparkR-Dataframe aus Lakehouse sind read.df und write.df. Diese Methoden verwenden den Pfad der zu ladenden Datei und den Typ der Datenquelle. SparkR unterstützt nativ das Lesen von CSV-, JSON-, Text- und Parquet-Dateien.

Um in einem Lakehouse zu lesen und zu schreiben, fügen Sie das Lakehouse zuerst zu Ihrer Sitzung hinzu. Wählen Sie auf der linken Seite des Notebooks die Option Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein neues zu erstellen.

Hinweis

Um mithilfe von Spark-Paketen wie read.df oder write.df auf Lakehouse-Dateien zuzugreifen, verwenden Sie den ADFS-Pfad oder den relativen Pfad für Spark. Klicken Sie im Lakehouse-Explorer mit der rechten Maustaste auf die Dateien oder den Ordner, auf die bzw. den Sie zugreifen möchten, und kopieren Sie den ADFS-Pfad oder den relativen Pfad für Spark aus dem Kontextmenü.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

In Microsoft Fabric ist tidyverse vorinstalliert. Sie können in Ihren vertrauten R-Paketen auf Lakehouse-Dateien zugreifen und können z. B. Lakehouse-Dateien mit readr::read_csv() und readr::write_csv()lesen und schreiben.

Hinweis

Um mithilfe von R-Paketen auf Lakehouse-Dateien zuzugreifen, müssen Sie den Datei-API-Pfad verwenden. Klicken Sie im Lakehouse-Explorer mit der rechten Maustaste auf die Datei oder den Ordner, auf die bzw. den Sie zugreifen möchten, und kopieren Sie den Datei-API-Pfad aus dem Kontextmenü.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Sie können auch einen SparkR-Dataframe in Ihrem Lakehouse mithilfe von SparkSQL-Abfragen lesen.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Lesen und Schreiben von SQL-Tabellen über RODBC

Verwenden Sie RODBC, um eine Verbindung mit SQL-basierten Datenbanken über eine ODBC-Schnittstelle herzustellen. Sie können beispielsweise wie im folgenden Beispielcode eine Verbindung mit einem dedizierten Synapse-SQL-Pool herstellen. Ersetzen Sie Ihre eigenen Verbindungsdetails für <database>, <uid>, <password> und <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

DataFrame-Vorgänge

Spark-DataFrames unterstützen viele Funktionen für die strukturierte Datenverarbeitung. Es folgen einige einfache Beispiele. Eine vollständige Liste finden Sie in der API-Dokumentation für SparkR.

Auswählen von Zeilen und Spalten

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Gruppierung und Aggregation

SparkR-Dataframes unterstützen viele gängige Funktionen zum Aggregieren von Daten nach der Gruppierung. Beispielsweise können wir wie unten gezeigt ein Histogramm der Wartezeit im originalgetreuen Dataset berechnen.

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Spaltenvorgänge (Column operations)

SparkR bietet viele Funktionen zur Datenverarbeitung und -aggregation, die direkt auf Spalten angewendet werden können. Das folgende Beispiel zeigt die Verwendung grundlegender arithmetischer Funktionen.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Anwenden einer benutzerdefinierten Funktion

SparkR unterstützt verschiedene Arten von benutzerdefinierten Funktionen:

Ausführen einer Funktion für ein großes Dataset mit dapply oder dapplyCollect

dapply

Wenden Sie eine Funktion auf jede Partition eines SparkDataFrame an. Die Funktion, die auf jede Partition von SparkDataFrame angewendet werden soll, sollte nur über einen Parameter verfügen. An diesen wird ein Dataframe übergeben, der jeder Partition entspricht. Die Ausgabe der Funktion sollte ein data.framesein. Das Schema gibt das Zeilenformat des resultierenden SparkDataFrame an. Es muss mit den Datentypen des zurückgegebenen Werts übereinstimmen.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Wenden Sie wie bei dapply eine Funktion auf jede Partition eines SparkDataFrame an, und rufen Sie das Ergebnis ab. Die Ausgabe der Funktion sollte ein data.framesein. Dieses Mal muss das Schema jedoch nicht übergeben werden. Beachten Sie, dass dapplyCollect fehlschlagen kann, wenn die Ausgaben der Funktion, die auf der gesamten Partition ausgeführt werden, nicht in den Treiber gepullt werden können und nicht in den Treiberspeicher passen.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Ausführen einer Funktion für ein großes Dataset mit Gruppierung nach Eingabespalte(n) mit gapply oder gapplyCollect

gapply

Wenden Sie eine Funktion auf jede SparkDataFrame-Gruppe an. Die Funktion soll auf jede Gruppe von SparkDataFrame angewendet werden und sollte nur zwei Parameter haben: den Gruppierungsschlüssel und den data.frame von R, der diesem Schlüssel entspricht. Die Gruppen werden aus den SparkDataFrames-Spalten ausgewählt. Die Ausgabe der Funktion sollte ein data.framesein. Das Schema gibt das Zeilenformat des resultierenden SparkDataFrame an. Es muss das Ausgabeschema der R-Funktion aus den Spark-Datentypen entsprechen. Die Spaltennamen des zurückgegebenen data.frame werden vom Benutzer bzw. der Benutzerin festgelegt.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Wie bei gapply wird eine Funktion auf jede SparkDataFrame-Gruppe angewendet, und die Ergebnisse werden zurück an R-data.frame übergeben. Die Ausgabe der Funktion sollte ein data.framesein. Das Schema muss jedoch nicht übergeben werden. Beachten Sie, dass gapplyCollect fehlschlagen kann, wenn die Ausgaben der Funktion, die auf der gesamten Partition ausgeführt werden, nicht in den Treiber gepullt werden können und nicht in den Treiberspeicher passen.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Ausführen lokaler R-Funktionen, verteilt mit „spark.lapply“

spark.lapply

Ähnlich wie bei lapply wird in nativen R-Elementen von spark.lapply eine Funktion für eine Liste von Elementen ausgeführt, und die Berechnungen werden mit Spark verteilt. Es wird eine Funktion ähnlich wie bei doParallel und lapply auf Elemente einer Liste angewendet. Der Speicherplatz eines einzelnen Computers sollte für alle Berechnungen ausreichend sein. Wenn dies nicht der Fall ist, können sie beispielsweise df <- createDataFrame(list) anwenden und anschließend dapply verwenden.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Ausführen von SQL-Abfragen über SparkR

Ein SparkR-Dataframe kann auch als temporäre Ansicht registriert werden, mit der Sie SQL-Abfragen über die Daten ausführen können. Die SQL-Funktion ermöglicht es Anwendungen, SQL-Abfragen programmgesteuert auszuführen, und gibt das Ergebnis als SparkR-Dataframe zurück.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Machine Learning

SparkR macht die meisten MLLib-Algorithmen verfügbar. Im Hintergrund verwendet SparkR die MLlib, um das Modell zu trainieren.

Das folgende Beispiel zeigt, wie Sie mit SparkR ein gaußsches generalisiertes lineares Modell (GLM) erstellen. Legen Sie zum Ausführen einer linearen Regression „family“ auf "gaussian" fest. Legen Sie zum Ausführen einer logistischen Regression „family“ auf "binomial" fest. Bei Verwendung von SparkML GLM führt SparkR automatisch eine One-Hot-Codierung kategorischer Features durch, sodass diese nicht manuell erfolgen muss. Neben den Typfeatures „String“ und „Double“ ist es auch möglich, MLlib Vector-Features einzupassen, um die Kompatibilität mit anderen MLlib-Komponenten zu gewährleisten.

Weitere Informationen dazu, welche Algorithmen für maschinelles Lernen unterstützt werden, finden Sie in der Dokumentation für SparkR und MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)