Partilhar via


Copiar dados do Azure Cosmos DB para um pool SQL dedicado usando o Apache Spark

O Azure Synapse Link for Azure Cosmos DB permite que os usuários executem análises quase em tempo real sobre dados operacionais no Azure Cosmos DB. No entanto, há momentos em que alguns dados precisam ser agregados e enriquecidos para atender aos usuários do data warehouse. A curadoria e a exportação de dados do Azure Synapse Link podem ser feitas com apenas algumas células de código num notebook.

Pré-requisitos

Passos

Neste tutorial, você se conectará ao repositório analítico para que não haja impacto no repositório transacional (ele não consumirá nenhuma unidade de solicitação). Passaremos pelas seguintes etapas:

  1. Leia o contêiner HTAP do Azure Cosmos DB em um dataframe do Spark
  2. Agregar os resultados em um novo dataframe
  3. Ingerir os dados em um pool SQL dedicado

Spark para SQL Passos 1

Dados

Nesse exemplo, usamos um contêiner HTAP chamado RetailSales. Ele faz parte de um serviço vinculado chamado ConnectedData e tem o seguinte esquema:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nulo = verdadeiro)
  • productCode: string (nullable = true)
  • quantidade: longo (nulo = verdadeiro)
  • preço: longo (anulável = verdadeiro)
  • id: string (nullable = true)
  • publicidade: longo (passível de anulação = verdadeiro)
  • storeId: long (com possibilidade de ser nulo = true)
  • semanaInício: longo (anulável = verdadeiro)
  • _etag: cadeia de caracteres (passível de nulidade = true)

Vamos agregar as vendas (quantidade, receita (preço x quantidade) por códigoDoProduto e inícioDaSemana para fins de relatório. Por fim, exportaremos esses dados para uma tabela de pool SQL dedicada chamada dbo.productsales.

Configurar um Bloco de Anotações do Spark

Crie um bloco de notas do Spark utilizando Scala como linguagem principal. Usamos a configuração padrão do bloco de anotações para a sessão.

Leia os dados no Spark

Leia o contentor HTAP do Azure Cosmos DB para um dataframe usando o Spark na célula inicial.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agregar os resultados em um novo dataframe

Na segunda célula, executamos a transformação e as agregações necessárias para o novo dataframe antes de carregá-lo em um banco de dados de pool SQL dedicado.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Carregue os resultados em um pool SQL dedicado

Na terceira célula, carregamos os dados em um pool SQL dedicado. Ele criará automaticamente uma tabela externa temporária, fonte de dados externa e formato de arquivo externo que serão excluídos assim que o trabalho for concluído.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Consultar os resultados com SQL

Você pode consultar o resultado usando uma consulta SQL simples, como o seguinte script SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Sua consulta apresentará os seguintes resultados no modo de gráfico: Passos do Spark para SQL 2

Próximos passos