Esercitazione: Connessione ad Azure Cosmos DB per NoSQL con Spark

SI APPLICA A: NoSQL

In questa esercitazione si usa il connettore Spark di Azure Cosmos DB per leggere o scrivere dati da un account Azure Cosmos DB per NoSQL. Questa esercitazione usa Azure Databricks e un notebook jupyter per illustrare come eseguire l'integrazione con l'API per NoSQL da Spark. Questa esercitazione è incentrata su Python e Scala anche se è possibile usare qualsiasi linguaggio o interfaccia supportata da Spark.

In questa esercitazione apprenderai a:

  • Connessione a un account API per NoSQL usando Spark e un notebook di Jupyter
  • Creare risorse di database e contenitori
  • Inserire dati nel contenitore
  • Eseguire query sui dati nel contenitore
  • Eseguire operazioni comuni sugli elementi nel contenitore

Prerequisiti

Connessione con Spark e Jupyter

Usare l'area di lavoro di Azure Databricks esistente per creare un cluster di calcolo pronto per usare Apache Spark 3.4.x per connettersi all'account Azure Cosmos DB per NoSQL.

  1. Aprire l'area di lavoro di Azure Databricks.

  2. Nell'interfaccia dell'area di lavoro creare un nuovo cluster. Configurare il cluster con queste impostazioni, almeno:

    valore
    Versione di runtime 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Usare l'interfaccia dell'area di lavoro per cercare i pacchetti Maven da Maven Central con un ID gruppo di com.azure.cosmos.spark. Installare il pacchetto specifico per Spark 3.4 con un ID artefatto preceduto azure-cosmos-spark_3-4 dal prefisso nel cluster.

  4. Infine, creare un nuovo notebook.

    Suggerimento

    Per impostazione predefinita, il notebook verrà collegato al cluster creato di recente.

  5. Nel notebook impostare le impostazioni di configurazione OLTP per l'endpoint dell'account NoSQL, il nome del database e il nome del contenitore.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Creare un database e un contenitore

Usare l'API catalogo per gestire le risorse dell'account, ad esempio database e contenitori. È quindi possibile usare OLTP per gestire i dati all'interno della risorsa contenitore[s].

  1. Configurare l'API catalogo per gestire l'API per le risorse NoSQL usando Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Creare un nuovo database denominato cosmicworks usando CREATE DATABASE IF NOT EXISTS.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Creare un nuovo contenitore denominato products usando CREATE TABLE IF NOT EXISTS. Assicurarsi di impostare il percorso della chiave di partizione su /category e abilitare la velocità effettiva con scalabilità automatica con una velocità effettiva massima di 1000 unità richiesta al secondo (UR/sec).

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Creare un altro contenitore denominato employees usando una configurazione gerarchica della chiave di partizione con /organization, /departmente /team come set di percorsi di chiave di partizione in tale ordine specifico. Impostare anche la velocità effettiva su una quantità manuale di 400 UR/sec

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Eseguire la cella del notebook [s] per verificare che il database e i contenitori vengano creati nell'account API per NoSQL.

Inserire i dati

Creare un set di dati di esempio e quindi usare OLTP per inserire tali dati nel contenitore API per NoSQL.

  1. Creare un set di dati di esempio.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Usare spark.createDataFrame e la configurazione OLTP salvata in precedenza per aggiungere dati di esempio al contenitore di destinazione.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Eseguire query sui dati

Caricare dati OLTP in un frame di dati per eseguire query comuni sui dati. È possibile usare varie sintassi per filtrare o eseguire query sui dati.

  1. Usare spark.read per caricare i dati OLTP in un oggetto dataframe. Usare la stessa configurazione usata in precedenza in questa esercitazione. Impostare anche su spark.cosmos.read.inferSchema.enabled true per consentire al connettore Spark di dedurre lo schema eseguendo il campionamento degli elementi esistenti.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Eseguire il rendering dello schema dei dati caricati nel dataframe usando printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Eseguire il rendering delle righe di dati in cui la quantity colonna è minore di 20. Usare le where funzioni e show per eseguire questa query.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Eseguire il rendering della prima riga di dati in cui la clearance colonna è true. Usare la filter funzione per eseguire questa query.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Eseguire il rendering di cinque righe di dati senza filtro o troncamento. Usare la show funzione per personalizzare l'aspetto e il numero di righe di cui viene eseguito il rendering.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Eseguire query sui dati usando questa stringa di query NoSQL non elaborata: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Eseguire operazioni comuni

Quando si usa l'API per i dati NoSQL in Spark, è possibile eseguire aggiornamenti parziali o usare i dati come JSON non elaborati.

  1. Per eseguire un aggiornamento parziale di un elemento, seguire questa procedura:

    1. Copiare la variabile di configurazione esistente config e modificare le proprietà nella nuova copia. Specificamente; configurare la strategia di scrittura su ItemPatch, disabilitare il supporto bulk, impostare le colonne e le operazioni mappate e infine impostare il tipo di operazione predefinito su Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Creare variabili per la chiave di partizione dell'elemento e l'identificatore univoco di destinazione come parte di questa operazione di patch.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Creare un set di oggetti patch per specificare l'elemento di destinazione e specificare i campi da modificare.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Creare un frame di dati usando il set di oggetti patch e usare write per eseguire l'operazione di patch.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Eseguire una query per esaminare i risultati dell'operazione di patch. L'elemento dovrebbe ora essere denominato Yamba New Surfboard senza altre modifiche.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Per usare dati JSON non elaborati, seguire questa procedura:

    1. Copiare la variabile di configurazione esistente config e modificare le proprietà nella nuova copia. Specificamente; modificare il contenitore di destinazione in employees e configurare la contacts colonna/campo in modo da usare dati JSON non elaborati.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Creare un set di dipendenti da inserire nel contenitore.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Creare un frame di dati e usare write per inserire i dati dei dipendenti.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Eseguire il rendering dei dati dal frame di dati usando show. Osservare che la contacts colonna è JSON non elaborato nell'output.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Passaggio successivo