Brug SparkR

SparkR er en R-pakke, der giver en let frontend til brug af Apache Spark fra R. SparkR leverer en distribueret implementering af dataramme, der understøtter handlinger som valg, filtrering, sammenlægning osv. SparkR understøtter også distribueret maskinel indlæring ved hjælp af MLlib.

Brug SparkR via Spark-batchjobdefinitioner eller med interaktive Microsoft Fabric-notesbøger.

R-understøttelse er kun tilgængelig i Spark3.1 eller nyere. R i Spark 2.4 understøttes ikke.

Forudsætninger

  • Få et Microsoft Fabric-abonnement. Du kan også tilmelde dig en gratis Prøveversion af Microsoft Fabric.

  • Log på Microsoft Fabric.

  • Brug oplevelsesskifteren i venstre side af startsiden til at skifte til Synapse Data Science-oplevelsen.

    Screenshot of the experience switcher menu, showing where to select Data Science.

  • Åbn eller opret en notesbog. Du kan få mere at vide under Sådan bruger du Microsoft Fabric-notesbøger.

  • Angiv sprogindstillingen til SparkR (R) for at ændre det primære sprog.

  • Vedhæft din notesbog til et lakehouse. I venstre side skal du vælge Tilføj for at tilføje et eksisterende lakehouse eller for at oprette et lakehouse.

Læs og skriv SparkR DataFrames

Læs en SparkR-dataramme fra en lokal R-data.frame

Den nemmeste måde at oprette en DataFrame på er ved at konvertere en lokal R-data.frame til en Spark DataFrame.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Læs og skriv SparkR DataFrame fra Lakehouse

Data kan gemmes i det lokale filsystem for klyngenoder. De generelle metoder til at læse og skrive en SparkR-dataramme fra Lakehouse er read.df og write.df. Disse metoder tager stien til den fil, der skal indlæses, og typen af datakilde. SparkR understøtter læsning af CSV-, JSON-, tekst- og parquetfiler oprindeligt.

Hvis du vil læse og skrive til et Lakehouse, skal du først føje det til din session. I venstre side af notesbogen skal du vælge Tilføj for at tilføje et eksisterende Lakehouse eller oprette et Lakehouse.

Bemærk

Hvis du vil have adgang til Lakehouse-filer ved hjælp af Spark-pakker, f.eks. eller write.df, skal du bruge dens ADFS-sti eller relative sti til Spark.read.df I Lakehouse Explorer skal du højreklikke på de filer eller mapper, du vil have adgang til, og kopiere dens ADFS-sti eller relative sti til Spark fra genvejsmenuen.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric er tidyverse forudinstalleret. Du kan få adgang til Lakehouse-filer i dine velkendte R-pakker, f.eks. læse og skrive Lakehouse-filer ved hjælp af readr::read_csv() og readr::write_csv().

Bemærk

Hvis du vil have adgang til Lakehouse-filer ved hjælp af R-pakker, skal du bruge stien til fil-API'en. Højreklik på den fil eller mappe, du vil have adgang til, i Stifinder i Lakehouse, og kopiér dens fil-API-sti fra genvejsmenuen.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Du kan også læse en SparkR-dataramme i dit Lakehouse ved hjælp af SparkSQL-forespørgsler.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Læs og skriv SQL-tabeller via RODBC

Brug RODBC til at oprette forbindelse til SQL-baserede databaser via en ODBC-grænseflade. Du kan f.eks. oprette forbindelse til en Synapse-dedikeret SQL-gruppe som vist i følgende eksempelkode. Udskift dine egne forbindelsesoplysninger med <database>, <uid>, <password>og <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

DataFrame-handlinger

SparkR DataFrames understøtter mange funktioner til struktureret databehandling. Her er nogle grundlæggende eksempler. Du kan finde en komplet liste i SparkR API-dokumentationerne.

Vælg rækker og kolonner

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Gruppering og sammenlægning

SparkR-datarammer understøtter mange almindeligt anvendte funktioner til at aggregere data efter gruppering. Vi kan f.eks. beregne et histogram over ventetiden i det trofaste datasæt som vist nedenfor

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Kolonnehandlinger

