Použití SparkR

SparkR je balíček R, který poskytuje lehký front-end pro použití Apache Sparku z R. SparkR poskytuje implementaci distribuovaného datového rámce, která podporuje operace, jako je výběr, filtrování, agregace atd. SparkR podporuje také distribuované strojové učení pomocí knihovny MLlib.

Používejte SparkR prostřednictvím definic dávkových úloh Sparku nebo s interaktivními poznámkovými bloky Microsoft Fabric.

Podpora jazyka R je dostupná jenom ve Sparku 3.1 nebo novějším. R ve Sparku 2.4 se nepodporuje.

Požadavky

  • Otevřete nebo vytvořte poznámkový blok. Postup najdete v tématu Použití poznámkových bloků Microsoft Fabric.

  • Nastavte možnost jazyka na SparkR (R) a změňte primární jazyk.

  • Připojte poznámkový blok k jezeru. Na levé straně vyberte Přidat, pokud chcete přidat existující jezerní dům nebo vytvořit jezero.

Čtení a zápis datových rámců SparkR

Čtení datového rámce SparkR z místního datového rámce R

Nejjednodušší způsob, jak vytvořit datový rámec, je převést místní R data.frame na datový rámec Spark.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Čtení a zápis datového rámce SparkR z Lakehouse

Data mohou být uložena v místním systému souborů uzlů clusteru. Obecné metody čtení a zápisu datového rámce SparkR z Lakehouse je read.df a write.df. Tyto metody přebírají cestu pro načtení souboru a typ zdroje dat. SparkR nativně podporuje čtení souborů CSV, JSON, textu a Parquet.

Pokud chcete číst a zapisovat do Lakehouse, nejprve ho přidejte do relace. Na levé straně poznámkového bloku vyberte Přidat a přidejte existující lakehouse nebo vytvořte Lakehouse.

Poznámka:

Pokud chcete získat přístup k souborům Lakehouse pomocí balíčků Sparku, například read.df nebo write.df, použijte jeho cestu ADFS nebo relativní cestu pro Spark. V Průzkumníku Lakehouse klikněte pravým tlačítkem na soubory nebo složku, ke které chcete získat přístup, a zkopírujte jeho cestu ADFS nebo relativní cestu pro Spark z místní nabídky.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric je tidyverse předinstalovaný. K souborům Lakehouse můžete přistupovat ve známých balíčcích R, jako je čtení a zápis souborů Lakehouse pomocí readr::read_csv() a readr::write_csv().

Poznámka:

Pokud chcete získat přístup k souborům Lakehouse pomocí balíčků R, musíte použít cestu k rozhraní File API. V Průzkumníku Lakehouse klikněte pravým tlačítkem na soubor nebo složku, ke které chcete získat přístup, a zkopírujte jeho cestu k rozhraní File API z místní nabídky.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Datový rámec SparkR ve službě Lakehouse můžete také číst pomocí dotazů SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Čtení a zápis tabulek SQL prostřednictvím rodBC

Pomocí rodBC se připojte k databázím založeným na SQL prostřednictvím rozhraní ODBC. Můžete se například připojit k vyhrazenému fondu Synapse SQL, jak je znázorněno v následujícím ukázkovém kódu. Nahraďte vlastní podrobnosti o připojení , <database><uid>, <password>a <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

Operace datového rámce

Datové rámce SparkR podporují mnoho funkcí pro zpracování strukturovaných dat. Tady je několik základních příkladů. Úplný seznam najdete v dokumentaci k rozhraní API SparkR.

Výběr řádků a sloupců

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Seskupení a agregace

Datové rámce SparkR podporují mnoho běžně používaných funkcí pro agregaci dat po seskupení. Můžeme například vypočítat histogram doby čekání v věrné datové sadě, jak je znázorněno níže.

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operace se sloupci

SparkR poskytuje mnoho funkcí, které lze přímo použít u sloupců pro zpracování a agregaci dat. Následující příklad ukazuje použití základních aritmetických funkcí.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Použití uživatelem definované funkce

SparkR podporuje několik druhů uživatelem definovaných funkcí:

Spuštění funkce u velké datové sady s dapply nebo dapplyCollect

dapply

Použití funkce pro každý oddíl oddílu SparkDataFrame. Funkce, která se má použít pro každý oddíl oddílu SparkDataFrame a měla by mít pouze jeden parametr, do kterého bude předána hodnota data.frame každému oddílu. Výstup funkce by měl být .data.frame Schéma určuje formát řádku výsledného objektu SparkDataFrame. Musí odpovídat datovým typům vrácených hodnot.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Podobně jako dapply použijte funkci na každý oddíl oddílu SparkDataFrame a shromážděte výsledek zpět. Výstupem funkce by měl být data.frame. Tentokrát ale není nutné předat schéma. Všimněte si, že dapplyCollect pokud výstupy funkce běží ve všech oddílech, nelze na ovladač načíst a vejít do paměti ovladače.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Spuštění funkce pro velké seskupení datových sad podle vstupních sloupců s gapply nebo gapplyCollect

gapply

Použití funkce pro každou skupinu .SparkDataFrame Funkce se použije pro každou skupinu a SparkDataFrame měla by mít pouze dva parametry: seskupovací klíč a R data.frame odpovídající danému klíči. Skupiny se vyberou ze SparkDataFrames sloupců. Výstupem funkce by měl být data.frame. Schéma určuje formát řádku výsledného SparkDataFramesouboru . Musí reprezentovat výstupní schéma funkce R z datových typů Sparku. Názvy vrácených data.frame sloupců jsou nastaveny uživatelem.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Podobně jako gapplyfunkce použije funkci pro každou skupinu a SparkDataFrame shromáždí výsledek zpět do R data.frame. Výstupem funkce by měl být data.frame. Schéma ale není nutné předat. Všimněte si, že gapplyCollect pokud výstupy funkce běží ve všech oddílech, nelze na ovladač načíst a vejít do paměti ovladače.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Spouštění místních funkcí R distribuovaných pomocí spark.lapply

spark.lapply

lapply Podobně jako v nativním jazyce R spark.lapply spustí funkci nad seznamem prvků a distribuuje výpočty pomocí Sparku. Použije funkci způsobem, který je podobný doParallel prvkům seznamu nebo lapply prvkům seznamu. Výsledky všechvýpočtůch Pokud tomu tak není, mohou udělat něco jako df <- createDataFrame(list) a pak použít dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Spouštění dotazů SQL ze SparkR

Datový rámec SparkR lze také zaregistrovat jako dočasné zobrazení, které umožňuje spouštět dotazy SQL na jeho data. Funkce SQL umožňuje aplikacím programově spouštět dotazy SQL a vrací výsledek jako datový rámec SparkR.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

strojové učení

SparkR zveřejňuje většinu algoritmů MLLib. SparkR pod kapotou používá K trénování modelu MLlib.

Následující příklad ukazuje, jak pomocí SparkR sestavit model Gaussian GLM. Chcete-li spustit lineární regresi, nastavte rodinu na "gaussian". Pokud chcete spustit logistickou regresi, nastavte rodinu na "binomial"hodnotu . Pokud používáte SparkML GLM SparkR, automaticky provádí kódování kategorických funkcí s jedním žhavým kódováním, aby se nemuselo provádět ručně. Kromě funkcí typu String a Double je také možné přizpůsobit funkce MLlib Vector, aby byly kompatibilní s jinými komponentami knihovny MLlib.

Další informace o podporovaných algoritmech strojového učení najdete v dokumentaci pro SparkR a MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)