Copia de datos de Azure Cosmos DB en un grupo de SQL dedicado mediante Apache Spark

Azure Synapse Link para Azure Cosmos DB permite a los usuarios ejecutar análisis casi en tiempo real con datos operativos en Azure Cosmos DB. Sin embargo, hay ocasiones en que es necesario agregar y enriquecer algunos datos para servir a los usuarios del almacenamiento de datos. Tanto el mantenimiento como la exportación de datos de Azure Synapse Link se pueden realizar con unas pocas celdas de un cuaderno.

Requisitos previos

Pasos

En este tutorial, se conectará al almacén analítico para que no haya ningún impacto en el almacén de transacciones (no consumirá unidades de solicitud). Siga estos pasos:

  1. Leer el contenedor HTAP de Cosmos DB en un dataframe de Spark.
  2. Agregar los resultados a un dataframe nuevo.
  3. Ingesta de los datos en el grupo de SQL dedicado

Spark to SQL Steps 1

Datos

En ese ejemplo, se usa un contenedor HTAP denominado RetailSales. Forma parte de un servicio vinculado denominado ConnectedData y tiene el siguiente esquema:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

Agregaremos las ventas (quantity, revenue [precio x cantidad] por productCode y weekStarting para la elaboración de informes. Por último, se exportarán los datos en una tabla del grupo de SQL dedicado denominada dbo.productsales.

Configuración de un cuaderno de Spark

Cree un cuaderno de Spark con Scala and Spark (Scala) como lenguaje principal. Se usa la configuración predeterminada del cuaderno para la sesión.

Lectura de datos en Spark

Lea el contenedor HTAP de Cosmos DB con Spark en un dataframe de la primera celda.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agregación de los resultados a un dataframe nuevo

En la segunda celda, se ejecutan la transformación y los agregados necesarios para el nuevo dataframe antes de cargarlo en una base de datos del grupo de SQL dedicado.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Carga de los resultados en un grupo de SQL dedicado

En la tercera celda, se cargan los datos en un grupo de SQL dedicado. Se creará automáticamente una tabla externa temporal, un origen de datos externo y un formato de archivo externo que se eliminarán una vez que se haya completado el trabajo.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Consulta de los resultados con SQL

El resultado se puede mediante una sencilla consulta SQL, como el siguiente script de SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

La consulta presentará los resultados siguientes en modo de gráfico: Spark to SQL Steps 2

Pasos siguientes