Tutoriel : Se connecter à Azure Cosmos DB for NoSQL à l’aide de Spark

S’APPLIQUE À : NoSQL

Dans ce tutoriel, vous utilisez le connecteur Spark Azure Cosmos DB pour lire ou écrire des données à partir d’un compte Azure Cosmos DB for NoSQL. Ce tutoriel utilise Azure Databricks et un notebook Jupyter pour illustrer l’intégration à l’API pour NoSQL à partir de Spark. Ce tutoriel est axé sur Python et Scala, mais vous pouvez utiliser n’importe quel langage ou interface pris en charge par Spark.

Dans ce tutoriel, vous allez apprendre à :

  • Se connecter à un compte API pour NoSQL à l’aide de Spark et d’un notebook Jupyter
  • Créer des ressources de base de données et de conteneur
  • Ingérer des données dans le conteneur
  • Interroger des données dans le conteneur
  • Effectuer des opérations courantes sur les éléments du conteneur

Prérequis

  • Un compte Azure Cosmos DB for NoSQL déjà créé :
  • Un espace de travail Azure Databricks existant.

Se connecter à l’aide de Spark et Jupyter

Utilisez votre espace de travail Azure Databricks existant pour créer un cluster de calcul prêt à utiliser Apache Spark 3.4.x pour vous connecter à votre compte Azure Cosmos DB for NoSQL.

  1. Ouvrez votre espace de travail Azure Databricks.

  2. Dans l’interface de l’espace de travail, créez un cluster. Configurez le cluster avec ces paramètres, au minimum :

    Valeur
    Version du runtime 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Utilisez l’interface de l’espace de travail pour rechercher les packages Maven à partir de Maven Central avec l’ID de groupecom.azure.cosmos.spark. Installez le package propre à Spark 3.4 avec un ID d’artefact précédé de azure-cosmos-spark_3-4 dans le cluster.

  4. Pour finir, créez un notebook.

    Conseil

    Par défaut, le notebook est attaché au cluster récemment créé.

  5. Dans le notebook, définissez les paramètres de configuration OLTP pour le point de terminaison du compte NoSQL, le nom de la base de données et le nom du conteneur.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Créer une base de données et un conteneur

Utilisez l’API Catalogue pour gérer les ressources de compte telles que les bases de données et les conteneurs. Vous pouvez ensuite utiliser OLTP pour gérer les données au sein des ressources de conteneur.

  1. Configurez l’API Catalogue afin de gérer les ressources d’API pour NoSQL à l’aide de Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Créez une base de données sous le nom cosmicworks à l’aide de CREATE DATABASE IF NOT EXISTS.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Créez un conteneur nommé products à l’aide de CREATE TABLE IF NOT EXISTS. Veillez à bien définir le chemin de la clé de partition sur /category et à activer le débit de mise à l’échelle automatique avec un débit maximal de 1000 unités de requête par seconde (RU/s).

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Créez un autre conteneur nommé employees à l’aide d’une configuration de clé de partition hiérarchique avec /organization, /department et /team comme jeu de chemins de clé de partition dans cet ordre spécifique. Définissez aussi manuellement le débit sur 400 RU/s.

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Exécutez les cellules de notebook afin de vérifier que votre base de données et vos conteneurs sont créés dans votre compte API pour NoSQL.

Ingérer des données

Créez un exemple de jeu de données, puis utilisez OLTP pour ingérer ces données dans le conteneur API pour NoSQL.

  1. Créez un exemple de jeu de données.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Utilisez spark.createDataFrame et la configuration OLTP précédemment enregistrée pour ajouter des exemples de données au conteneur cible.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Interroger des données

Chargez les données OLTP dans un DataFrame pour effectuer des requêtes courantes sur les données. Vous pouvez utiliser différents filtres de syntaxes ou données de requête.

  1. Utilisez spark.read pour charger les données OLTP dans un objet DataFrame. Utilisez la même configuration que celle utilisée précédemment dans ce tutoriel. En outre, affectez la valeur true à spark.cosmos.read.inferSchema.enabled pour autoriser le connecteur Spark à déduire le schéma en échantillonnant les éléments existants.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Affichez le schéma des données chargées dans le DataFrame à l’aide de printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Affichez les lignes de données où la colonne quantity est inférieure à 20. Utilisez les fonctions where et show pour effectuer cette requête.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Affichez la première ligne de données où la colonne clearance est true. Utilisez la fonction filter pour effectuer cette requête.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Affichez cinq lignes de données sans filtre ni troncation. Utilisez la fonction show pour personnaliser l’apparence et le nombre de lignes affichées.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Interrogez vos données à l’aide de cette chaîne de requête NoSQL brute : SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Effectuer des opérations courantes

Lorsque vous utilisez des données d’API pour NoSQL dans Spark, vous pouvez effectuer des mises à jour partielles ou utiliser des données sous forme de JSON brut.

  1. Pour effectuer une mise à jour partielle d’un élément, effectuez ces étapes :

    1. Copiez la variable de configuration existante config et modifiez les propriétés dans la nouvelle copie. Plus spécifiquement, configurez la stratégie d’écriture sur ItemPatch, désactivez la prise en charge en bloc, définissez les colonnes et les opérations mappées, puis définissez le type d’opération par défaut sur Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Créez des variables pour la clé de partition d’élément et l’identificateur unique que vous envisagez de cibler dans le cadre de cette opération corrective.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Créez un ensemble d’objets de correctif pour spécifier l’élément cible, et spécifiez les champs qui doivent être modifiés.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Créez un DataFrame à l’aide de l’ensemble d’objets de correctif, et utilisez write pour effectuer l’opération corrective.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Exécutez une requête pour examiner les résultats de l’opération corrective. L’élément doit maintenant être nommé Yamba New Surfboard sans aucune autre modification.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Pour utiliser des données JSON brutes, effectuez ces étapes :

    1. Copiez la variable de configuration existante config et modifiez les propriétés dans la nouvelle copie. Plus spécifiquement, remplacez le conteneur cible par employees et configurez la colonne/le champ contacts de façon à utiliser des données JSON brutes.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Créez un ensemble d’employés à ingérer dans le conteneur.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Créez un DataFrame et utilisez write pour ingérer les données des employés.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Affichez les données du DataFrame à l’aide de show. Notez que la colonne contacts produit du JSON brut dans la sortie.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Étape suivante