Partager via


Tutoriel : Se connecter à Azure Cosmos DB for NoSQL à l’aide de Spark

S’APPLIQUE À : NoSQL

Dans ce tutoriel, vous utilisez le connecteur Spark Azure Cosmos DB pour lire ou écrire des données à partir d’un compte Azure Cosmos DB for NoSQL. Ce tutoriel utilise Azure Databricks et un notebook Jupyter pour illustrer l’intégration à l’API pour NoSQL à partir de Spark. Ce tutoriel est axé sur Python et Scala, mais vous pouvez utiliser n’importe quel langage ou interface pris en charge par Spark.

Dans ce tutoriel, vous allez apprendre à :

  • Connectez-vous à un compte API pour NoSQL à l’aide de Spark et d’un notebook Jupyter.
  • Créer des ressources de base de données et de conteneur.
  • Ingérez des données dans le conteneur.
  • Interrogez les données du conteneur.
  • Effectuez des opérations courantes sur les éléments du conteneur.

Prérequis

  • Un compte Azure Cosmos DB for NoSQL déjà créé :
  • Un espace de travail Azure Databricks existant.

Se connecter à l’aide de Spark et Jupyter

Utilisez votre espace de travail Azure Databricks existant pour créer un cluster de calcul prêt à utiliser Apache Spark 3.4.x pour vous connecter à votre compte Azure Cosmos DB for NoSQL.

  1. Ouvrez votre espace de travail Azure Databricks.

  2. Dans l’interface de l’espace de travail, créez un cluster. Configurez le cluster avec ces paramètres, au minimum :

    Version Valeur
    Version du runtime 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Utilisez l’interface de l’espace de travail pour rechercher les packages Maven à partir de Maven Central avec l’ID de groupe com.azure.cosmos.spark. Installez sur le cluster le package prévu spécifiquement pour Spark 3.4 dont l’ID d’artefact est précédé du préfixe azure-cosmos-spark_3-4.

  4. Pour finir, créez un notebook.

    Conseil

    Par défaut, le notebook est attaché au cluster récemment créé.

  5. Dans le notebook, définissez les paramètres de configuration OLTP (traitement transactionnel en ligne) pour le point de terminaison du compte NoSQL, le nom de la base de données et le nom du conteneur.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Créer une base de données et un conteneur

Utilisez l’API Catalogue pour gérer les ressources de compte telles que les bases de données et les conteneurs. Vous pouvez ensuite utiliser OLTP pour gérer les données au sein des ressources du conteneur.

  1. Configurez l’API Catalogue de façon à gérer les ressources d’API pour NoSQL en utilisant Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Créez une base de données sous le nom cosmicworks à l’aide de CREATE DATABASE IF NOT EXISTS.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Créez un conteneur sous le nom products à l’aide de CREATE TABLE IF NOT EXISTS. Veillez à bien définir le chemin de la clé de partition sur /category et à activer le débit de mise à l’échelle automatique avec un débit maximal de 1000 unités de requête (RU) par seconde.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Créez un autre conteneur sous le nom employees en utilisant une configuration de clé de partition hiérarchique. Utilisez /organization, /department et /team comme jeu de chemins de clé de partition. Suivez cet ordre spécifique. De même, définissez manuellement un débit de 400 RU.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Exécutez les cellules de notebook pour vérifier que votre base de données et vos conteneurs sont créés dans votre compte API pour NoSQL.

Ingérer des données

Créez un exemple de jeu de données. Utilisez ensuite OLTP pour ingérer ces données dans le conteneur API pour NoSQL.

  1. Créez un exemple de jeu de données.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Utilisez spark.createDataFrame et la configuration OLTP précédemment enregistrée pour ajouter des exemples de données au conteneur cible.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Interroger des données

Chargez les données OLTP dans un DataFrame pour effectuer des requêtes courantes sur les données. Vous pouvez utiliser différentes syntaxes pour filtrer ou interroger les données.

  1. Utilisez spark.read pour charger les données OLTP dans un objet data-frame (trame de données). Utilisez la même configuration que celle que vous avez utilisée précédemment dans ce tutoriel. De même, affectez la valeur true à spark.cosmos.read.inferSchema.enabled pour autoriser le connecteur Spark à déduire le schéma en échantillonnant les éléments existants.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Affichez le schéma des données chargées dans la trame de données à l’aide de printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Affichez les lignes de données où la colonne quantity est inférieure à 20. Utilisez les fonctions where et show pour effectuer cette requête.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Affichez la première ligne de données où la colonne clearance est true. Utilisez la fonction filter pour effectuer cette requête.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Affichez cinq lignes de données sans filtre ni troncation. Utilisez la fonction show pour personnaliser l’apparence et le nombre de lignes affichées.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Interrogez vos données à l’aide de cette chaîne de requête NoSQL brute : SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Effectuer des opérations courantes

Lorsque vous utilisez des données d’API pour NoSQL dans Spark, vous pouvez effectuer des mises à jour partielles ou utiliser des données sous forme de JSON brut.

  1. Pour effectuer une mise à jour partielle d’un élément :

    1. Copiez la variable de configuration existante config et modifiez les propriétés dans la nouvelle copie. Plus précisément, configurez la stratégie d’écriture sur ItemPatch. Activez la prise en charge des opérations en bloc. Définissez les colonnes et les opérations mappées. Enfin, définissez le type d’opération par défaut sur Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Créez des variables pour la clé de partition d’élément et l’identificateur unique que vous envisagez de cibler dans le cadre de cette opération corrective.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Créez un ensemble d’objets de correctif pour spécifier l’élément cible, et spécifiez les champs qui doivent être modifiés.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Créez une trame de données en utilisant l’ensemble d’objets patch (correctif). Utilisez write pour effectuer l’opération patch.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Exécutez une requête pour examiner les résultats de l’opération corrective. L’élément doit maintenant être nommé Yamba New Surfboard sans aucune autre modification.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Pour utiliser des données JSON brutes :

    1. Copiez la variable de configuration existante config et modifiez les propriétés dans la nouvelle copie. Plus précisément, remplacez le conteneur cible par employees. Configurez ensuite la colonne/le champ contacts pour utiliser les données JSON brutes.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Créez un ensemble d’employés à ingérer dans le conteneur.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Créez un DataFrame et utilisez write pour ingérer les données des employés.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Affichez les données de la trame de données à l’aide de show. Notez que la colonne contacts produit du JSON brut dans la sortie.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Étape suivante