Tutoriel : Se connecter à Azure Cosmos DB for NoSQL à l’aide de Spark
S’APPLIQUE À : NoSQL
Dans ce tutoriel, vous utilisez le connecteur Spark Azure Cosmos DB pour lire ou écrire des données à partir d’un compte Azure Cosmos DB for NoSQL. Ce tutoriel utilise Azure Databricks et un notebook Jupyter pour illustrer l’intégration à l’API pour NoSQL à partir de Spark. Ce tutoriel est axé sur Python et Scala, mais vous pouvez utiliser n’importe quel langage ou interface pris en charge par Spark.
Dans ce tutoriel, vous allez apprendre à :
- Connectez-vous à un compte API pour NoSQL à l’aide de Spark et d’un notebook Jupyter.
- Créer des ressources de base de données et de conteneur.
- Ingérez des données dans le conteneur.
- Interrogez les données du conteneur.
- Effectuez des opérations courantes sur les éléments du conteneur.
Prérequis
- Un compte Azure Cosmos DB for NoSQL déjà créé :
- Si vous disposez d’un abonnement Azure, créez un compte.
- Vous ne possédez pas d’abonnement Azure ? Vous pouvez essayer Azure Cosmos DB gratuitement sans carte de crédit.
- Un espace de travail Azure Databricks existant.
Se connecter à l’aide de Spark et Jupyter
Utilisez votre espace de travail Azure Databricks existant pour créer un cluster de calcul prêt à utiliser Apache Spark 3.4.x pour vous connecter à votre compte Azure Cosmos DB for NoSQL.
Ouvrez votre espace de travail Azure Databricks.
Dans l’interface de l’espace de travail, créez un cluster. Configurez le cluster avec ces paramètres, au minimum :
Version Valeur Version du runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Utilisez l’interface de l’espace de travail pour rechercher les packages Maven à partir de Maven Central avec l’ID de groupe
com.azure.cosmos.spark
. Installez sur le cluster le package prévu spécifiquement pour Spark 3.4 dont l’ID d’artefact est précédé du préfixeazure-cosmos-spark_3-4
.Pour finir, créez un notebook.
Conseil
Par défaut, le notebook est attaché au cluster récemment créé.
Dans le notebook, définissez les paramètres de configuration OLTP (traitement transactionnel en ligne) pour le point de terminaison du compte NoSQL, le nom de la base de données et le nom du conteneur.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Créer une base de données et un conteneur
Utilisez l’API Catalogue pour gérer les ressources de compte telles que les bases de données et les conteneurs. Vous pouvez ensuite utiliser OLTP pour gérer les données au sein des ressources du conteneur.
Configurez l’API Catalogue de façon à gérer les ressources d’API pour NoSQL en utilisant Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Créez une base de données sous le nom
cosmicworks
à l’aide deCREATE DATABASE IF NOT EXISTS
.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Créez un conteneur sous le nom
products
à l’aide deCREATE TABLE IF NOT EXISTS
. Veillez à bien définir le chemin de la clé de partition sur/category
et à activer le débit de mise à l’échelle automatique avec un débit maximal de1000
unités de requête (RU) par seconde.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Créez un autre conteneur sous le nom
employees
en utilisant une configuration de clé de partition hiérarchique. Utilisez/organization
,/department
et/team
comme jeu de chemins de clé de partition. Suivez cet ordre spécifique. De même, définissez manuellement un débit de400
RU.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Exécutez les cellules de notebook pour vérifier que votre base de données et vos conteneurs sont créés dans votre compte API pour NoSQL.
Ingérer des données
Créez un exemple de jeu de données. Utilisez ensuite OLTP pour ingérer ces données dans le conteneur API pour NoSQL.
Créez un exemple de jeu de données.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Utilisez
spark.createDataFrame
et la configuration OLTP précédemment enregistrée pour ajouter des exemples de données au conteneur cible.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Interroger des données
Chargez les données OLTP dans un DataFrame pour effectuer des requêtes courantes sur les données. Vous pouvez utiliser différentes syntaxes pour filtrer ou interroger les données.
Utilisez
spark.read
pour charger les données OLTP dans un objet data-frame (trame de données). Utilisez la même configuration que celle que vous avez utilisée précédemment dans ce tutoriel. De même, affectez la valeurtrue
àspark.cosmos.read.inferSchema.enabled
pour autoriser le connecteur Spark à déduire le schéma en échantillonnant les éléments existants.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Affichez le schéma des données chargées dans la trame de données à l’aide de
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Affichez les lignes de données où la colonne
quantity
est inférieure à20
. Utilisez les fonctionswhere
etshow
pour effectuer cette requête.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Affichez la première ligne de données où la colonne
clearance
esttrue
. Utilisez la fonctionfilter
pour effectuer cette requête.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Affichez cinq lignes de données sans filtre ni troncation. Utilisez la fonction
show
pour personnaliser l’apparence et le nombre de lignes affichées.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Interrogez vos données à l’aide de cette chaîne de requête NoSQL brute :
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Effectuer des opérations courantes
Lorsque vous utilisez des données d’API pour NoSQL dans Spark, vous pouvez effectuer des mises à jour partielles ou utiliser des données sous forme de JSON brut.
Pour effectuer une mise à jour partielle d’un élément :
Copiez la variable de configuration existante
config
et modifiez les propriétés dans la nouvelle copie. Plus précisément, configurez la stratégie d’écriture surItemPatch
. Activez la prise en charge des opérations en bloc. Définissez les colonnes et les opérations mappées. Enfin, définissez le type d’opération par défaut surSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Créez des variables pour la clé de partition d’élément et l’identificateur unique que vous envisagez de cibler dans le cadre de cette opération corrective.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Créez un ensemble d’objets de correctif pour spécifier l’élément cible, et spécifiez les champs qui doivent être modifiés.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Créez une trame de données en utilisant l’ensemble d’objets patch (correctif). Utilisez
write
pour effectuer l’opération patch.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Exécutez une requête pour examiner les résultats de l’opération corrective. L’élément doit maintenant être nommé
Yamba New Surfboard
sans aucune autre modification.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Pour utiliser des données JSON brutes :
Copiez la variable de configuration existante
config
et modifiez les propriétés dans la nouvelle copie. Plus précisément, remplacez le conteneur cible paremployees
. Configurez ensuite la colonne/le champcontacts
pour utiliser les données JSON brutes.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Créez un ensemble d’employés à ingérer dans le conteneur.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Créez un DataFrame et utilisez
write
pour ingérer les données des employés.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Affichez les données de la trame de données à l’aide de
show
. Notez que la colonnecontacts
produit du JSON brut dans la sortie.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Contenu connexe
- Apache Spark
- API Catalogue Azure Cosmos DB
- Informations de référence sur les paramètres de configuration
- Exemples de connecteurs Azure Cosmos DB Spark
- Migrer de Spark 2.4 vers Spark 3.*
- Compatibilité des versions :
- Notes de publication :
- Liens de téléchargement :