SparkR kullanma

SparkR , R'den Apache Spark'ı kullanmak için hafif bir ön uç sağlayan bir R paketidir. SparkR, seçim, filtreleme, toplama gibi işlemleri destekleyen dağıtılmış bir veri çerçevesi uygulaması sağlar. SparkR, MLlib kullanarak dağıtılmış makine öğrenmesini de destekler.

Spark toplu iş tanımları aracılığıyla veya etkileşimli Microsoft Fabric not defterleriyle SparkR kullanın.

R desteği yalnızca Spark3.1 veya üzerinde kullanılabilir. Spark 2.4'te R desteklenmez.

Önkoşullar

  • Not defterini açın veya oluşturun. Nasıl yapılacağını öğrenmek için bkz . Microsoft Fabric not defterlerini kullanma.

  • Birincil dili değiştirmek için dil seçeneğini SparkR (R) olarak ayarlayın.

  • Not defterinizi bir göle ekleyin. Sol tarafta Ekle'yi seçerek mevcut bir göl evi ekleyin veya bir göl evi oluşturun.

SparkR DataFrame'leri okuma ve yazma

Yerel R data.frame'den SparkR DataFrame okuma

DataFrame oluşturmanın en basit yolu, yerel bir R data.frame'i Spark DataFrame'e dönüştürmektir.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Lakehouse'dan SparkR DataFrame okuma ve yazma

Veriler küme düğümlerinin yerel dosya sisteminde depolanabilir. Lakehouse'dan SparkR DataFrame okumak ve yazmak için genel yöntemler ve write.dfşeklindedirread.df. Bu yöntemler dosyanın yüklenmesi için yolu ve veri kaynağı türünü alır. SparkR, CSV, JSON, metin ve Parquet dosyalarını yerel olarak okumayı destekler.

Bir Lakehouse'ı okumak ve yazmak için önce oturumunuza ekleyin. Not defterinin sol tarafında Ekle'yi seçerek mevcut bir Lakehouse ekleyin veya bir Lakehouse oluşturun.

Not

veya gibi read.df Spark paketlerini kullanarak Lakehouse dosyalarına erişmek için, Spark için ADFS yolunu veya göreli yolunu kullanın.write.df Lakehouse gezgininde, erişmek istediğiniz dosya veya klasöre sağ tıklayın ve bağlam menüsünden Spark'ın ADFS yolunu veya göreli yolunu kopyalayın.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric tidyverse önceden yüklenmiş. ve readr::write_csv()kullanarak readr::read_csv() Lakehouse dosyalarını okuma ve yazma gibi tanıdık R paketlerinizde Lakehouse dosyalarına erişebilirsiniz.

Not

R paketlerini kullanarak Lakehouse dosyalarına erişmek için Dosya API'sinin yolunu kullanmanız gerekir. Lakehouse gezgininde, erişmek istediğiniz dosyaya veya klasöre sağ tıklayın ve bağlam menüsünden Dosya API'sinin yolunu kopyalayın.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

SparkSQL sorgularını kullanarak Lakehouse'unuzda bir SparkR Dataframe de okuyabilirsiniz.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

RODBC aracılığıyla SQL tablolarını okuma ve yazma

BIR ODBC arabirimi aracılığıyla SQL tabanlı veritabanlarına bağlanmak için RODBC kullanın. Örneğin, aşağıdaki örnek kodda gösterildiği gibi Synapse ayrılmış SQL havuzuna bağlanabilirsiniz. , , <uid><password>ve <table>için <database>kendi bağlantı ayrıntılarınızı değiştirin.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

DataFrame işlemleri

SparkR DataFrames, yapılandırılmış veri işleme gerçekleştirmek için birçok işlevi destekler. Aşağıda bazı temel örnekler verilmiştir. SparkR API belgelerinde tam bir liste bulunabilir.

Satır ve sütun seçme

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Gruplandırma ve toplama

SparkR veri çerçeveleri, gruplandırma sonrasında verileri toplamak için yaygın olarak kullanılan birçok işlevi destekler. Örneğin, aşağıda gösterildiği gibi sadık veri kümesinde bekleme süresinin histogramını hesaplayabiliriz

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Sütun işlemleri

SparkR, veri işleme ve toplama için sütunlara doğrudan uygulanabilen birçok işlev sağlar. Aşağıdaki örnekte temel aritmetik işlevlerin kullanımı gösterilmektedir.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Kullanıcı tanımlı işlevi uygulama

SparkR, kullanıcı tanımlı işlevlerin çeşitli türlerini destekler:

veya ile dapply büyük bir veri kümesinde işlev çalıştırma dapplyCollect

dapply

bir öğesinin her bölümüne bir SparkDataFrameişlev uygulama. her bölümüne uygulanacak işlevin SparkDataFrame tek bir parametresi olmalıdır ve data.frame her bölüme karşılık gelir. İşlevin çıkışı bir data.frameolmalıdır. Şema, sonuçta SparkDataFrameelde edilen öğesinin satır biçimini belirtir. Döndürülen değerin veri türleriyle eşleşmelidir.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Depply gibi, bir SparkDataFrame öğesinin her bölümüne bir işlev uygulayın ve sonucu geri toplayın. işlevinin çıkışı bir data.frameolmalıdır. Ancak bu kez şemanın geçirilmesi gerekli değildir. dapplyCollect İşlevin çıkışları tüm bölümde çalıştırıldığında sürücüye çekilemezse ve sürücü belleğine sığmazsa başarısız olabileceğini unutmayın.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

veya ile gapply giriş sütunlarına göre gruplandırma yaparak büyük bir veri kümesinde işlev çalıştırma gapplyCollect

gapply

Bir öğesinin SparkDataFrameher grubuna bir işlev uygulama. işlevi, öğesinin her grubuna SparkDataFrame uygulanacaktır ve yalnızca iki parametre içermelidir: gruplandırma anahtarı ve bu anahtara karşılık gelen R data.frame . Gruplar sütunlardan SparkDataFrames seçilir. işlevinin çıkışı bir data.frameolmalıdır. Şema, sonuçta SparkDataFrameelde edilen öğesinin satır biçimini belirtir. Spark veri türlerinden R işlevinin çıkış şemasını temsil etmelidir. Döndürülen data.frame sütun adları kullanıcı tarafından ayarlanır.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

gibi gapply, bir öğesinin SparkDataFrame her grubuna bir işlev uygular ve sonucu R'ye data.framegeri toplar. işlevinin çıkışı bir data.frameolmalıdır. Ancak, şemanın geçirilmesi gerekmez. gapplyCollect İşlevin çıkışları tüm bölümde çalıştırıldığında sürücüye çekilemezse ve sürücü belleğine sığmazsa başarısız olabileceğini unutmayın.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

spark.lapply ile dağıtılmış yerel R işlevlerini çalıştırma

spark.lapply

Yerel R'dekine benzer şekilde lapply , spark.lapply bir öğe listesi üzerinde bir işlev çalıştırır ve hesaplamaları Spark ile dağıtır. Bir işlevi, bir listenin öğelerine doParallellapply benzer bir şekilde uygular. Tüm hesaplamaların sonuçları tek bir makineye sığmalıdır. Böyle bir durum söz konusu değilse, gibi df <- createDataFrame(list) bir şey yapabilir ve kullanabilirler dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

SparkR'den SQL sorguları çalıştırma

SparkR DataFrame, verileri üzerinde SQL sorguları çalıştırmanızı sağlayan geçici bir görünüm olarak da kaydedilebilir. sql işlevi, uygulamaların SQL sorgularını program aracılığıyla çalıştırmasına olanak tanır ve sonucu SparkR DataFrame olarak döndürür.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Makine öğrenmesi

SparkR, MLLib algoritmalarının çoğunu kullanıma sunar. SparkR, modeli eğitmek için arka planda MLlib kullanır.

Aşağıdaki örnekte SparkR kullanarak Gauss GLM modelinin nasıl derlendği gösterilmektedir. Doğrusal regresyon çalıştırmak için aileyi olarak "gaussian"ayarlayın. Lojistik regresyon çalıştırmak için aileyi olarak "binomial"ayarlayın. SparkML GLM SparkR kullanırken otomatik olarak kategorik özelliklerin tek sık kodlamasını gerçekleştirir, böylece el ile yapılması gerekmez. Dize ve Çift tür özelliklerinin ötesinde, diğer MLlib bileşenleriyle uyumluluk için MLlib Vector özelliklerine de sığabilir.

Hangi makine öğrenmesi algoritmalarının desteklendiği hakkında daha fazla bilgi edinmek için SparkR ve MLlib belgelerini ziyaret edebilirsiniz.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)