Brug SparkR
SparkR er en R-pakke, der giver en let frontend til brug af Apache Spark fra R. SparkR leverer en distribueret implementering af dataramme, der understøtter handlinger som valg, filtrering, sammenlægning osv. SparkR understøtter også distribueret maskinel indlæring ved hjælp af MLlib.
Brug SparkR via Spark-batchjobdefinitioner eller med interaktive Microsoft Fabric-notesbøger.
R-understøttelse er kun tilgængelig i Spark3.1 eller nyere. R i Spark 2.4 understøttes ikke.
Få et Microsoft Fabric-abonnement. Du kan også tilmelde dig en gratis Prøveversion af Microsoft Fabric.
Brug oplevelsesskifteren i venstre side af startsiden til at skifte til Synapse Data Science-oplevelsen.
Åbn eller opret en notesbog. Du kan få mere at vide under Sådan bruger du Microsoft Fabric-notesbøger.
Angiv sprogindstillingen til SparkR (R) for at ændre det primære sprog.
Vedhæft din notesbog til et lakehouse. I venstre side skal du vælge Tilføj for at tilføje et eksisterende lakehouse eller for at oprette et lakehouse.
Den nemmeste måde at oprette en DataFrame på er ved at konvertere en lokal R-data.frame til en Spark DataFrame.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Data kan gemmes i det lokale filsystem for klyngenoder. De generelle metoder til at læse og skrive en SparkR-dataramme fra Lakehouse er read.df
og write.df
. Disse metoder tager stien til den fil, der skal indlæses, og typen af datakilde. SparkR understøtter læsning af CSV-, JSON-, tekst- og parquetfiler oprindeligt.
Hvis du vil læse og skrive til et Lakehouse, skal du først føje det til din session. I venstre side af notesbogen skal du vælge Tilføj for at tilføje et eksisterende Lakehouse eller oprette et Lakehouse.
Bemærk
Hvis du vil have adgang til Lakehouse-filer ved hjælp af Spark-pakker, f.eks. eller write.df
, skal du bruge dens ADFS-sti eller relative sti til Spark.read.df
I Lakehouse Explorer skal du højreklikke på de filer eller mapper, du vil have adgang til, og kopiere dens ADFS-sti eller relative sti til Spark fra genvejsmenuen.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric er tidyverse
forudinstalleret. Du kan få adgang til Lakehouse-filer i dine velkendte R-pakker, f.eks. læse og skrive Lakehouse-filer ved hjælp af readr::read_csv()
og readr::write_csv()
.
Bemærk
Hvis du vil have adgang til Lakehouse-filer ved hjælp af R-pakker, skal du bruge stien til fil-API'en. Højreklik på den fil eller mappe, du vil have adgang til, i Stifinder i Lakehouse, og kopiér dens fil-API-sti fra genvejsmenuen.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Du kan også læse en SparkR-dataramme i dit Lakehouse ved hjælp af SparkSQL-forespørgsler.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
SparkR DataFrames understøtter mange funktioner til struktureret databehandling. Her er nogle grundlæggende eksempler. Du kan finde en komplet liste i SparkR API-dokumentationerne.
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
SparkR-datarammer understøtter mange almindeligt anvendte funktioner til at aggregere data efter gruppering. Vi kan f.eks. beregne et histogram over ventetiden i det trofaste datasæt som vist nedenfor
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
SparkR indeholder mange funktioner, der kan anvendes direkte på kolonner til databehandling og aggregering. I følgende eksempel vises brugen af grundlæggende aritmetiske funktioner.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
SparkR understøtter flere typer brugerdefinerede funktioner:
Anvend en funktion på hver partition i en SparkDataFrame
. Den funktion, der skal anvendes på hver partition i SparkDataFrame
, og den skal kun have én parameter, som en data.frame svarer til hver partition til, overføres. Outputtet af funktionen skal være en data.frame
. Skemaet angiver rækkeformatet for den resulterende .SparkDataFrame
Den skal svare til datatyperne for den returnerede værdi.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
På samme måde som dapply skal du anvende en funktion på hver partition af en SparkDataFrame
og indsamle resultatet tilbage. Outputtet af funktionen skal være en data.frame
. Men denne gang kræves det ikke, at skemaet overføres. Bemærk, at det dapplyCollect
kan mislykkes, hvis outputtet af funktionen, der kører på hele partitionen, ikke kan trækkes til driveren og passe til driverhukommelsen.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Kør en funktion på en gruppering af store datasæt efter inputkolonne(er) med gapply
eller gapplyCollect
Anvend en funktion på hver gruppe af en SparkDataFrame
. Funktionen skal anvendes på hver gruppe af SparkDataFrame
og skal kun have to parametre: grupperingsnøgle og R data.frame
, der svarer til den pågældende nøgle. Grupperne vælges fra SparkDataFrames
en eller flere kolonner. Outputtet af funktionen skal være en data.frame
. Skemaet angiver rækkeformatet for den resulterende SparkDataFrame
. Den skal repræsentere R-funktionens outputskema fra Spark-datatyper. Kolonnenavnene for den returnerede data.frame
angives af brugeren.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
Ligesom gapply
anvender en funktion på hver gruppe af en SparkDataFrame
og indsamler resultatet tilbage til R data.frame
. Outputtet af funktionen skal være en data.frame
. Men skemaet kræves ikke for at blive overført. Bemærk, at det gapplyCollect
kan mislykkes, hvis outputtet af funktionen, der kører på hele partitionen, ikke kan trækkes til driveren og passe til driverhukommelsen.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
lapply
På samme måde som i oprindelig R spark.lapply
kører en funktion over en liste over elementer og distribuerer beregningerne med Spark. Anvender en funktion på en måde, der ligner doParallel
eller lapply
på elementer på en liste. Resultaterne af alle beregningerne skal være i en enkelt maskine. Hvis det ikke er tilfældet, kan de gøre noget i stil df <- createDataFrame(list)
med og derefter bruge dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
En SparkR-dataramme kan også registreres som en midlertidig visning, der giver dig mulighed for at køre SQL-forespørgsler over dataene. Sql-funktionen gør det muligt for programmer at køre SQL-forespørgsler programmeringsmæssigt og returnerer resultatet som en SparkR-dataramme.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
SparkR fremviser de fleste MLLib-algoritmer. Under hjelmen bruger SparkR MLlib til at oplære modellen.
I følgende eksempel kan du se, hvordan du opretter en Gaussisk GLM-model ved hjælp af SparkR. Hvis du vil køre lineær regression, skal du angive familie til "gaussian"
. Hvis du vil køre logistisk regression, skal du angive familie til "binomial"
. Når du bruger SparkML GLM
SparkR, udføres der automatisk en-hot kodning af kategoriske funktioner, så det ikke behøver at blive udført manuelt. Ud over funktionerne String og Double type er det også muligt at overskrive funktioner i MLlib Vector for kompatibilitet med andre MLlib-komponenter.
Du kan få mere at vide om, hvilke algoritmer til maskinel indlæring der understøttes, i dokumentationen til SparkR og MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)