Copiar dados do Azure Cosmos DB para um pool SQL dedicado usando o Apache Spark
O Azure Synapse Link for Azure Cosmos DB permite que os usuários executem análises quase em tempo real sobre dados operacionais no Azure Cosmos DB. No entanto, há momentos em que alguns dados precisam ser agregados e enriquecidos para atender aos usuários do data warehouse. A curadoria e a exportação de dados do Azure Synapse Link podem ser feitas com apenas algumas células em um bloco de anotações.
- Provisionar um espaço de trabalho Synapse com:
- Provisionar uma conta do Azure Cosmos DB com um contêiner HTAP com dados
- Conectar o contêiner HTAP do Azure Cosmos DB ao espaço de trabalho
- Tenha a configuração certa para importar dados do Spark para um pool SQL dedicado
Neste tutorial, você se conectará ao repositório analítico para que não haja impacto no repositório transacional (ele não consumirá nenhuma unidade de solicitação). Passaremos pelas seguintes etapas:
- Leia o contêiner HTAP do Azure Cosmos DB em um dataframe do Spark
- Agregar os resultados em um novo dataframe
- Ingerir os dados em um pool SQL dedicado
Nesse exemplo, usamos um contêiner HTAP chamado RetailSales. Ele faz parte de um serviço vinculado chamado ConnectedData e tem o seguinte esquema:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantidade: longo (anulável = verdadeiro)
- preço: longo (anulável = verdadeiro)
- id: string (nullable = true)
- publicidade: longo (anulável = verdadeiro)
- storeId: long (anulável = true)
- semanaInício: longo (anulável = verdadeiro)
- _etag: string (nullable = true)
Vamos agregar as vendas (quantidade, receita (preço x quantidade) por productCode e weekStarting para fins de relatório. Por fim, exportaremos esses dados para uma tabela de pool SQL dedicada chamada dbo.productsales
.
Crie um bloco de anotações do Spark com o Scala como Spark (Scala) como idioma principal. Usamos a configuração padrão do bloco de anotações para a sessão.
Leia o contêiner HTAP do Azure Cosmos DB com o Spark em um quadro de dados na primeira célula.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Na segunda célula, executamos a transformação e as agregações necessárias para o novo dataframe antes de carregá-lo em um banco de dados de pool SQL dedicado.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Na terceira célula, carregamos os dados em um pool SQL dedicado. Ele criará automaticamente uma tabela externa temporária, fonte de dados externa e formato de arquivo externo que serão excluídos assim que o trabalho for concluído.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Você pode consultar o resultado usando uma consulta SQL simples, como o seguinte script SQL:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Sua consulta apresentará os seguintes resultados em um modo de gráfico: