Bagikan melalui


Menggunakan SparkR

SparkR adalah paket R yang menyediakan frontend ringan untuk menggunakan Apache Spark dari R. SparkR menyediakan implementasi bingkai data terdistribusi yang mendukung operasi seperti pemilihan, pemfilteran, agregasi, dll. SparkR juga mendukung pembelajaran mesin terdistribusi menggunakan MLlib.

Gunakan SparkR melalui definisi pekerjaan batch Spark atau dengan notebook Microsoft Fabric interaktif.

Dukungan R hanya tersedia di Spark3.1 atau lebih tinggi. R di Spark 2.4 tidak didukung.

Prasyarat

  • Buka atau buat buku catatan. Untuk mempelajari caranya, lihat Cara menggunakan notebook Microsoft Fabric.

  • Atur opsi bahasa ke SparkR (R) untuk mengubah bahasa utama.

  • Lampirkan buku catatan Anda ke lakehouse. Di sisi kiri, pilih Tambahkan untuk menambahkan lakehouse yang ada atau untuk membuat lakehouse.

Membaca dan menulis SparkR DataFrames

Membaca SparkR DataFrame dari data.frame R lokal

Cara paling sederhana untuk membuat DataFrame adalah dengan mengonversi data.frame R lokal menjadi Spark DataFrame.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Membaca dan menulis SparkR DataFrame dari Lakehouse

Data dapat disimpan di sistem file lokal simpul kluster. Metode umum untuk membaca dan menulis SparkR DataFrame dari Lakehouse adalah read.df dan write.df. Metode ini mengambil jalur untuk memuat file dan jenis sumber data. SparkR mendukung membaca file CSV, JSON, teks, dan Parquet secara native.

Untuk membaca dan menulis ke Lakehouse, pertama-tama tambahkan ke sesi Anda. Di sisi kiri buku catatan, pilih Tambahkan untuk menambahkan Lakehouse yang sudah ada atau buat Lakehouse.

Catatan

Untuk mengakses file Lakehouse menggunakan paket Spark, seperti read.df atau , gunakan jalur ADFS atau jalur relatifnya untuk Sparkwrite.df. Di penjelajah Lakehouse, klik kanan pada file atau folder yang ingin Anda akses dan salin jalur ADFS atau jalur relatifnya untuk Spark dari menu kontekstual.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric telah tidyverse diinstal sebelumnya. Anda dapat mengakses file Lakehouse dalam paket R anda yang sudah dikenal, seperti membaca dan menulis file Lakehouse menggunakan readr::read_csv() dan readr::write_csv().

Catatan

Untuk mengakses file Lakehouse menggunakan paket R, Anda perlu menggunakan jalur FILE API. Di penjelajah Lakehouse, klik kanan pada file atau folder yang ingin Anda akses dan salin jalur API File-nya dari menu kontekstual.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Anda juga dapat membaca SparkR Dataframe di Lakehouse Anda menggunakan kueri SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Operasi DataFrame

SparkR DataFrames mendukung banyak fungsi untuk melakukan pemrosesan data terstruktur. Berikut adalah beberapa contoh dasar. Daftar lengkap dapat ditemukan di dokumen SparkR API.

Pilih baris dan kolom

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Pengelompokan dan agregasi

Bingkai data SparkR mendukung banyak fungsi yang umum digunakan untuk menggabungkan data setelah pengelompokan. Misalnya, kita dapat menghitung histogram waktu tunggu dalam himpunan data yang setia seperti yang ditunjukkan di bawah ini

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operasi kolom

SparkR menyediakan banyak fungsi yang dapat langsung diterapkan ke kolom untuk pemrosesan dan agregasi data. Contoh berikut menunjukkan penggunaan fungsi aritmatika dasar.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Menerapkan fungsi yang ditentukan pengguna

SparkR mendukung beberapa jenis fungsi yang ditentukan pengguna:

