Partager via


Copier des données depuis Azure Cosmos DB vers un pool SQL dédié avec Apache Spark

Azure Synapse Link pour Azure Cosmos DB permet aux utilisateurs d’exécuter une analytique en quasi-temps réel sur les données opérationnelles dans Azure Cosmos DB. Toutefois, il peut arriver que certaines données doivent être agrégées et enrichies pour traiter les utilisateurs de l’entrepôt de données. L’organisation et l’exportation de données Azure Synapse Link peuvent s’effectuer en utilisant seulement quelques cellules d’un notebook.

Prérequis

Étapes

Dans ce tutoriel, vous allez vous connecter au magasin analytique pour qu’il n’y ait aucun impact sur le magasin transactionnel (aucune unité de requête ne sera consommée). Nous allons suivre les étapes suivantes :

  1. Lire le conteneur HTAP Azure Cosmos DB dans un dataframe Spark
  2. Agréger les résultats dans un nouveau dataframe
  3. Ingérer les données dans un pool SQL dédié

Étapes Spark vers SQL 1

Données

Dans cet exemple, nous utilisons un conteneur HTAP appelé RetailSales. Il fait partie d’un service lié appelé ConnectedData et présente le schéma suivant :

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

Nous allons regrouper les ventes (quantity, revenue (prix x quantité) par productCode et weekStarting à des fins de création de rapports. Nous terminerons par l’exportation de ces données dans une table de pools SQL dédiés appelée dbo.productsales.

Configurer un notebook Spark

Créez un notebook Spark avec Scala, Spark (Scala) étant le langage principal. Nous utilisons le paramètre par défaut du notebook pour la session.

Lire les données dans Spark

Lisez le conteneur HTAP Azure Cosmos DB avec Spark dans un dataframe au niveau de la première cellule.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agréger les résultats dans un nouveau dataframe

Dans la deuxième cellule, nous exécutons la transformation et les agrégats nécessaires pour le nouveau dataframe avant de le charger dans une base de données de pools SQL dédiés.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Charger les résultats dans un pool SQL dédié

Dans la troisième cellule, nous chargeons les données dans un pool SQL dédié. Cette opération crée automatiquement une table externe temporaire, une source de données externe et un format de fichier externe qui seront supprimés une fois le travail terminé.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Interroger les résultats avec SQL

Vous pouvez interroger le résultat au moyen d’une requête SQL simple, telle que le script SQL suivant :

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Votre requête doit présenter les résultats suivants en mode graphique : Étapes Spark vers SQL 2

Étapes suivantes