Udostępnij za pośrednictwem


Korzystanie z aparatu SparkR

SparkR to pakiet języka R, który zapewnia lekki fronton do korzystania z platformy Apache Spark z języka R. SparkR udostępnia rozproszoną implementację ramek danych, która obsługuje operacje, takie jak wybór, filtrowanie, agregacja itp. Usługa SparkR obsługuje również rozproszone uczenie maszynowe przy użyciu biblioteki MLlib.

Używaj aparatu SparkR za pomocą definicji zadań wsadowych platformy Spark lub interaktywnych notesów usługi Microsoft Fabric.

Obsługa języka R jest dostępna tylko w środowisku Spark3.1 lub nowszym. Język R na platformie Spark 2.4 nie jest obsługiwany.

Wymagania wstępne

  • Uzyskaj subskrypcję usługi Microsoft Fabric. Możesz też utworzyć konto bezpłatnej wersji próbnej usługi Microsoft Fabric.

  • Zaloguj się do usługi Microsoft Fabric.

  • Użyj przełącznika środowiska po lewej stronie głównej, aby przełączyć się na środowisko usługi Synapse Nauka o danych.

    Screenshot of the experience switcher menu, showing where to select Data Science.

  • Otwórz lub utwórz notes. Aby dowiedzieć się, jak używać notesów usługi Microsoft Fabric, zobacz Jak używać notesów usługi Microsoft Fabric.

  • Ustaw opcję języka na SparkR (R), aby zmienić język podstawowy.

  • Dołącz notes do magazynu lakehouse. Po lewej stronie wybierz pozycję Dodaj , aby dodać istniejący obiekt lakehouse lub utworzyć jezioro.

Odczytywanie i zapisywanie ramek danych platformy SparkR

Odczytywanie ramki danych SparkR z lokalnej ramki danych języka R

Najprostszym sposobem utworzenia ramki danych jest przekonwertowanie lokalnej ramki danych języka R na ramkę danych platformy Spark.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Odczytywanie i zapisywanie ramki danych SparkR z usługi Lakehouse

Dane mogą być przechowywane w lokalnym systemie plików węzłów klastra. Ogólne metody odczytywania i zapisywania ramki danych SparkR z usługi Lakehouse to read.df i write.df. Te metody przyjmują ścieżkę do załadowania pliku i typu źródła danych. Usługa SparkR obsługuje natywne odczytywanie plików CSV, JSON, tekstowych i Parquet.

Aby odczytać i zapisać w usłudze Lakehouse, najpierw dodaj go do sesji. Po lewej stronie notesu wybierz pozycję Dodaj , aby dodać istniejącą usługę Lakehouse lub utworzyć usługę Lakehouse.

Uwaga

Aby uzyskać dostęp do plików usługi Lakehouse przy użyciu pakietów platformy Spark, takich jak read.df lub write.df, użyj ścieżki usługi ADFS lub ścieżki względnej dla platformy Spark. W eksploratorze usługi Lakehouse kliknij prawym przyciskiem myszy pliki lub folder, do których chcesz uzyskać dostęp, i skopiuj ścieżkę usługi ADFS lub ścieżkę względną platformy Spark z menu kontekstowego.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Usługa Microsoft Fabric została tidyverse wstępnie zainstalowana. Dostęp do plików Lakehouse można uzyskać w znanych pakietach języka R, takich jak odczytywanie i zapisywanie plików Lakehouse przy użyciu narzędzi readr::read_csv() i readr::write_csv().

Uwaga

Aby uzyskać dostęp do plików Lakehouse przy użyciu pakietów języka R, musisz użyć ścieżki interfejsu API plików. W Eksploratorze usługi Lakehouse kliknij prawym przyciskiem myszy plik lub folder, do którego chcesz uzyskać dostęp, i skopiuj ścieżkę interfejsu API plików z menu kontekstowego.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Możesz również odczytać ramkę danych SparkR w usłudze Lakehouse przy użyciu zapytań SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Odczytywanie i zapisywanie tabel SQL za pośrednictwem kontrolera RODBC

Użyj kontrolera RODBC, aby nawiązać połączenie z bazami danych opartymi na języku SQL za pośrednictwem interfejsu ODBC. Możesz na przykład nawiązać połączenie z dedykowaną pulą SQL usługi Synapse, jak pokazano w poniższym przykładowym kodzie. Zastąp własne szczegóły połączenia , <database>, <uid><password>i <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

Operacje ramki danych

Ramki danych platformy SparkR obsługują wiele funkcji do przetwarzania danych ze strukturą. Oto kilka podstawowych przykładów. Pełną listę można znaleźć w dokumentacji interfejsu API platformy SparkR.

Wybieranie wierszy i kolumn

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Grupowanie i agregacja

