Självstudie: Anslut till Azure Cosmos DB för NoSQL med Spark

GÄLLER FÖR: NoSQL

I den här självstudien använder du Azure Cosmos DB Spark-anslutningsappen för att läsa eller skriva data från ett Azure Cosmos DB för NoSQL-konto. I den här självstudien används Azure Databricks och en Jupyter Notebook för att illustrera hur du integrerar med API:et för NoSQL från Spark. Den här självstudien fokuserar på Python och Scala även om du kan använda valfritt språk eller gränssnitt som stöds av Spark.

I den här självstudien lär du dig att:

  • Anslut till ett API för NoSQL-konto med spark och en Jupyter-notebook-fil
  • Skapa databas- och containerresurser
  • Mata in data till containern
  • Fråga efter data i containern
  • Utföra vanliga åtgärder på objekt i containern

Förutsättningar

Anslut med Spark och Jupyter

Använd din befintliga Azure Databricks-arbetsyta för att skapa ett beräkningskluster som är redo att använda Apache Spark 3.4.x för att ansluta till ditt Azure Cosmos DB för NoSQL-konto.

  1. Öppna din Azure Databricks-arbetsyta.

  2. Skapa ett nytt kluster i arbetsytans gränssnitt. Konfigurera klustret med de här inställningarna, minst:

    Värde
    Körningsversion 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Använd arbetsytans gränssnitt för att söka efter Maven-paket från Maven Central med ett grupp-ID för com.azure.cosmos.spark. Installera paketet som är specifikt för Spark 3.4 med ett artefakt-ID som är prefix för azure-cosmos-spark_3-4 klustret.

  4. Skapa slutligen en ny notebook-fil.

    Dricks

    Som standard kopplas notebook-filen till det nyligen skapade klustret.

  5. I notebook-filen anger du OLTP-konfigurationsinställningar för NoSQL-kontoslutpunkt, databasnamn och containernamn.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Skapa en databas och container

Använd katalog-API:et för att hantera kontoresurser som databaser och containrar. Sedan kan du använda OLTP för att hantera data i containerresursen[s].

  1. Konfigurera katalog-API:et för att hantera API för NoSQL-resurser med Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Skapa en ny databas med namnet cosmicworks med .CREATE DATABASE IF NOT EXISTS

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Skapa en ny container med namnet products med .CREATE TABLE IF NOT EXISTS Se till att du anger sökvägen för partitionsnyckeln till /category och aktiverar dataflöde för automatisk skalning med ett maximalt dataflöde 1000 för enheter för begäranden per sekund (RU/s).

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Skapa en annan container med namnet employees med en hierarkisk partitionsnyckelkonfiguration med /organization, /departmentoch /team som uppsättning partitionsnyckelsökvägar i den specifika ordningen. Ange också dataflödet till en manuell mängd 400 RU/s

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Kör notebook-cellen för att verifiera att databasen och containrarna har skapats i ditt API för NoSQL-kontot.

Mata in data

Skapa en exempeldatauppsättning och använd sedan OLTP för att mata in dessa data till API:et för NoSQL-containern.

  1. Skapa en exempeldatauppsättning.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Använd spark.createDataFrame och den tidigare sparade OLTP-konfigurationen för att lägga till exempeldata i målcontainern.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Fråga efter data

Läs in OLTP-data i en dataram för att utföra vanliga frågor om data. Du kan använda olika syntaxfilter eller frågedata.

  1. Använd spark.read för att läsa in OLTP-data i ett dataramobjekt. Använd samma konfiguration som användes tidigare i den här självstudien. spark.cosmos.read.inferSchema.enabled Ange också true så att Spark-anslutningsappen kan härleda schemat genom att sampling av befintliga objekt.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Rendera schemat för data som lästs in i dataramen med hjälp av printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Rendera datarader där quantity kolumnen är mindre än 20. where Använd funktionerna och show för att utföra den här frågan.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Rendera den första dataraden clearance där kolumnen är sann. filter Använd funktionen för att utföra den här frågan.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Rendera fem rader med data utan filter eller trunkering. show Använd funktionen för att anpassa utseendet och antalet rader som återges.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Fråga dina data med den här råa NoSQL-frågesträngen: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Utföra vanliga åtgärder

När du arbetar med API för NoSQL-data i Spark kan du utföra partiella uppdateringar eller arbeta med data som rå JSON.

  1. Utför följande steg för att utföra en partiell uppdatering av ett objekt:

    1. Kopiera den befintliga config konfigurationsvariabeln och ändra egenskaperna i den nya kopian. Specifikt; konfigurera skrivstrategin till ItemPatch, inaktivera massstöd, ange kolumner och mappade åtgärder och ange slutligen standardåtgärdstypen till Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Skapa variabler för objektpartitionsnyckeln och den unika identifierare som du tänker rikta in dig på som en del av den här korrigeringsåtgärden.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Skapa en uppsättning korrigeringsobjekt för att ange målobjektet och ange fält som ska ändras.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Skapa en dataram med hjälp av uppsättningen korrigeringsobjekt och använd write för att utföra korrigeringsåtgärden.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Kör en fråga för att granska resultatet av korrigeringsåtgärden. Objektet bör nu namnges Yamba New Surfboard utan några andra ändringar.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Utför följande steg för att arbeta med råa JSON-data:

    1. Kopiera den befintliga config konfigurationsvariabeln och ändra egenskaperna i den nya kopian. Specifikt; ändra målcontainern till employees och konfigurera contacts kolumnen/fältet för att använda råa JSON-data.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Skapa en uppsättning anställda som ska matas in i containern.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Skapa en dataram och använd write för att mata in de anställdas data.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Rendera data från dataramen med hjälp av show. Observera att contacts kolumnen är rå JSON i utdata.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Gå vidare