Bruk SparkR

SparkR er en R-pakke som gir en lett front for å bruke Apache Spark fra R. SparkR gir en distribuert implementering av datarammer som støtter operasjoner som valg, filtrering, aggregasjon osv. SparkR støtter også distribuert maskinlæring ved hjelp av MLlib.

Bruk SparkR gjennom spark-satsvise jobbdefinisjoner eller med interaktive Microsoft Fabric-notatblokker.

R-støtte er bare tilgjengelig i Spark3.1 eller nyere. R i Spark 2.4 støttes ikke.

Forutsetning

  • Få et Microsoft Fabric-abonnement. Eller registrer deg for en gratis prøveversjon av Microsoft Fabric.

  • Logg på Microsoft Fabric.

  • Bruk opplevelsesbryteren til venstre på hjemmesiden for å bytte til Synapse Data Science-opplevelsen.

    Screenshot of the experience switcher menu, showing where to select Data Science.

  • Åpne eller opprett en notatblokk. Hvis du vil finne ut hvordan du bruker Microsoft Fabric-notatblokker.

  • Angi språkalternativet til SparkR (R) for å endre primærspråket.

  • Legg notatblokken til et lakehouse. På venstre side velger du Legg til for å legge til et eksisterende innsjøhus eller opprette et innsjøhus.

Lese og skrive SparkR-datarammer

Les en SparkR-dataramme fra en lokal R-data.frame

Den enkleste måten å opprette en DataFrame på, er å konvertere en lokal R-data.frame til en Spark DataFrame.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Les og skriv SparkR DataFrame fra Lakehouse

Data kan lagres på det lokale filsystemet for klyngenoder. De generelle metodene for å lese og skrive en SparkR DataFrame fra Lakehouse er read.df og write.df. Disse metodene tar banen for filen som skal lastes inn, og typen datakilde. SparkR støtter lesing av CSV-, JSON-, tekst- og parkettfiler opprinnelig.

Hvis du vil lese og skrive til et Lakehouse, må du først legge det til i økten. På venstre side av notatblokken velger du Legg til for å legge til et eksisterende Lakehouse eller opprette et Lakehouse.

Merk

Hvis du vil ha tilgang til Lakehouse-filer ved hjelp av Spark-pakker, for eksempel read.df eller , bruker du ADFS-banen eller den relative banen for Sparkwrite.df. Høyreklikk på filene eller mappen du vil ha tilgang til, i Lakehouse Explorer, og kopier ADFS-banen eller den relative banen for Spark fra hurtigmenyen.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric er tidyverse forhåndsinstallert. Du kan få tilgang til Lakehouse-filer i dine kjente R-pakker, for eksempel lese- og skrive Lakehouse-filer ved hjelp av readr::read_csv() og readr::write_csv().

Merk

Hvis du vil ha tilgang til Lakehouse-filer ved hjelp av R-pakker, må du bruke Fil-API-banen. Høyreklikk på filen eller mappen du vil ha tilgang til, i Lakehouse-utforskeren, og kopier fil-API-banen fra hurtigmenyen.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Du kan også lese en SparkR-dataramme på Lakehouse ved hjelp av SparkSQL-spørringer.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Lese og skrive SQL-tabeller gjennom RODBC

Bruk RODBC til å koble til SQL-baserte databaser via et ODBC-grensesnitt. Du kan for eksempel koble til et synapse dedikert SQL-utvalg som vist i følgende eksempelkode. Bytt ut dine egne tilkoblingsdetaljer for <database>, <uid>, <password>og <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

DataFrame-operasjoner

SparkR DataFrames støtter mange funksjoner for strukturert databehandling. Her er noen grunnleggende eksempler. Du finner en fullstendig liste i SparkR API-dokumentene.

Merke rader og kolonner

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Gruppering og aggregasjon

SparkR-datarammer støtter mange vanlige funksjoner for å aggregere data etter gruppering. Vi kan for eksempel beregne et histogram av ventetiden i det trofaste datasettet som vist nedenfor

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Kolonneoperasjoner

