Compartir a través de


Tutorial: Conexión a Azure Cosmos DB para NoSQL mediante Spark

SE APLICA A: NoSQL

En este tutorial, usará el conector Spark de Azure Cosmos DB para leer o escribir datos de una cuenta de Azure Cosmos DB for NoSQL. En este tutorial se usa Azure Databricks y un cuaderno de Jupyter para ilustrar cómo integrar la API para NoSQL desde Spark. Este tutorial se centra en Python y Scala, aunque puede usar cualquier lenguaje o interfaz compatible con Spark.

En este tutorial, aprenderá a:

  • Conéctese a una cuenta de API para NoSQL mediante Spark y un cuaderno de Jupyter Notebook.
  • Creación de recursos de bases de datos y contenedores.
  • Ingerir datos en el contenedor.
  • Consulta de datos en el contenedor.
  • Realice operaciones comunes en los elementos del contenedor.

Requisitos previos

Conexión mediante Spark y Jupyter

Use el área de trabajo existente de Azure Databricks para crear un clúster de proceso listo para usar Apache Spark 3.4.x para conectarse a la cuenta de Azure Cosmos DB for NoSQL.

  1. Abra el área de trabajo de Azure Databricks.

  2. En la interfaz del área de trabajo, cree un nuevo clúster. Configure el clúster con estas opciones, como mínimo:

    Versión Valor
    Versión del entorno de ejecución 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Use la interfaz del área de trabajo para buscar paquetes de Maven desde Central de Maven con un Id. de grupo de com.azure.cosmos.spark. Instale el paquete específicamente para Spark 3.4 con unId.de Artefacto prefijo con azure-cosmos-spark_3-4 en el clúster.

  4. Por último, cree un nuevo cuaderno.

    Sugerencia

    De manera predeterminada, el cuaderno está asociado al clúster creado recientemente.

  5. En el cuaderno, establezca los valores de configuración de procesamiento de transacciones en línea (OLTP) para el punto de conexión de la cuenta NoSQL, el nombre de la base de datos y el nombre del contenedor.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Creación de una base de datos y un contenedor

Use la API de catálogo para administrar los recursos de la cuenta, como las bases de datos y los contenedores. A continuación, puede usar OLTP para administrar datos dentro de los recursos del contenedor.

  1. Configure la API de catálogo para administrar la API para recursos NoSQL mediante Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Cree una base de datos denominada cosmicworks medianteCREATE DATABASE IF NOT EXISTS.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Cree un contenedor denominado products mediante CREATE TABLE IF NOT EXISTS. Asegúrese de establecer la ruta de acceso de la clave de partición en /category y habilite el rendimiento de escalabilidad automática con un rendimiento máximo de unidades de solicitud (RUs) de 1000 por segundo.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Cree otro contenedor denominado employees mediante una configuración jerárquica de clave de partición. Use /organization, /departmenty /team como conjunto de rutas de acceso de clave de partición. Siga ese orden específico. Además, establece el rendimiento en una cantidad manual del 400RUs.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Ejecute las celdas del cuaderno para validar que la base de datos y los contenedores se crean en la API para la cuenta NoSQL.

Ingerir datos

Cree un conjunto de datos de ejemplo. A continuación, use OLTP para ingerir esos datos en el contenedor API para NoSQL.

  1. Cree un conjunto de datos de ejemplo.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Use spark.createDataFrame y la configuración de OLTP guardada anteriormente para agregar datos de muestra al contenedor de destino.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Consultar datos

Cargue los datos de OLTP en un objeto DataFrame para realizar consultas comunes en los datos. Puede usar varias sintaxis para filtrar o consultar datos.

  1. Use spark.read para cargar los datos OLTP en un objeto de trama de datos. Use la misma configuración que usó anteriormente en este tutorial. Además, establezca spark.cosmos.read.inferSchema.enabled en true para permitir que el conector de Spark infiere el esquema mediante el muestreo de elementos existentes.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Represente el esquema de los datos cargados en el marco de datos mediante printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Represente filas de datos en las que la columna quantity es menor que 20. Use las funciones where y show para realizar esta consulta.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Represente la primera fila de datos donde se clearance la columnatrue. Use la función filter para realizar esta consulta.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Represente cinco filas de datos sin filtro ni truncamiento. Use la función show para personalizar la apariencia y el número de filas que se representan.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Consulte los datos mediante esta cadena de consulta NoSQL sin procesar: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Realización de operaciones comunes

Al trabajar con la API para datos NoSQL en Spark, puede realizar actualizaciones parciales o trabajar con datos como JSON sin procesar.

  1. Para realizar una actualización parcial de un elemento:

    1. Copie la variable de configuración config existente y modifique las propiedades de la nueva copia. En concreto, configure la estrategia de escritura para ItemPatch. A continuación, deshabilite el soporte masivo. Establezca las columnas y las operaciones asignadas. Por último, establezca el tipo de operación predeterminado en Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Cree variables para la clave de partición del elemento y el identificador único que quiere tener como destino como parte de esta operación de revisión.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Cree un conjunto de objetos de revisión para definir el elemento de destino y especifique los campos que se deben modificar.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Cree una trama de datos mediante el conjunto de objetos de revisión. Use write para realizar la operación de revisión.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Ejecute una consulta para revisar los resultados de la operación de revisión. El elemento ahora debería denominarse Yamba New Surfboard sin ningún otro cambio.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Para trabajar con datos JSON sin procesar:

    1. Copie la variable de configuración config existente y modifique las propiedades de la nueva copia. En concreto, cambie el contenedor de destino a employees. A continuación, configure el contacts columna o campo para usar datos JSON sin procesar.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Cree un conjunto de empleados que se ingerirán en el contenedor.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Cree un objeto DataFrame y use write para ingerir los datos de los empleados.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Represente los datos del marco de datos mediante show. Observe que la columna contacts son datos JSON sin procesar en la salida.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Paso siguiente