Använda SparkR

SparkR är ett R-paket som tillhandahåller en lättviktsklientdel för användning av Apache Spark från R. SparkR tillhandahåller en distribuerad implementering av dataramar som stöder åtgärder som val, filtrering, aggregering osv. SparkR stöder även distribuerad maskininlärning med MLlib.

Använd SparkR via Spark batch-jobbdefinitioner eller med interaktiva Microsoft Fabric-notebook-filer.

R-stöd är endast tillgängligt i Spark3.1 eller senare. R i Spark 2.4 stöds inte.

Förutsättningar

  • Öppna eller skapa en notebook-fil. Mer information finns i Använda Microsoft Fabric-notebook-filer.

  • Ange språkalternativet SparkR (R) för att ändra det primära språket.

  • Bifoga anteckningsboken till ett sjöhus. Till vänster väljer du Lägg till för att lägga till ett befintligt sjöhus eller för att skapa ett sjöhus.

Läsa och skriva SparkR DataFrames

Läsa en SparkR DataFrame från en lokal R-dataram

Det enklaste sättet att skapa en DataFrame är att konvertera en lokal R-data.frame till en Spark DataFrame.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Läsa och skriva SparkR DataFrame från Lakehouse

Data kan lagras på det lokala filsystemet för klusternoder. De allmänna metoderna för att läsa och skriva en SparkR DataFrame från Lakehouse är read.df och write.df. De här metoderna använder sökvägen för att filen ska läsas in och typen av datakälla. SparkR stöder läsning av CSV-, JSON-, text- och Parquet-filer internt.

Om du vill läsa och skriva till en Lakehouse lägger du först till den i sessionen. Till vänster i anteckningsboken väljer du Lägg till för att lägga till ett befintligt Lakehouse eller skapa ett Lakehouse.

Kommentar

Om du vill komma åt Lakehouse-filer med spark-paket, till exempel read.df eller write.df, använder du dess ADFS-sökväg eller relativa sökväg för Spark. I Lakehouse-utforskaren högerklickar du på de filer eller mappar som du vill komma åt och kopierar dess ADFS-sökväg eller relativa sökväg för Spark från snabbmenyn.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric har tidyverse förinstallerats. Du kan komma åt Lakehouse-filer i dina välbekanta R-paket, till exempel att läsa och skriva Lakehouse-filer med hjälp av readr::read_csv() och readr::write_csv().

Kommentar

Om du vill komma åt Lakehouse-filer med R-paket måste du använda sökvägen till fil-API:et. I Lakehouse-utforskaren högerklickar du på den fil eller mapp som du vill komma åt och kopierar sökvägen till fil-API:et från snabbmenyn.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Du kan också läsa en SparkR-dataram i Lakehouse med hjälp av SparkSQL-frågor.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Läsa och skriva SQL-tabeller via RODBC

Använd RODBC för att ansluta till SQL-baserade databaser via ett ODBC-gränssnitt. Du kan till exempel ansluta till en synapse-dedikerad SQL-pool enligt följande exempelkod. Ersätt din egen anslutningsinformation med <database>, <uid>, <password>och <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

DataFrame-åtgärder

SparkR DataFrames har stöd för många funktioner för strukturerad databearbetning. Här följer några grundläggande exempel. En fullständig lista finns i SparkR API-dokumenten.

Markera rader och kolumner

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Gruppering och sammansättning

SparkR-dataramar stöder många vanliga funktioner för att aggregera data efter gruppering. Vi kan till exempel beräkna ett histogram över väntetiden i den trofasta datauppsättningen enligt nedan

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Kolumnåtgärder

SparkR innehåller många funktioner som kan tillämpas direkt på kolumner för databearbetning och aggregering. I följande exempel visas användningen av grundläggande aritmetiska funktioner.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Använda användardefinierad funktion

SparkR stöder flera typer av användardefinierade funktioner:

Kör en funktion på en stor datamängd med dapply eller dapplyCollect

dapply

Tillämpa en funktion på varje partition i en SparkDataFrame. Den funktion som ska tillämpas på varje partition av SparkDataFrame och ska bara ha en parameter, som en data.frame motsvarar varje partition skickas till. Funktionens utdata ska vara en data.frame. Schemat anger radformatet för den resulterande SparkDataFrame. Den måste matcha datatyperna för returnerade värden.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Precis som dapply tillämpar du en funktion på varje partition i en SparkDataFrame och samlar in resultatet tillbaka. Funktionens utdata ska vara en data.frame. Men den här gången krävs inte schemat för att skickas. Observera att dapplyCollect det kan misslyckas om utdata från funktionen körs på hela partitionen inte kan hämtas till drivrutinen och får plats i drivrutinsminnet.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Kör en funktion på en stor datamängdsgruppering efter indatakolumner med gapply eller gapplyCollect

gapply

Tillämpa en funktion på varje grupp av en SparkDataFrame. Funktionen ska tillämpas på varje grupp av SparkDataFrame och ska bara ha två parametrar: grupperingsnyckel och R data.frame som motsvarar den nyckeln. Grupperna väljs från SparkDataFrames kolumner. Funktionens utdata ska vara en data.frame. Schemat anger radformatet för det resulterande SparkDataFrame. Den måste representera R-funktionens utdataschema från Spark-datatyper. Kolumnnamnen för de returnerade data.frame anges av användaren.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Som gapply, tillämpar en funktion på varje grupp i en SparkDataFrame och samlar in resultatet tillbaka till R data.frame. Funktionens utdata ska vara en data.frame. Men schemat krävs inte för att skickas. Observera att gapplyCollect det kan misslyckas om utdata från funktionen körs på hela partitionen inte kan hämtas till drivrutinen och får plats i drivrutinsminnet.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Kör lokala R-funktioner distribuerade med spark.lapply

spark.lapply

Precis som lapply i intern R spark.lapply kör en funktion över en lista över element och distribuerar beräkningen med Spark. Tillämpar en funktion på ett sätt som liknar doParallel eller lapply på element i en lista. Resultatet av alla beräkningar bör passa i en enda dator. Om så inte är fallet kan de göra något som liknar df <- createDataFrame(list) och sedan använda dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Köra SQL-frågor från SparkR

En SparkR DataFrame kan också registreras som en tillfällig vy som gör att du kan köra SQL-frågor över dess data. Sql-funktionen gör det möjligt för program att köra SQL-frågor programmatiskt och returnerar resultatet som en SparkR DataFrame.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Maskininlärning

SparkR exponerar de flesta MLLib-algoritmer. Under huven använder SparkR MLlib för att träna modellen.

I följande exempel visas hur du skapar en Gaussian GLM-modell med SparkR. Om du vill köra linjär regression anger du familj till "gaussian". Om du vill köra logistisk regression anger du familj till "binomial". När du använder SparkML GLM SparkR utför automatiskt en frekvent kodning av kategoriska funktioner så att det inte behöver göras manuellt. Utöver funktioner av typen String och Double är det också möjligt att få plats med MLlib Vector-funktioner för kompatibilitet med andra MLlib-komponenter.

Mer information om vilka maskininlärningsalgoritmer som stöds finns i dokumentationen för SparkR och MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)