Copiar dados do Azure Cosmos DB para um pool de SQL dedicado usando o Apache Spark

O Link do Azure Synapse para Azure Cosmos DB permite que os usuários executem análises quase em tempo real nos dados operacionais no Azure Cosmos DB. No entanto, há ocasiões em que alguns dados precisam ser agregados e enriquecidos para atender aos usuários do data warehouse. A coleta e a exportação de dados do Link do Azure Synapse pode ser feita com apenas algumas células em um notebook.

Pré-requisitos

Etapas

Neste tutorial, você se conectará ao repositório analítico para que não haja impacto sobre o repositório transacional (ele não consumirá nenhuma Unidade de Solicitação). Percorreremos as seguintes etapas:

  1. Leia o contêiner HTAP do Azure Cosmos DB em um dataframe do Spark
  2. Agregar os resultados em um novo dataframe
  3. Ingerir os dados em um pool de SQL dedicado

Spark para SQL, Etapa 1

Dados

Nesse exemplo, usamos um contêiner HTAP chamado RetailSales. Ele faz parte de um serviço vinculado chamado ConnectedData e tem o seguinte esquema:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

Agregaremos as vendas (quantity, revenue – preço x quantidade) por productCode e weekStarting para fins de relatório. Por fim, exportaremos esses dados para uma tabela do pool de SQL dedicado chamada dbo.productsales.

Configurar um notebook do Spark

Crie um notebook do Spark com o Scala tendo o Spark (Scala) como a linguagem principal. Usaremos a configuração padrão do notebook para a sessão.

Ler os dados no Spark

Leia o contêiner HTAP do Azure Cosmos DB com Spark em um dataframe na primeira célula.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agregar os resultados em um novo dataframe

Na segunda célula, executaremos a transformação e as agregações necessárias para o novo dataframe antes de carregá-lo em um banco de dados do pool de SQL dedicado.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Carregar os resultados em um pool de SQL dedicado

Na terceira célula, carregaremos os dados em um pool de SQL dedicado. Ele criará automaticamente uma tabela externa temporária, uma fonte de dados externa e um formato de arquivo externo que será excluído quando o trabalho for concluído.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Consultar os resultados com o SQL

Consulte o resultado usando uma consulta SQL simples, como o seguinte script SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Sua consulta apresentará os seguintes resultados em um modo de gráfico: Etapas 2 do Spark para SQL

Próximas etapas