Compartir por


Uso de sparklyr

sparklyr es una interfaz de R para Apache Spark. Esta interfaz proporciona un mecanismo para interactuar con Spark mediante interfaces de R conocidas. Puede usar sparklyr a través de definiciones de trabajo por lotes de Spark o con cuadernos interactivos de Microsoft Fabric.

sparklyr se usa junto con otros paquetes de tidyverse, como dplyr. Microsoft Fabric distribuye la versión estable más reciente de sparklyr y tidyverse con cada versión en tiempo de ejecución. Puede importarlos y empezar a usar la API.

Requisitos previos

  • Abra o cree un cuaderno. Para obtener información sobre cómo hacerlo, consulte Uso de cuadernos de Microsoft Fabric.

  • Establezca la opción de lenguaje en SparkR (R) para cambiar el lenguaje principal.

  • Adjunte el cuaderno a un almacén de lago. En el lado izquierdo, seleccione Añadir para añadir un almacén de lago existente o crear uno.

Conexión de sparklyr al clúster de Synapse Spark

Puede utilizar el siguiente método de conexión en spark_connect() para establecer una conexión sparklyr. Se admite un nuevo método de conexión denominado synapse, que permite conectarse a una sesión de Spark existente. Reduce drásticamente la hora de inicio de la sesión sparklyr. Además, hemos contribuido este método de conexión al proyecto sparklyr de código abierto. Con method = "synapse", puede usar sparklyr y SparkR en la misma sesión y compartir datos entre ellos fácilmente.

# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)

Usar sparklyr para leer datos

Una nueva sesión de Spark no contiene datos. El primer paso consiste en cargar los datos en la memoria de la sesión de Spark o indicar a Spark la ubicación de los datos para que pueda acceder a ellos a petición.

# load the sparklyr package
library(sparklyr)

# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)

head(mtcars_tbl)

Con sparklyr, también puede write y read datos de un archivo de Lakehouse mediante la ruta de acceso de ABFS. Para leer y escribir en una instancia de Lakehouse, agréguela primero a la sesión. En el lado izquierdo del cuaderno, seleccione Agregar para agregar una instancia de Lakehouse existente o crear una instancia de Lakehouse.

Para encontrar la ruta de acceso de ABFS, haga clic con el botón derecho en la carpeta Archivos de Lakehouse y seleccione Copiar ruta de acceso de ABFS. Pegue la ruta de acceso para reemplazar abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files en este código:

temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"

# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')

# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv) 
head(mtcarsDF)

Use sparklyr para manipular datos

sparklyr proporciona varios métodos para procesar datos dentro de Spark mediante:

  • Comandos de dplyr
  • SparkSQL
  • Transformadores de características de Spark

Use dplyr

Puede usar comandos dplyr conocidos para preparar los datos dentro de Spark. Los comandos se ejecutan dentro de Spark, por lo que no hay transferencias de datos innecesarias entre R y Spark.

Haga clic en Manipulación de datos con dplyr para ver documentación adicional sobre el uso de dplyr con Spark.

# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)

cargroup <- group_by(mtcars_tbl, cyl) %>%
  count() %>%
  arrange(desc(n))

cargroup

sparklyr y dplyr traducen los comandos de R a Spark SQL para nosotros. Para ver la consulta resultante use show_query():

# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)

Uso de SQL

También es posible ejecutar consultas SQL directamente en tablas dentro de un clúster de Spark. El spark_connection() objeto implementa una interfaz DBI para Spark, por lo que puede usar dbGetQuery() para ejecutar SQL y devolver el resultado como una trama de datos de R:

library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")

Uso de transformadores de características

Ambos métodos anteriores se basan en instrucciones SQL. Spark proporciona comandos que facilitan la transformación de datos y sin el uso de SQL.

Por ejemplo, el comando ft_binarizer() simplifica la creación de una nueva columna que indica si el valor de otra columna está por encima de un umbral determinado.

Puede encontrar la lista completa de los transformadores de características de Spark disponibles a través de sparklyr en lareferencia -FT.

mtcars_tbl %>% 
  ft_binarizer("mpg", "over_20", threshold = 20) %>% 
  select(mpg, over_20) %>% 
  head(5)

Uso compartido de datos entre sparklyr y SparkR

Al conectar sparklyr al clúster de Spark de Synapse con method = "synapse", puede usar tanto sparklyr como SparkR en la misma sesión y compartir fácilmente datos entre ellos. Puede crear una tabla de Spark en sparklyr y leerla desde SparkR.

# load the sparklyr package
library(sparklyr)

# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)

# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")

head(mtcars_sparklr)

Machine Learning

A continuación, se muestra un ejemplo en el que se usa ml_linear_regression() para ajustar un modelo de regresión lineal. Usamos el conjunto de datos integrado mtcars y vemos si podemos predecir el consumo de combustible de un automóvil (mpg) en función de su peso (wt) y el número de cilindros que contiene el motor (cyl). Se supone en cada caso que la relación entre mpg y cada una de nuestras características es lineal.

Generar conjuntos de datos de prueba y entrenamiento

Use una división, el 70 % para el entrenamiento y el 30 % para probar el modelo. Jugar con esta relación da lugar a diferentes modelos.

# split the dataframe into test and training dataframes

partitions <- mtcars_tbl %>%
  select(mpg, wt, cyl) %>% 
  sdf_random_split(training = 0.7, test = 0.3, seed = 2023)

Entrenamiento del modelo

Entrenar el modelo de regresión logística.

fit <- partitions$training %>%
  ml_linear_regression(mpg ~ .)

fit

Ahora utilice summary() para saber un poco más sobre la calidad de nuestro modelo y la importancia estadística de cada uno de nuestros predictores.

summary(fit)

Uso del modelo

Puede aplicar el modelo en el conjunto de datos de prueba llamando a ml_predict().

pred <- ml_predict(fit, partitions$test)

head(pred)

Para obtener una lista de los modelos de Spark ML disponibles a través de sparklyr, visite Referencia: ML

Desconectar el clúster de Spark

Puede llamar a spark_disconnect() o seleccionar el botón Detener sesión en la parte superior de la cinta de opciones del cuaderno para finalizar la sesión de Spark.

spark_disconnect(sc)

Obtenga más información sobre las funcionalidades de R: