Partage via


Interagir avec Azure Cosmos DB à l’aide d’Apache Spark 2 dans Azure Synapse Link

Notes

Pour Azure Synapse Link pour Azure Cosmos DB avec Spark 3, reportez-vous à cet article Azure Synapse Link pour Azure Cosmos DB sur Spark 3

Dans cet article, vous allez découvrir comment interagir avec Azure Cosmos DB à l’aide de Synapse Apache Spark 2. Avec sa prise en charge complète de Scala, Python, SparkSQL et C#, Synapse Apache Spark est au cœur des scénarios d'analyse, d'Engineering données, de science des données et d'exploration des données d'Azure Synapse Link pour Azure Cosmos DB.

Les fonctionnalités suivantes sont prises en charge lors de l'interaction avec Azure Cosmos DB :

  • Synapse Apache Spark vous permet d'analyser les données des conteneurs Azure Cosmos DB activés avec Azure Synapse Link en quasi-temps réel sans affecter les performances de vos charges de travail transactionnelles. Les deux options suivantes sont disponibles pour interroger le magasin analytique Azure Cosmos DB à partir de Spark :
    • Charger dans un DataFrame Spark
    • Créer une table Spark
  • Synapse Apache Spark vous permet également d'ingérer des données dans Azure Cosmos DB. Il convient de noter que les données sont toujours ingérées dans les conteneurs Azure Cosmos DB par le biais du magasin transactionnel. Lorsque Synapse Link est activé, les nouvelles insertions, mises à jour et suppressions sont automatiquement synchronisées avec le magasin analytique.
  • Synapse Apache Spark prend également en charge le flux structuré Spark avec Azure Cosmos DB comme source et comme récepteur.

Les sections suivantes présentent la syntaxe des fonctionnalités ci-dessus. Vous pouvez également consulter le module d’apprentissage sur la façon d’interroger Azure Cosmos DB avec Apache Spark pour Azure Synapse Analytics. Les mouvements au sein de l'espace de travail Azure Synapse Analytics sont conçus afin de fournir une expérience prête à l'emploi facile à prendre en main. Les mouvements sont visibles lorsque vous cliquez avec le bouton droit sur un conteneur Azure Cosmos DB dans l'onglet Données de l'espace de travail Synapse. Avec les mouvements, vous pouvez rapidement générer du code et l’adapter à vos besoins. Les mouvements sont également bien adaptés pour découvrir des données en un seul clic.

Important

Vous devez tenir compte de certaines contraintes dans le schéma analytique qui peuvent entraîner un comportement inattendu dans les opérations de chargement de données. Par exemple, seules les 1000 premières propriétés du schéma transactionnel sont disponibles dans le schéma analytique, les propriétés avec espaces ne sont pas disponibles, etc. Si vous rencontrez des résultats inattendus, vérifiez les contraintes du schéma du magasin analytique pour plus d’informations.

Interroger le magasin analytique Azure Cosmos DB

Avant d'examiner les deux options disponibles pour interroger le magasin analytique Azure Cosmos DB, à savoir le chargement dans un DataFrame Spark et la création d'une table Spark, il convient d'explorer les différences d'expérience afin de pouvoir choisir l'option adéquate.

La différence d'expérience réside dans le fait de savoir si les modifications de données sous-jacentes du conteneur Azure Cosmos DB doivent être automatiquement reflétées dans l'analyse effectuée dans Spark. Lorsqu'un DataFrame Spark est enregistré ou qu'une table Spark est créée dans le magasin analytique d'un conteneur, les métadonnées entourant l'instantané actuel des données dans le magasin analytique sont extraites vers Spark pour une transmission efficace de l'analyse ultérieure. Il convient de noter que dans la mesure où Spark suit une stratégie d'évaluation différée, si aucune action n'est appelée sur le DataFrame Spark ou si aucune requête SparkSQL n'est exécutée sur la table Spark, les données réelles ne sont pas extraites du magasin analytique du conteneur sous-jacent.

Dans le cas du chargement dans un DataFrame Spark, les métadonnées extraites sont mises en cache pendant toute la durée de vie de la session Spark, et les actions ultérieures appelées sur le DataFrame sont évaluées par rapport à l'instantané du magasin analytique au moment de la création du DataFrame.

En revanche, dans le cas de la création d'une table Spark, les métadonnées de l'état du magasin analytique ne sont pas mises en cache dans Spark et sont rechargées chaque fois qu'une requête SparkSQL est exécutée sur la table Spark.

Par conséquent, le choix entre le chargement dans un DataFrame Spark et la création d'une table Spark dépend du type d'évaluation que vous souhaitez effectuer pour votre analyse Spark : par rapport à un instantané fixe du magasin analytique ou par rapport au dernier instantané du magasin analytique.

Si vos requêtes analytiques ont des filtres fréquemment utilisés, vous avez la possibilité d’opérer un partitionnement sur la base de ces champs pour améliorer les performances des requêtes. Vous pouvez exécuter régulièrement un travail de partitionnement à partir d’un notebook Spark Azure Synapse pour déclencher le partitionnement sur le magasin analytique. Ce magasin partitionné pointe vers le compte de stockage principal ADLS Gen2 qui est lié à votre espace de travail Azure Synapse. Pour plus d’informations, consultez les articles Présentation du partitionnement personnalisé et Comment configurer un partitionnement personnalisé.

Notes

Pour interroger les comptes Azure Cosmos DB for MongoDB, découvrez-en plus sur la représentation du schéma de fidélité optimale dans le magasin analytique et sur les noms de propriété étendue à utiliser.

Notes

Notez que toutes les commandes options ci-dessous respectent la casse. Par exemple, vous devez utiliser Gateway tandis que gateway retourne une erreur.

Charger dans un DataFrame Spark

Dans cet exemple, vous allez créer un DataFrame Spark qui pointe vers le magasin analytique Azure Cosmos DB. Vous pourrez ensuite effectuer une analyse supplémentaire en appelant des actions Spark sur le DataFrame. Cette opération n’a aucun impact sur le magasin transactionnel.

La syntaxe Python est la suivante :

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

La syntaxe Scala équivalente est la suivante :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Créer une table Spark

Dans cet exemple, vous allez créer une table Spark qui pointe vers le magasin analytique Azure Cosmos DB. Vous pourrez ensuite effectuer une analyse supplémentaire en appelant des requêtes SparkSQL sur la table. Cette opération n'a aucun impact sur le magasin transactionnel et n'entraîne aucun déplacement de données. Si vous décidez de supprimer cette table Spark, le conteneur Azure Cosmos DB sous-jacent et le magasin analytique correspondant ne seront pas affectés.

Ce scénario permet de réutiliser des tables Spark à l'aide d'outils tiers et de fournir une accessibilité aux données sous-jacentes au moment de l'exécution.

La syntaxe permettant de créer une table Spark est la suivante :

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Notes

Pour les scénarios dans lesquels le schéma du conteneur Azure Cosmos DB sous-jacent change au fil du temps, et si vous souhaitez que le schéma mis à jour se reflète automatiquement dans les requêtes adressées à la table Spark, vous pouvez définir l’option spark.cosmos.autoSchemaMerge sur true dans les options de la table Spark.

Écrire un DataFrame Spark dans un conteneur Azure Cosmos DB

Dans cet exemple, vous allez écrire un DataFrame Spark dans un conteneur Azure Cosmos DB. Cette opération aura un impact sur les performances des charges de travail transactionnelles et consommera une partie des unités de requête approvisionnées sur le conteneur Azure Cosmos DB ou la base de données partagée.

La syntaxe Python est la suivante :

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

La syntaxe Scala équivalente est la suivante :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Charger la diffusion en continu de tramedonnées à partir d’un conteneur

Dans ce mouvement, vous utiliserez la fonctionnalité de diffusion en continu Spark pour charger des données à partir d’un conteneur dans un tramedonnées. Les données seront stockées dans le compte du lac de données principal (et le système de fichiers) que vous avez connecté à l’espace de travail.

Notes

Si vous souhaitez référencer des bibliothèques externes dans Synapse Apache Spark, cliquez ici pour en savoir plus. Par exemple, si vous souhaitez ingérer un DataFrame Spark dans un conteneur d’Azure Cosmos DB for MongoDB, vous pouvez utiliser le connecteur Mongo DB pour Spark.

Charger un DataFrame de diffusion en continu à partir d'un conteneur Azure Cosmos DB

Dans cet exemple, vous allez utiliser la fonctionnalité de flux structuré de Spark pour charger les données d'un conteneur Azure Cosmos DB dans un DataFrame de diffusion en continu Spark à l'aide de la fonctionnalité de flux de modification d'Azure Cosmos DB. Les données de point de contrôle utilisées par Spark seront stockées sur le compte (et dans le système de fichiers) du lac de données principal que vous avez connecté à l'espace de travail.

Si le dossier /localReadCheckpointFolder n'existe pas (dans l'exemple ci-dessous), il est automatiquement créé. Cette opération aura un impact sur les performances des charges de travail transactionnelles et consommera une partie des unités de requête approvisionnées sur le conteneur Azure Cosmos DB ou la base de données partagée.

La syntaxe Python est la suivante :

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

La syntaxe Scala équivalente est la suivante :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Écrire un DataFrame de diffusion en continu à partir d'un conteneur Azure Cosmos DB

Dans cet exemple, vous allez écrire un DataFrame de diffusion en continu dans un conteneur Azure Cosmos DB. Cette opération aura un impact sur les performances des charges de travail transactionnelles et consommera une partie des unités de requête approvisionnées sur le conteneur Azure Cosmos DB ou la base de données partagée. Si le dossier /localWriteCheckpointFolder n'existe pas (dans l'exemple ci-dessous), il est automatiquement créé.

La syntaxe Python est la suivante :

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

La syntaxe Scala équivalente est la suivante :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Étapes suivantes