Brug SparkR
Vigtigt
Microsoft Fabric findes i øjeblikket i PRØVEVERSION. Disse oplysninger er relateret til et foreløbig produkt, der kan blive ændret væsentligt, før det udgives. Microsoft giver ingen garantier, udtrykt eller stiltiende, med hensyn til de oplysninger, der er angivet her.
SparkR er en R-pakke, der leverer en let frontend til brug af Apache Spark fra R. SparkR leverer en distribueret implementering af dataramme, der understøtter handlinger som markering, filtrering, sammenlægning osv. SparkR understøtter også distribueret maskinel indlæring ved hjælp af MLlib.
Vigtigt
Microsoft Fabric findes i øjeblikket i PRØVEVERSION. Disse oplysninger er relateret til et foreløbig produkt, der kan blive ændret væsentligt, før det udgives. Microsoft giver ingen garantier, udtrykt eller stiltiende, med hensyn til de oplysninger, der er angivet her.
Brug SparkR via Spark-batchjobdefinitioner eller med interaktive Microsoft Fabric-notesbøger.
R-understøttelse er kun tilgængelig i Spark3.1 eller nyere. R i Spark 2.4 understøttes ikke.
Forudsætninger
Et Power BI Premium abonnement. Hvis du ikke har en, kan du se Sådan køber du Power BI Premium.
Et Power BI-arbejdsområde med tildelt Premium-kapacitet. Hvis du ikke har et arbejdsområde, kan du bruge trinnene i Opret et arbejdsområde til at oprette et og tildele det til en Premium-kapacitet.
Log på Microsoft Fabric.
Åbn eller opret en notesbog. Du kan få mere at vide under Sådan bruger du Microsoft Fabric-notesbøger.
Skift det primære sprog ved at angive sprogindstillingen til SparkR (R).
Vedhæft din notesbog til et lakehouse. I venstre side skal du vælge Tilføj for at tilføje et eksisterende lakehouse eller oprette et lakehouse.
Læs og skriv SparkR-datarammer
Læs en SparkR-dataramme fra en lokal R-dataramme
Den nemmeste måde at oprette en DataFrame på er at konvertere en lokal R-data.frame til en Spark DataFrame.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Læs og skriv SparkR DataFrame fra Lakehouse
Data kan gemmes i det lokale filsystem for klyngenoder. De generelle metoder til at læse og skrive en SparkR-dataramme fra Lakehouse er read.df
og write.df
. Disse metoder tager stien til den fil, der skal indlæses, og typen af datakilde. SparkR understøtter læsning af CSV-, JSON-, tekst- og parquetfiler oprindeligt.
Hvis du vil læse og skrive til et Lakehouse, skal du først føje det til din session. I venstre side af notesbogen skal du vælge Tilføj for at tilføje et eksisterende Lakehouse eller oprette et Lakehouse.
Bemærk
Hvis du vil have adgang til Lakehouse-filer ved hjælp af Spark-pakker, f.eks read.df
. eller write.df
, skal du bruge dens ADFS-sti eller relative sti til Spark. I Stifinder i Lakehouse skal du højreklikke på de filer eller den mappe, du vil have adgang til, og kopiere dens ADFS-sti eller relative sti til Spark fra genvejsmenuen.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric er tidyverse
forudinstalleret. Du kan få adgang til Lakehouse-filer i dine velkendte R-pakker, f.eks. læse og skrive Lakehouse-filer ved hjælp af readr::read_csv()
og readr::write_csv()
.
Bemærk
Hvis du vil have adgang til Lakehouse-filer ved hjælp af R-pakker, skal du bruge stien til fil-API'en. I Stifinder i Lakehouse skal du højreklikke på den fil eller mappe, du vil have adgang til, og kopiere dens fil-API-sti fra genvejsmenuen.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Du kan også læse en SparkR-dataramme på dit Lakehouse ved hjælp af SparkSQL-forespørgsler.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
Læs og skriv SQL-tabeller via RODBC
Brug RODBC til at oprette forbindelse til SQL-baserede databaser via en ODBC-grænseflade. Du kan f.eks. oprette forbindelse til en Synapse-dedikeret SQL-gruppe som vist i følgende eksempelkode. Erstat dine egne forbindelsesoplysninger med <database>
, <uid>
, <password>
og <table>
.
# load RODBC package
library(RODBC)
# config connection string
DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"
ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)
# connect to driver
channel <-odbcDriverConnect(ConnectionString)
# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)
# use SparkR::as.DataFrame to convert R date.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)
DataFrame-handlinger
SparkR DataFrames understøtter mange funktioner til struktureret databehandling. Her er nogle grundlæggende eksempler. Du kan finde en komplet liste i SparkR API-dokumentationerne.
Vælg rækker og kolonner
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Gruppering og aggregering
SparkR-datarammer understøtter mange almindeligt anvendte funktioner til at aggregere data efter gruppering. Vi kan f.eks. beregne et histogram over ventetiden i det trofaste datasæt som vist nedenfor
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Kolonnehandlinger
SparkR indeholder mange funktioner, der kan anvendes direkte på kolonner til databehandling og aggregering. I følgende eksempel vises brugen af grundlæggende aritmetiske funktioner.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Anvend brugerdefineret funktion
SparkR understøtter flere typer brugerdefinerede funktioner:
Kør en funktion på et stort datasæt med dapply
eller dapplyCollect
dapply
Anvend en funktion på hver partition i en SparkDataFrame
. Den funktion, der skal anvendes på hver partition i SparkDataFrame
, og skal kun have én parameter, som en data.frame svarer til hver partition, overføres. Outputtet af funktionen skal være en data.frame
. Skemaet angiver rækkeformatet for den resulterende .SparkDataFrame
Den skal svare til datatyperne for den returnerede værdi.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
På samme måde som dapply skal du anvende en funktion på hver partition i en SparkDataFrame
og indsamle resultatet igen. Outputtet af funktionen skal være en data.frame
. Men denne gang kræves det ikke, at skemaet overføres. Bemærk, at det dapplyCollect
kan mislykkes, hvis outputtet for funktionen kører på hele partitionen, ikke kan trækkes til driveren og passe i driverhukommelsen.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Kør en funktion på en stor gruppering af datasæt efter inputkolonner med gapply
eller gapplyCollect
gapply
Anvend en funktion på hver gruppe af en SparkDataFrame
. Funktionen skal anvendes på hver gruppe af SparkDataFrame
og skal kun have to parametre: grupperingsnøgle og R data.frame
, der svarer til den pågældende nøgle. Grupperne vælges fra SparkDataFrames
en eller flere kolonner. Outputtet af funktionen skal være en data.frame
. Skemaet angiver rækkeformatet for den resulterende SparkDataFrame
. Den skal repræsentere R-funktionens outputskema fra Spark-datatyper. Kolonnenavnene for den returnerede data.frame
angives af brugeren.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
På samme måde gapply
anvender en funktion på hver gruppe af en SparkDataFrame
og indsamler resultatet tilbage til R data.frame
. Outputtet af funktionen skal være en data.frame
. Men skemaet skal ikke overføres. Bemærk, at det gapplyCollect
kan mislykkes, hvis outputtet for funktionen kører på hele partitionen, ikke kan trækkes til driveren og passe i driverhukommelsen.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Kør lokale R-funktioner, der distribueres med spark.lapply
spark.lapply
På samme måde som lapply
i oprindelig R spark.lapply
kører en funktion over en liste over elementer og distribuerer beregningerne med Spark. Anvender en funktion på en måde, der ligner doParallel
eller lapply
på elementer på en liste. Resultaterne af alle beregningerne skal være i en enkelt maskine. Hvis det ikke er tilfældet, kan de gøre noget i stil df <- createDataFrame(list)
med og derefter bruge dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Kør SQL-forespørgsler fra SparkR
En SparkR-dataramme kan også registreres som en midlertidig visning, der giver dig mulighed for at køre SQL-forespørgsler over dataene. Sql-funktionen gør det muligt for programmer at køre SQL-forespørgsler programmeringsmæssigt og returnerer resultatet som en SparkR-dataramme.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Maskinel indlæring
SparkR eksponerer de fleste MLLib-algoritmer. Under hjelmen bruger SparkR MLlib til at oplære modellen.
I følgende eksempel kan du se, hvordan du opretter en gaussisk GLM-model ved hjælp af SparkR. Hvis du vil køre lineær regression, skal du angive serie til "gaussian"
. Hvis du vil køre logistisk regression, skal du angive familie til "binomial"
. Når du bruger SparkML GLM
SparkR, udføres der automatisk en hot kodning af kategoriske funktioner, så det ikke behøver at blive udført manuelt. Ud over funktionerne String og Double er det også muligt at passe over funktioner i MLlib Vector for kompatibilitet med andre MLlib-komponenter.
Du kan få mere at vide om, hvilke algoritmer til maskinel indlæring der understøttes, i dokumentationen til SparkR og MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)