Ramki danych platformy SparkR obsługują wiele często używanych funkcji do agregowania danych po zgrupowaniu. Na przykład możemy obliczyć histogram czasu oczekiwania w wiernym zestawie danych, jak pokazano poniżej

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operacje na kolumnach

Platforma SparkR udostępnia wiele funkcji, które można bezpośrednio stosować do kolumn na potrzeby przetwarzania i agregacji danych. W poniższym przykładzie pokazano użycie podstawowych funkcji arytmetycznych.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Stosowanie funkcji zdefiniowanej przez użytkownika

Usługa SparkR obsługuje kilka rodzajów funkcji zdefiniowanych przez użytkownika:

Uruchamianie funkcji w dużym zestawie danych za pomocą dapply polecenia lub dapplyCollect

dapply

Zastosuj funkcję do każdej partycji obiektu SparkDataFrame. Funkcja, która ma zostać zastosowana do każdej partycji SparkDataFrame obiektu i powinna mieć tylko jeden parametr, do którego zostanie przekazana ramka data.frame. Dane wyjściowe funkcji powinny mieć wartość data.frame. Schemat określa format wiersza wynikowego SparkDataFrameelementu . Musi być zgodna z typami danych zwracanej wartości.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Podobnie jak w przypadku aplikacji, zastosuj funkcję do każdej partycji obiektu SparkDataFrame i zbierz wynik z powrotem. Dane wyjściowe funkcji powinny mieć wartość data.frame. Jednak tym razem schemat nie jest wymagany do przekazania. Należy pamiętać, że może zakończyć się niepowodzeniem, dapplyCollect jeśli dane wyjściowe funkcji są uruchamiane na wszystkich partycjach, nie można ściągnąć do sterownika i zmieścić się w pamięci sterownika.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Uruchamianie funkcji na dużym zestawie danych grupowania według kolumn wejściowych z gapply lub gapplyCollect

gapply

Zastosuj funkcję do każdej grupy obiektu SparkDataFrame. Funkcja ma być stosowana do każdej grupy elementów SparkDataFrame i powinna mieć tylko dwa parametry: klucz grupowania i R data.frame odpowiadający temu kluczowi. Grupy są wybierane z SparkDataFrames kolumn. Dane wyjściowe funkcji powinny mieć wartość data.frame. Schemat określa format wiersza wynikowego SparkDataFrame. Musi reprezentować schemat danych wyjściowych funkcji języka R z typów danych platformy Spark. Nazwy zwracanych kolumn są ustawiane data.frame przez użytkownika.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Podobnie jak gapply, stosuje funkcję do każdej grupy obiektu SparkDataFrame i zbiera wynik z powrotem do języka R data.frame. Dane wyjściowe funkcji powinny mieć wartość data.frame. Jednak schemat nie jest wymagany do przekazania. Należy pamiętać, że może zakończyć się niepowodzeniem, gapplyCollect jeśli dane wyjściowe funkcji są uruchamiane na wszystkich partycjach, nie można ściągnąć do sterownika i zmieścić się w pamięci sterownika.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Uruchamianie lokalnych funkcji języka R dystrybuowanych za pomocą biblioteki spark.lapply

spark.lapply

Podobnie jak lapply w natywnym języku R, spark.lapply uruchamia funkcję na liście elementów i dystrybuuje obliczenia za pomocą platformy Spark. Stosuje funkcję w sposób podobny do doParallel lub lapply do elementów listy. Wyniki wszystkich obliczeń powinny mieścić się na jednej maszynie. Jeśli tak nie jest, mogą zrobić coś takiego, df <- createDataFrame(list) a następnie użyć polecenia dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Uruchamianie zapytań SQL z platformy SparkR

Ramka danych platformy SparkR może być również zarejestrowana jako widok tymczasowy, który umożliwia uruchamianie zapytań SQL na danych. Funkcja SQL umożliwia aplikacjom programowe uruchamianie zapytań SQL i zwraca wynik jako ramkę danych SparkR.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Uczenie maszynowe

Usługa SparkR uwidacznia większość algorytmów MLLib. Pod maską platforma SparkR używa biblioteki MLlib do trenowania modelu.

W poniższym przykładzie pokazano, jak utworzyć model Gaussian GLM przy użyciu aparatu SparkR. Aby uruchomić regresję liniową, ustaw dla rodziny wartość "gaussian". Aby uruchomić regresję logistyczną, ustaw dla rodziny wartość "binomial". W przypadku korzystania z sparkML GLM SparkR automatycznie wykonuje jednorazowe kodowanie cech kategorii, aby nie trzeba było wykonywać go ręcznie. Poza funkcjami typu String i Double można również dopasować funkcje MLlib Vector, aby zapewnić zgodność z innymi składnikami MLlib.

Aby dowiedzieć się więcej na temat obsługiwanych algorytmów uczenia maszynowego, zapoznaj się z dokumentacją platform SparkR i MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)