SparkR inneholder mange funksjoner som kan brukes direkte på kolonner for databehandling og aggregasjon. Følgende eksempel viser bruken av grunnleggende aritmetiske funksjoner.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Bruk brukerdefinert funksjon

SparkR støtter flere typer brukerdefinerte funksjoner:

Kjøre en funksjon på et stort datasett med dapply eller dapplyCollect

dapply

Bruk en funksjon på hver partisjon av en SparkDataFrame. Funksjonen som skal brukes på hver partisjon av og bør bare ha én SparkDataFrame parameter som en data.frame tilsvarer hver partisjon vil bli sendt til. Utdataene for funksjonen må være en data.frame. Skjema angir radformatet for resultatet av et SparkDataFrame. Den må samsvare med datatypene for den returnerte verdien.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

I likhet med dapply bruker du en funksjon på hver partisjon av en SparkDataFrame og samler resultatet tilbake. Utdataene for funksjonen må være en data.frame. Men denne gangen er ikke skjemaet nødvendig for å bli sendt. Vær oppmerksom på at dapplyCollect det kan mislykkes hvis utdataene for funksjonen kjører på hele partisjonen, ikke kan trekkes til driveren og får plass i driverminnet.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Kjør en funksjon på en stor datasettgruppering etter inndatakolonner med gapply eller gapplyCollect

gapply

Bruk en funksjon på hver gruppe av en SparkDataFrame. Funksjonen skal brukes på hver gruppe i SparkDataFrame og skal bare ha to parametere: grupperingsnøkkel og R som tilsvarer data.frame denne nøkkelen. Gruppene velges fra SparkDataFrames kolonne(er). Utdataene for funksjonen må være en data.frame. Skjema angir radformatet for resultatet SparkDataFrame. Det må representere utdataskjemaet for R-funksjonen fra Spark-datatyper. Kolonnenavnene for de returnerte data.frame er angitt av brukeren.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Like gapply, bruker en funksjon for hver gruppe av en SparkDataFrame og samle resultatet tilbake til R data.frame. Utdataene for funksjonen må være en data.frame. Men skjemaet kreves ikke for å bli sendt. Vær oppmerksom på at gapplyCollect det kan mislykkes hvis utdataene for funksjonen kjører på hele partisjonen, ikke kan trekkes til driveren og får plass i driverminnet.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Kjør lokale R-funksjoner distribuert med spark.lapply

spark.lapply

lapply I likhet med i opprinnelig R kjører spark.lapply du en funksjon over en liste over elementer og distribuerer beregningene med Spark. Bruker en funksjon på en måte som ligner doParallel på eller lapply på elementer i en liste. Resultatene av alle beregningene skal passe i én enkelt maskin. Hvis det ikke er tilfelle, kan de gjøre noe lignende df <- createDataFrame(list) og deretter bruke dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Kjør SQL-spørringer fra SparkR

En SparkR DataFrame kan også registreres som en midlertidig visning som lar deg kjøre SQL-spørringer over dataene. Sql-funksjonen gjør det mulig for programmer å kjøre SQL-spørringer programmatisk og returnerer resultatet som en SparkR DataFrame.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Maskinlæring

SparkR viser de fleste MLLib-algoritmer. Under panseret bruker SparkR MLlib til å trene modellen.

Følgende eksempel viser hvordan du bygger en gaussisk GLM-modell ved hjelp av SparkR. Hvis du vil kjøre lineær regresjon, setter du familien til "gaussian". Hvis du vil kjøre logistisk regresjon, setter du familien til "binomial". Når du bruker SparkML GLM SparkR, utføres automatisk en-hot koding av kategoriske funksjoner, slik at det ikke trenger å gjøres manuelt. Utover streng- og dobbeltypefunksjoner er det også mulig å passe over MLlib Vector-funksjoner, for kompatibilitet med andre MLlib-komponenter.

Hvis du vil lære mer om hvilke maskinlæringsalgoritmer som støttes, kan du gå til dokumentasjonen for SparkR og MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)