Bruk SparkR
SparkR er en R-pakke som gir en lett front for å bruke Apache Spark fra R. SparkR gir en distribuert implementering av datarammer som støtter operasjoner som valg, filtrering, aggregasjon osv. SparkR støtter også distribuert maskinlæring ved hjelp av MLlib.
Bruk SparkR gjennom spark-satsvise jobbdefinisjoner eller med interaktive Microsoft Fabric-notatblokker.
R-støtte er bare tilgjengelig i Spark3.1 eller nyere. R i Spark 2.4 støttes ikke.
Få et Microsoft Fabric-abonnement. Eller registrer deg for en gratis prøveversjon av Microsoft Fabric.
Logg på Microsoft Fabric.
Bruk opplevelsesbryteren til venstre på hjemmesiden for å bytte til Synapse Data Science-opplevelsen.
Åpne eller opprett en notatblokk. Hvis du vil finne ut hvordan du bruker Microsoft Fabric-notatblokker.
Angi språkalternativet til SparkR (R) for å endre primærspråket.
Legg notatblokken til et lakehouse. På venstre side velger du Legg til for å legge til et eksisterende innsjøhus eller opprette et innsjøhus.
Den enkleste måten å opprette en DataFrame på, er å konvertere en lokal R-data.frame til en Spark DataFrame.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Data kan lagres på det lokale filsystemet for klyngenoder. De generelle metodene for å lese og skrive en SparkR DataFrame fra Lakehouse er read.df
og write.df
. Disse metodene tar banen for filen som skal lastes inn, og typen datakilde. SparkR støtter lesing av CSV-, JSON-, tekst- og parkettfiler opprinnelig.
Hvis du vil lese og skrive til et Lakehouse, må du først legge det til i økten. På venstre side av notatblokken velger du Legg til for å legge til et eksisterende Lakehouse eller opprette et Lakehouse.
Obs!
Hvis du vil ha tilgang til Lakehouse-filer ved hjelp av Spark-pakker, for eksempel read.df
eller , bruker du ADFS-banen eller den relative banen for Sparkwrite.df
. Høyreklikk på filene eller mappen du vil ha tilgang til, i Lakehouse Explorer, og kopier ADFS-banen eller den relative banen for Spark fra hurtigmenyen.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric er tidyverse
forhåndsinstallert. Du kan få tilgang til Lakehouse-filer i dine kjente R-pakker, for eksempel lese- og skrive Lakehouse-filer ved hjelp av readr::read_csv()
og readr::write_csv()
.
Obs!
Hvis du vil ha tilgang til Lakehouse-filer ved hjelp av R-pakker, må du bruke Fil-API-banen. Høyreklikk på filen eller mappen du vil ha tilgang til, i Lakehouse-utforskeren, og kopier fil-API-banen fra hurtigmenyen.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Du kan også lese en SparkR-dataramme på Lakehouse ved hjelp av SparkSQL-spørringer.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
SparkR DataFrames støtter mange funksjoner for strukturert databehandling. Her er noen grunnleggende eksempler. Du finner en fullstendig liste i SparkR API-dokumentene.
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
SparkR-datarammer støtter mange vanlige funksjoner for å aggregere data etter gruppering. Vi kan for eksempel beregne et histogram av ventetiden i det trofaste datasettet som vist nedenfor
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
SparkR inneholder mange funksjoner som kan brukes direkte på kolonner for databehandling og aggregasjon. Følgende eksempel viser bruken av grunnleggende aritmetiske funksjoner.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
SparkR støtter flere typer brukerdefinerte funksjoner:
Bruk en funksjon på hver partisjon av en SparkDataFrame
. Funksjonen som skal brukes på hver partisjon av og bør bare ha én SparkDataFrame
parameter som en data.frame tilsvarer hver partisjon vil bli sendt til. Utdataene for funksjonen må være en data.frame
. Skjema angir radformatet for resultatet av et SparkDataFrame
. Den må samsvare med datatypene for den returnerte verdien.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
I likhet med dapply bruker du en funksjon på hver partisjon av en SparkDataFrame
og samler resultatet tilbake. Utdataene for funksjonen må være en data.frame
. Men denne gangen er ikke skjemaet nødvendig for å bli sendt. Vær oppmerksom på at dapplyCollect
det kan mislykkes hvis utdataene for funksjonen kjører på hele partisjonen, ikke kan trekkes til driveren og får plass i driverminnet.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Bruk en funksjon på hver gruppe av en SparkDataFrame
. Funksjonen skal brukes på hver gruppe i SparkDataFrame
og skal bare ha to parametere: grupperingsnøkkel og R som tilsvarer data.frame
denne nøkkelen. Gruppene velges fra SparkDataFrames
kolonne(er). Utdataene for funksjonen må være en data.frame
. Skjema angir radformatet for resultatet SparkDataFrame
. Det må representere utdataskjemaet for R-funksjonen fra Spark-datatyper. Kolonnenavnene for de returnerte data.frame
er angitt av brukeren.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
Like gapply
, bruker en funksjon for hver gruppe av en SparkDataFrame
og samle resultatet tilbake til R data.frame
. Utdataene for funksjonen må være en data.frame
. Men skjemaet kreves ikke for å bli sendt. Vær oppmerksom på at gapplyCollect
det kan mislykkes hvis utdataene for funksjonen kjører på hele partisjonen, ikke kan trekkes til driveren og får plass i driverminnet.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
lapply
I likhet med i opprinnelig R kjører spark.lapply
du en funksjon over en liste over elementer og distribuerer beregningene med Spark. Bruker en funksjon på en måte som ligner doParallel
på eller lapply
på elementer i en liste. Resultatene av alle beregningene skal passe i én enkelt maskin. Hvis det ikke er tilfelle, kan de gjøre noe lignende df <- createDataFrame(list)
og deretter bruke dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
En SparkR DataFrame kan også registreres som en midlertidig visning som lar deg kjøre SQL-spørringer over dataene. Sql-funksjonen gjør det mulig for programmer å kjøre SQL-spørringer programmatisk og returnerer resultatet som en SparkR DataFrame.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
SparkR viser de fleste MLLib-algoritmer. Under panseret bruker SparkR MLlib til å trene modellen.
Følgende eksempel viser hvordan du bygger en gaussisk GLM-modell ved hjelp av SparkR. Hvis du vil kjøre lineær regresjon, setter du familien til "gaussian"
. Hvis du vil kjøre logistisk regresjon, setter du familien til "binomial"
. Når du bruker SparkML GLM
SparkR, utføres automatisk en-hot koding av kategoriske funksjoner, slik at det ikke trenger å gjøres manuelt. Utover streng- og dobbeltypefunksjoner er det også mulig å passe over MLlib Vector-funksjoner, for kompatibilitet med andre MLlib-komponenter.
Hvis du vil lære mer om hvilke maskinlæringsalgoritmer som støttes, kan du gå til dokumentasjonen for SparkR og MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)