SparkR indeholder mange funktioner, der kan anvendes direkte på kolonner til databehandling og aggregering. I følgende eksempel vises brugen af grundlæggende aritmetiske funktioner.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Anvend brugerdefineret funktion

SparkR understøtter flere typer brugerdefinerede funktioner:

Kør en funktion på et stort datasæt med dapply eller dapplyCollect

dapply

Anvend en funktion på hver partition i en SparkDataFrame. Den funktion, der skal anvendes på hver partition i SparkDataFrame , og den skal kun have én parameter, som en data.frame svarer til hver partition til, overføres. Outputtet af funktionen skal være en data.frame. Skemaet angiver rækkeformatet for den resulterende .SparkDataFrame Den skal svare til datatyperne for den returnerede værdi.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

På samme måde som dapply skal du anvende en funktion på hver partition af en SparkDataFrame og indsamle resultatet tilbage. Outputtet af funktionen skal være en data.frame. Men denne gang kræves det ikke, at skemaet overføres. Bemærk, at det dapplyCollect kan mislykkes, hvis outputtet af funktionen, der kører på hele partitionen, ikke kan trækkes til driveren og passe til driverhukommelsen.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Kør en funktion på en gruppering af store datasæt efter inputkolonne(er) med gapply eller gapplyCollect

gapply

Anvend en funktion på hver gruppe af en SparkDataFrame. Funktionen skal anvendes på hver gruppe af SparkDataFrame og skal kun have to parametre: grupperingsnøgle og R data.frame , der svarer til den pågældende nøgle. Grupperne vælges fra SparkDataFrames en eller flere kolonner. Outputtet af funktionen skal være en data.frame. Skemaet angiver rækkeformatet for den resulterende SparkDataFrame. Den skal repræsentere R-funktionens outputskema fra Spark-datatyper. Kolonnenavnene for den returnerede data.frame angives af brugeren.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Ligesom gapplyanvender en funktion på hver gruppe af en SparkDataFrame og indsamler resultatet tilbage til R data.frame. Outputtet af funktionen skal være en data.frame. Men skemaet kræves ikke for at blive overført. Bemærk, at det gapplyCollect kan mislykkes, hvis outputtet af funktionen, der kører på hele partitionen, ikke kan trækkes til driveren og passe til driverhukommelsen.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Kør lokale R-funktioner, der distribueres med spark.lapply

spark.lapply

lapply På samme måde som i oprindelig R spark.lapply kører en funktion over en liste over elementer og distribuerer beregningerne med Spark. Anvender en funktion på en måde, der ligner doParallel eller lapply på elementer på en liste. Resultaterne af alle beregningerne skal være i en enkelt maskine. Hvis det ikke er tilfældet, kan de gøre noget i stil df <- createDataFrame(list) med og derefter bruge dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Kør SQL-forespørgsler fra SparkR

En SparkR-dataramme kan også registreres som en midlertidig visning, der giver dig mulighed for at køre SQL-forespørgsler over dataene. Sql-funktionen gør det muligt for programmer at køre SQL-forespørgsler programmeringsmæssigt og returnerer resultatet som en SparkR-dataramme.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Maskinel indlæring

SparkR fremviser de fleste MLLib-algoritmer. Under hjelmen bruger SparkR MLlib til at oplære modellen.

I følgende eksempel kan du se, hvordan du opretter en Gaussisk GLM-model ved hjælp af SparkR. Hvis du vil køre lineær regression, skal du angive familie til "gaussian". Hvis du vil køre logistisk regression, skal du angive familie til "binomial". Når du bruger SparkML GLM SparkR, udføres der automatisk en-hot kodning af kategoriske funktioner, så det ikke behøver at blive udført manuelt. Ud over funktionerne String og Double type er det også muligt at overskrive funktioner i MLlib Vector for kompatibilitet med andre MLlib-komponenter.

Du kan få mere at vide om, hvilke algoritmer til maskinel indlæring der understøttes, i dokumentationen til SparkR og MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)