Copie os dados da Azure Cosmos DB numa piscina de SQL dedicada usando o Apache Spark

Azure Synapse Link for Azure Cosmos DB permite que os utilizadores corram perto de análises em tempo real sobre dados operacionais em Azure Cosmos DB. No entanto, há momentos em que alguns dados precisam de ser agregados e enriquecidos para servir os utilizadores de armazéns de dados. A curadoria e exportação Azure Synapse dados do Link podem ser feitos com apenas algumas células num caderno.

Pré-requisitos

Passos

Neste tutorial, irá ligar-se à loja analítica para que não haja impacto na loja transacional (não consumirá nenhuma Unidade de Pedido). Vamos percorrer os seguintes passos:

  1. Leia o recipiente Cosmos DB HTAP num dataframe spark
  2. Agregar os resultados num novo dataframe
  3. Ingerir os dados numa piscina de SQL dedicada

Spark to SQL Steps 1

Dados

Neste exemplo, utilizamos um contentor HTAP chamado RetailSales. Faz parte de um serviço ligado chamado ConnectedData, e tem o seguinte esquema:

  • _rid: corda (anulado = verdadeiro)
  • _ts: longo (anulado = verdadeiro)
  • logQuantity: duplo (anulado = verdadeiro)
  • produto Código: cadeia (anulado = verdadeiro)
  • quantidade: longa (anulada = verdadeira)
  • preço: longo (anulado = verdadeiro)
  • id: corda (anulado = verdadeiro)
  • publicidade: longo (anulado = verdadeiro)
  • storeId: longo (anulado = verdadeiro)
  • weekStarting: longo (anulado = verdadeiro)
  • _etag: corda (anulado = verdadeiro)

Agregaremos as vendas (quantidade, receita (preço x quantidade) por produtoCode e weekStarting para efeitos de reporte. Finalmente, exportaremos esses dados para uma mesa de SQL dedicada chamada dbo.productsales.

Configure um caderno de faíscas

Crie um caderno Spark com Scala como Spark (Scala) como a língua principal. Utilizamos a definição padrão do caderno para a sessão.

Leia os dados em Spark

Leia o recipiente Cosmos DB HTAP com a Spark num dataframe na primeira célula.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agregar os resultados num novo dataframe

Na segunda célula, fazemos a transformação e os agregados necessários para o novo dataframe antes de o carregar numa base de dados dedicada SQL piscina.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Carregue os resultados numa piscina de SQL dedicada

Na terceira célula, colocamos os dados numa piscina dedicada SQL. Criará automaticamente uma tabela externa temporária, fonte de dados externa e formato de ficheiro externo que será eliminado assim que o trabalho for feito.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Consultar os resultados com SQL

Pode consultar o resultado usando uma consulta de SQL simples, como o seguinte script SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

A sua consulta apresentará os seguintes resultados num modo gráfico: Spark to SQL Steps 2

Passos seguintes