Menjalankan fungsi pada himpunan data besar dengan dapply atau dapplyCollect

dapply

Terapkan fungsi ke setiap partisi SparkDataFrame. Fungsi yang akan diterapkan ke setiap partisi SparkDataFrame dan seharusnya hanya memiliki satu parameter, di mana data.frame sesuai dengan setiap partisi akan diteruskan. Output fungsi harus berupa data.frame. Skema menentukan format baris dari hasil .SparkDataFrame Ini harus cocok dengan jenis data dari nilai yang dikembalikan.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Seperti dapply, terapkan fungsi ke setiap partisi SparkDataFrame dan kumpulkan hasilnya kembali. Output fungsi harus berupa data.frame. Tapi, kali ini, skema tidak diperlukan untuk dilewati. Perhatikan bahwa dapplyCollect dapat gagal jika output fungsi berjalan pada semua partisi tidak dapat ditarik ke driver dan pas di memori driver.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Menjalankan fungsi pada pengelompokan himpunan data besar menurut kolom input dengan gapply atau gapplyCollect

gapply

Terapkan fungsi ke setiap grup SparkDataFrame. Fungsi ini akan diterapkan ke setiap grup dan seharusnya hanya memiliki dua parameter: mengelompokkan kunci dan R data.frame yang sesuai dengan kunci tersebutSparkDataFrame. Grup dipilih dari SparkDataFrames kolom. Output fungsi harus berupa data.frame. Skema menentukan format baris dari hasil SparkDataFrame. Ini harus mewakili skema output fungsi R dari jenis data Spark. Nama kolom yang dikembalikan data.frame diatur oleh pengguna.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Seperti gapply, menerapkan fungsi ke setiap grup SparkDataFrame dan mengumpulkan hasilnya kembali ke R data.frame. Output fungsi harus berupa data.frame. Tapi, skema tidak diperlukan untuk diteruskan. Perhatikan bahwa gapplyCollect dapat gagal jika output fungsi berjalan pada semua partisi tidak dapat ditarik ke driver dan pas di memori driver.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Jalankan fungsi R lokal yang didistribusikan dengan spark.lapply

spark.lapply

Mirip lapply dengan dalam R asli, spark.lapply menjalankan fungsi melalui daftar elemen dan mendistribusikan komputasi dengan Spark. Menerapkan fungsi dengan cara yang mirip doParallel dengan atau lapply dengan elemen daftar. Hasil dari semua komputasi harus sesuai dalam satu komputer. Jika itu tidak terjadi, mereka dapat melakukan sesuatu seperti df <- createDataFrame(list) dan kemudian menggunakan dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Menjalankan kueri SQL dari SparkR

SparkR DataFrame juga dapat didaftarkan sebagai tampilan sementara yang memungkinkan Anda menjalankan kueri SQL melalui datanya. Fungsi sql memungkinkan aplikasi menjalankan kueri SQL secara terprogram dan mengembalikan hasilnya sebagai SparkR DataFrame.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Pembelajaran Mesin

SparkR mengekspos sebagian besar algoritma MLLib. Di balik layanan, SparkR menggunakan MLlib untuk melatih model.

Contoh berikut menunjukkan cara membangun model GLM Gaussian menggunakan SparkR. Untuk menjalankan regresi linier, atur keluarga ke "gaussian". Untuk menjalankan regresi logistik, atur keluarga ke "binomial". Saat menggunakan SparkML GLM SparkR secara otomatis melakukan pengodean satu panas fitur kategoris sehingga tidak perlu dilakukan secara manual. Selain fitur jenis String dan Double, dimungkinkan juga untuk menyesuaikan fitur MLlib Vector, untuk kompatibilitas dengan komponen MLlib lainnya.

Untuk mempelajari selengkapnya tentang algoritma pembelajaran mesin mana yang didukung, Anda dapat mengunjungi dokumentasi untuk SparkR dan MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)