Compartir por


Uso de SparkR

SparkR es un paquete de R que proporciona un front-end ligero para usar Apache Spark desde R. SparkR proporciona una implementación de trama de datos distribuida que admite operaciones como la selección, el filtrado o la agregación de elementos, entre otras. Igualmente, SparkR también admite el aprendizaje automático distribuido mediante MLlib.

Use SparkR a través de definiciones de trabajos por lotes de Spark o con cuadernos interactivos de Microsoft Fabric.

La compatibilidad con R solo está disponible en Spark3.1 o superior. No se admite R en Spark 2.4.

Requisitos previos

  • Abra o cree un cuaderno. Para obtener información sobre cómo hacerlo, consulte Uso de cuadernos de Microsoft Fabric.

  • Establezca la opción de lenguaje en SparkR (R) para cambiar el lenguaje principal.

  • Adjunte el cuaderno a un almacén de lago. En el lado izquierdo, seleccione Añadir para añadir un almacén de lago existente o crear uno.

Lectura y escritura de DataFrames de SparkR

Lectura de un DataFrame de SparkR a partir de un data.frame local de R

La manera más sencilla de crear un DataFrame es convertir un elemento data.frame de R local en un valor de Spark DataFrame.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Lectura y escritura de DataFrame de SparkR desde Lakehouse

Los datos se pueden almacenar en el sistema de archivos local de los nodos de clúster. Los métodos generales para leer y escribir un SparkR DataFrame desde Lakehouse son read.df y write.df. Estos métodos toman la ruta de acceso del archivo que se va a cargar y el tipo de origen de datos. SparkR admite la lectura de archivos de texto, CSV, JSON y Parquet de forma nativa.

Para leer y escribir en una instancia de Lakehouse, agréguela primero a la sesión. En el lado izquierdo del cuaderno, seleccione Agregar para agregar una instancia de Lakehouse existente o crear una instancia de Lakehouse.

Nota:

Para acceder a los archivos de Lakehouse mediante paquetes de Spark, como read.df o write.df, use su ruta de acceso de ADFS o la ruta de acceso relativa para Spark. En el explorador de Lakehouse, haga clic con el botón derecho del ratón en los archivos o carpetas a los que quiera acceder y copie su ruta de acceso de ADFS o ruta relativa para Spark desde el menú contextual.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric tiene tidyverse preinstalado. Puede acceder a los archivos de Lakehouse en los paquetes de R conocidos, como leer y escribir archivos de Lakehouse mediante readr::read_csv() y readr::write_csv().

Nota:

Para acceder a los archivos de Lakehouse mediante paquetes de R, debe usar la ruta de acceso de la API de archivos. En el explorador de Lakehouse, haga clic con el botón derecho en el archivo o la carpeta a la que desea acceder y copie su ruta de acceso de la API de archivos en el menú contextual.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

También puede leer un dataframe de SparkR en Lakehouse mediante consultas de SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Operaciones de dataframe

Los DataFrames de SparkR son compatibles con muchas funciones para realizar el procesamiento de datos estructurados. A continuación se muestran algunos ejemplos básicos. Encontrará una lista completa en los Documentos de la API de SparkR.

Selección de filas y columnas

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Agrupación y agregación

Los dataframes de SparkR admiten varias funciones de uso frecuente para agregar datos después de la agrupación. Por ejemplo, podemos calcular un histograma del tiempo de espera en el conjunto de datos fiel, como se muestra a continuación.

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operaciones de columna

SparkR proporciona varias funciones que se pueden aplicar directamente a las columnas para el procesamiento y la agregación de datos. En el ejemplo siguiente se muestra el uso de funciones aritméticas básicas.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Aplicación de la función definida por el usuario

SparkR es compatible con varios tipos de funciones definidas por el usuario:

Ejecución de una función en un conjunto de datos grande con dapply o dapplyCollect

dapply

Se aplica una función a cada partición de SparkDataFrame. La función que se aplicará a cada partición del SparkDataFrame y que solo debe tener un parámetro, al que se le pasará un data.frame correspondiente a cada partición. La salida de la función debe ser data.frame. El esquema especifica el formato de la fila resultante de SparkDataFrame. Debe coincidir con los tipos de datos del valor devuelto.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Al igual que dapply, aplique una función a cada partición de SparkDataFrame y recopile el resultado. La salida de la función debe ser data.frame. Pero, esta vez, no es necesario pasar el esquema. Tenga en cuenta que dapplyCollect puede fallar si las salidas de la función ejecutada en toda la partición no se pueden llevar al controlador y caben en la memoria del controlador.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Ejecución de una función en una agrupación de conjuntos de datos de gran tamaño por columnas de entrada con gapply o gapplyCollect

gapply

Aplique una función a cada grupo de un SparkDataFrame. La función se debe aplicar a cada grupo de SparkDataFrame y solo debe tener dos parámetros: clave de agrupación y data.frame de R correspondientes a esa clave. Los grupos se eligen entre las columnas de SparkDataFrames. La salida de la función debe ser data.frame. El esquema especifica el formato de la fila del SparkDataFrame resultante. Debe representar el esquema de salida de la función de R de los tipos de datos de Spark. El usuario establece los nombres de columna del data.frame devuelto.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Al igual que gapply, aplica una función a cada grupo de un SparkDataFrame y recoge el resultado de nuevo en R data.frame. La salida de la función debe ser data.frame. Pero no es necesario pasar el esquema. Tenga en cuenta que gapplyCollect puede fallar si las salidas de la función ejecutada en toda la partición no se pueden llevar al controlador y caben en la memoria del controlador.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Ejecución de funciones de R locales distribuidas con spark.lapply

spark.lapply

Al igual que lapply en R nativo, spark.lapply ejecuta una función en una lista de elementos y distribuye los cálculos con Spark. Aplica una función de una manera similar a doParallel o lapply a los elementos de una lista. Los resultados de todos los cálculos deben caber en una sola máquina. Si no es así, pueden hacer algo parecido a df <- createDataFrame(list) y, a continuación, usar dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Ejecución de consultas SQL desde SparkR

Un DataFrame de SparkR también se puede registrar como una vista temporal que le permite ejecutar consultas SQL sobre sus datos. La función SQL habilita las aplicaciones para ejecutar consultas SQL mediante programación y devuelve el resultado como un DataFrame de SparkR.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Machine Learning

SparkR expone la mayoría de los algoritmos de MLLib. En segundo plano, SparkR usa MLlib para entrenar el modelo.

En el ejemplo siguiente se muestra cómo crear un modelo GLM gausiano mediante SparkR. Para ejecutar la regresión lineal, establezca el valor de "family" en "gaussian". Para ejecutar la regresión logística, establezca el valor de "family" en "binomial". Cuando se usa SparkML GLM, SparkR realiza automáticamente la codificación "one-hot" de características categóricas para que no sea necesario realizarla manualmente. Además de las características de los tipos String y Double, también es posible ajustarse a las características de vector de MLlib, con el fin de obtener compatibilidad con otros componentes de MLlib.

Para obtener más información sobre qué algoritmos de aprendizaje automático se admiten, puede consultar la documentación de SparkR y MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)