Tutoriel : Se connecter à Azure Cosmos DB for NoSQL à l’aide de Spark
S’APPLIQUE À : NoSQL
Dans ce tutoriel, vous utilisez le connecteur Spark Azure Cosmos DB pour lire ou écrire des données à partir d’un compte Azure Cosmos DB for NoSQL. Ce tutoriel utilise Azure Databricks et un notebook Jupyter pour illustrer l’intégration à l’API pour NoSQL à partir de Spark. Ce tutoriel est axé sur Python et Scala, mais vous pouvez utiliser n’importe quel langage ou interface pris en charge par Spark.
Dans ce tutoriel, vous allez apprendre à :
- Se connecter à un compte API pour NoSQL à l’aide de Spark et d’un notebook Jupyter
- Créer des ressources de base de données et de conteneur
- Ingérer des données dans le conteneur
- Interroger des données dans le conteneur
- Effectuer des opérations courantes sur les éléments du conteneur
Prérequis
- Un compte Azure Cosmos DB for NoSQL déjà créé :
- Si vous disposez d’un abonnement Azure, créez un compte.
- Vous ne possédez pas d’abonnement Azure ? Vous pouvez essayer Azure Cosmos DB gratuitement sans carte de crédit.
- Un espace de travail Azure Databricks existant.
Se connecter à l’aide de Spark et Jupyter
Utilisez votre espace de travail Azure Databricks existant pour créer un cluster de calcul prêt à utiliser Apache Spark 3.4.x pour vous connecter à votre compte Azure Cosmos DB for NoSQL.
Ouvrez votre espace de travail Azure Databricks.
Dans l’interface de l’espace de travail, créez un cluster. Configurez le cluster avec ces paramètres, au minimum :
Valeur Version du runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Utilisez l’interface de l’espace de travail pour rechercher les packages Maven à partir de Maven Central avec l’ID de groupe
com.azure.cosmos.spark
. Installez le package propre à Spark 3.4 avec un ID d’artefact précédé deazure-cosmos-spark_3-4
dans le cluster.Pour finir, créez un notebook.
Conseil
Par défaut, le notebook est attaché au cluster récemment créé.
Dans le notebook, définissez les paramètres de configuration OLTP pour le point de terminaison du compte NoSQL, le nom de la base de données et le nom du conteneur.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Créer une base de données et un conteneur
Utilisez l’API Catalogue pour gérer les ressources de compte telles que les bases de données et les conteneurs. Vous pouvez ensuite utiliser OLTP pour gérer les données au sein des ressources de conteneur.
Configurez l’API Catalogue afin de gérer les ressources d’API pour NoSQL à l’aide de Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Créez une base de données sous le nom
cosmicworks
à l’aide deCREATE DATABASE IF NOT EXISTS
.# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Créez un conteneur nommé
products
à l’aide deCREATE TABLE IF NOT EXISTS
. Veillez à bien définir le chemin de la clé de partition sur/category
et à activer le débit de mise à l’échelle automatique avec un débit maximal de1000
unités de requête par seconde (RU/s).# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Créez un autre conteneur nommé
employees
à l’aide d’une configuration de clé de partition hiérarchique avec/organization
,/department
et/team
comme jeu de chemins de clé de partition dans cet ordre spécifique. Définissez aussi manuellement le débit sur400
RU/s.# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Exécutez les cellules de notebook afin de vérifier que votre base de données et vos conteneurs sont créés dans votre compte API pour NoSQL.
Ingérer des données
Créez un exemple de jeu de données, puis utilisez OLTP pour ingérer ces données dans le conteneur API pour NoSQL.
Créez un exemple de jeu de données.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Utilisez
spark.createDataFrame
et la configuration OLTP précédemment enregistrée pour ajouter des exemples de données au conteneur cible.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Interroger des données
Chargez les données OLTP dans un DataFrame pour effectuer des requêtes courantes sur les données. Vous pouvez utiliser différents filtres de syntaxes ou données de requête.
Utilisez
spark.read
pour charger les données OLTP dans un objet DataFrame. Utilisez la même configuration que celle utilisée précédemment dans ce tutoriel. En outre, affectez la valeur true àspark.cosmos.read.inferSchema.enabled
pour autoriser le connecteur Spark à déduire le schéma en échantillonnant les éléments existants.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Affichez le schéma des données chargées dans le DataFrame à l’aide de
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Affichez les lignes de données où la colonne
quantity
est inférieure à20
. Utilisez les fonctionswhere
etshow
pour effectuer cette requête.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Affichez la première ligne de données où la colonne
clearance
est true. Utilisez la fonctionfilter
pour effectuer cette requête.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Affichez cinq lignes de données sans filtre ni troncation. Utilisez la fonction
show
pour personnaliser l’apparence et le nombre de lignes affichées.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Interrogez vos données à l’aide de cette chaîne de requête NoSQL brute :
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Effectuer des opérations courantes
Lorsque vous utilisez des données d’API pour NoSQL dans Spark, vous pouvez effectuer des mises à jour partielles ou utiliser des données sous forme de JSON brut.
Pour effectuer une mise à jour partielle d’un élément, effectuez ces étapes :
Copiez la variable de configuration existante
config
et modifiez les propriétés dans la nouvelle copie. Plus spécifiquement, configurez la stratégie d’écriture surItemPatch
, désactivez la prise en charge en bloc, définissez les colonnes et les opérations mappées, puis définissez le type d’opération par défaut surSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Créez des variables pour la clé de partition d’élément et l’identificateur unique que vous envisagez de cibler dans le cadre de cette opération corrective.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Créez un ensemble d’objets de correctif pour spécifier l’élément cible, et spécifiez les champs qui doivent être modifiés.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Créez un DataFrame à l’aide de l’ensemble d’objets de correctif, et utilisez
write
pour effectuer l’opération corrective.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Exécutez une requête pour examiner les résultats de l’opération corrective. L’élément doit maintenant être nommé
Yamba New Surfboard
sans aucune autre modification.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Pour utiliser des données JSON brutes, effectuez ces étapes :
Copiez la variable de configuration existante
config
et modifiez les propriétés dans la nouvelle copie. Plus spécifiquement, remplacez le conteneur cible paremployees
et configurez la colonne/le champcontacts
de façon à utiliser des données JSON brutes.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Créez un ensemble d’employés à ingérer dans le conteneur.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Créez un DataFrame et utilisez
write
pour ingérer les données des employés.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Affichez les données du DataFrame à l’aide de
show
. Notez que la colonnecontacts
produit du JSON brut dans la sortie.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Contenu connexe
- Apache Spark
- API Catalogue Azure Cosmos DB
- Référence des paramètres de configuration
- Exemple de notebook « New York City Taxi data »
- Migrer de Spark 2.4 vers Spark 3.*
- Compatibilité des versions
- Notes de publication
- Liens de téléchargement