Tutorial: Conexión a Azure Cosmos DB para NoSQL mediante Spark
SE APLICA A: NoSQL
En este tutorial, usará el conector Spark de Azure Cosmos DB para leer o escribir datos de una cuenta de Azure Cosmos DB for NoSQL. En este tutorial se usa Azure Databricks y un cuaderno de Jupyter para ilustrar cómo integrar la API para NoSQL desde Spark. Este tutorial se centra en Python y Scala, aunque puede usar cualquier lenguaje o interfaz compatible con Spark.
En este tutorial, aprenderá a:
- Conéctese a una cuenta de API para NoSQL mediante Spark y un cuaderno de Jupyter Notebook.
- Creación de recursos de bases de datos y contenedores.
- Ingerir datos en el contenedor.
- Consulta de datos en el contenedor.
- Realice operaciones comunes en los elementos del contenedor.
- Una cuenta existente de Azure Cosmos DB for NoSQL.
- Si tiene una suscripción de Azure, cree una nueva cuenta.
- ¿No tiene una suscripción de Azure? Puede probar Azure Cosmos DB de forma gratuita, sin necesidad de usar su tarjeta de crédito.
- Un área de trabajo existente de Azure Databricks.
Use el área de trabajo existente de Azure Databricks para crear un clúster de proceso listo para usar Apache Spark 3.4.x para conectarse a la cuenta de Azure Cosmos DB for NoSQL.
Abra el área de trabajo de Azure Databricks.
En la interfaz del área de trabajo, cree un nuevo clúster. Configure el clúster con estas opciones, como mínimo:
Versión Valor Versión del entorno de ejecución 13.3 LTS (Scala 2.12, Spark 3.4.1) Use la interfaz del área de trabajo para buscar paquetes de Maven desde Central de Maven con un Id. de grupo de
com.azure.cosmos.spark
. Instale el paquete específicamente para Spark 3.4 con unId.de Artefacto prefijo conazure-cosmos-spark_3-4
en el clúster.Por último, cree un nuevo cuaderno.
Sugerencia
De manera predeterminada, el cuaderno está asociado al clúster creado recientemente.
En el cuaderno, establezca los valores de configuración de procesamiento de transacciones en línea (OLTP) para el punto de conexión de la cuenta NoSQL, el nombre de la base de datos y el nombre del contenedor.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Use la API de catálogo para administrar los recursos de la cuenta, como las bases de datos y los contenedores. A continuación, puede usar OLTP para administrar datos dentro de los recursos del contenedor.
Configure la API de catálogo para administrar la API para recursos NoSQL mediante Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Cree una base de datos denominada
cosmicworks
medianteCREATE DATABASE IF NOT EXISTS
.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Cree un contenedor denominado
products
medianteCREATE TABLE IF NOT EXISTS
. Asegúrese de establecer la ruta de acceso de la clave de partición en/category
y habilite el rendimiento de escalabilidad automática con un rendimiento máximo de unidades de solicitud (RUs) de1000
por segundo.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Cree otro contenedor denominado
employees
mediante una configuración jerárquica de clave de partición. Use/organization
,/department
y/team
como conjunto de rutas de acceso de clave de partición. Siga ese orden específico. Además, establece el rendimiento en una cantidad manual del400
RUs.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Ejecute las celdas del cuaderno para validar que la base de datos y los contenedores se crean en la API para la cuenta NoSQL.
Cree un conjunto de datos de ejemplo. A continuación, use OLTP para ingerir esos datos en el contenedor API para NoSQL.
Cree un conjunto de datos de ejemplo.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Use
spark.createDataFrame
y la configuración de OLTP guardada anteriormente para agregar datos de muestra al contenedor de destino.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Cargue los datos de OLTP en un objeto DataFrame para realizar consultas comunes en los datos. Puede usar varias sintaxis para filtrar o consultar datos.
Use
spark.read
para cargar los datos OLTP en un objeto de trama de datos. Use la misma configuración que usó anteriormente en este tutorial. Además, establezcaspark.cosmos.read.inferSchema.enabled
entrue
para permitir que el conector de Spark infiere el esquema mediante el muestreo de elementos existentes.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Represente el esquema de los datos cargados en el marco de datos mediante
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Represente filas de datos en las que la columna
quantity
es menor que20
. Use las funcioneswhere
yshow
para realizar esta consulta.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Represente la primera fila de datos donde se
clearance
la columnatrue
. Use la funciónfilter
para realizar esta consulta.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Represente cinco filas de datos sin filtro ni truncamiento. Use la función
show
para personalizar la apariencia y el número de filas que se representan.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Consulte los datos mediante esta cadena de consulta NoSQL sin procesar:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Al trabajar con la API para datos NoSQL en Spark, puede realizar actualizaciones parciales o trabajar con datos como JSON sin procesar.
Para realizar una actualización parcial de un elemento:
Copie la variable de configuración
config
existente y modifique las propiedades de la nueva copia. En concreto, configure la estrategia de escritura paraItemPatch
. A continuación, deshabilite el soporte masivo. Establezca las columnas y las operaciones asignadas. Por último, establezca el tipo de operación predeterminado enSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Cree variables para la clave de partición del elemento y el identificador único que quiere tener como destino como parte de esta operación de revisión.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Cree un conjunto de objetos de revisión para definir el elemento de destino y especifique los campos que se deben modificar.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Cree una trama de datos mediante el conjunto de objetos de revisión. Use
write
para realizar la operación de revisión.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Ejecute una consulta para revisar los resultados de la operación de revisión. El elemento ahora debería denominarse
Yamba New Surfboard
sin ningún otro cambio.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Para trabajar con datos JSON sin procesar:
Copie la variable de configuración
config
existente y modifique las propiedades de la nueva copia. En concreto, cambie el contenedor de destino aemployees
. A continuación, configure elcontacts
columna o campo para usar datos JSON sin procesar.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Cree un conjunto de empleados que se ingerirán en el contenedor.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Cree un objeto DataFrame y use
write
para ingerir los datos de los empleados.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Represente los datos del marco de datos mediante
show
. Observe que la columnacontacts
son datos JSON sin procesar en la salida.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
- Spark de Apache
- API de catálogo de Azure Cosmos DB
- Referencia de parámetros de configuración
- Ejemplos del conector Spark de Azure Cosmos DB
- Migración de Spark 2.4 a Spark 3.*
- Compatibilidad de versiones:
- Notas de la versión:
- Vínculos de descarga: