Dela via


Självstudie: Ansluta till Azure Cosmos DB för NoSQL med hjälp av Spark

GÄLLER FÖR: NoSQL

I den här självstudien använder du Azure Cosmos DB Spark-anslutningsappen för att läsa eller skriva data från ett Azure Cosmos DB för NoSQL-konto. I den här självstudien används Azure Databricks och en Jupyter Notebook för att illustrera hur du integrerar med API:et för NoSQL från Spark. Den här självstudien fokuserar på Python och Scala, även om du kan använda valfritt språk eller gränssnitt som stöds av Spark.

I den här självstudien lär du dig att:

  • Anslut till ett API för NoSQL-konto med hjälp av Spark och en Jupyter Notebook.
  • Skapa databas- och containerresurser.
  • Mata in data till containern.
  • Fråga efter data i containern.
  • Utför vanliga åtgärder på objekt i containern.

Förutsättningar

Anslut med Hjälp av Spark och Jupyter

Använd din befintliga Azure Databricks-arbetsyta för att skapa ett beräkningskluster som är redo att använda Apache Spark 3.4.x för att ansluta till ditt Azure Cosmos DB för NoSQL-konto.

  1. Öppna din Azure Databricks-arbetsyta.

  2. Skapa ett nytt kluster i arbetsytans gränssnitt. Konfigurera klustret med de här inställningarna, minst:

    Version Värde
    Körningsversion 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Använd arbetsytans gränssnitt för att söka efter Maven-paket från Maven Central med ett grupp-ID för com.azure.cosmos.spark. Installera paketet specifikt för Spark 3.4 med ett artefakt-ID som är prefix för azure-cosmos-spark_3-4 klustret.

  4. Skapa slutligen en ny notebook-fil.

    Dricks

    Som standard är notebook-filen kopplad till det nyligen skapade klustret.

  5. I notebook-filen anger du konfigurationsinställningar för onlinetransaktionsbearbetning (OLTP) för NoSQL-kontoslutpunkten, databasnamnet och containernamnet.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Skapa en databas och en container

Använd katalog-API:et för att hantera kontoresurser som databaser och containrar. Sedan kan du använda OLTP för att hantera data i containerresurserna.

  1. Konfigurera katalog-API:et för att hantera API för NoSQL-resurser med hjälp av Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Skapa en ny databas med namnet cosmicworks med hjälp CREATE DATABASE IF NOT EXISTSav .

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Skapa en ny container med namnet products med hjälp CREATE TABLE IF NOT EXISTSav . Se till att du anger sökvägen för partitionsnyckeln till /category och aktiverar dataflöde för automatisk skalning med ett maximalt dataflöde 1000 för enheter för begäranden (RU:er) per sekund.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Skapa en annan container med namnet employees med hjälp av en hierarkisk partitionsnyckelkonfiguration. Använd /organization, /departmentoch /team som uppsättning partitionsnyckelsökvägar. Följ den specifika ordningen. Ange också dataflödet till en manuell mängd 400 RU:er.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Kör notebook-cellerna för att verifiera att databasen och containrarna har skapats i ditt API för NoSQL-kontot.

Mata in data

Skapa en exempeldatauppsättning. Använd sedan OLTP för att mata in dessa data till API:et för NoSQL-containern.

  1. Skapa en exempeldatauppsättning.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Använd spark.createDataFrame och den tidigare sparade OLTP-konfigurationen för att lägga till exempeldata i målcontainern.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Fråga efter data

Läs in OLTP-data i en dataram för att utföra vanliga frågor om data. Du kan använda olika syntaxer för att filtrera eller fråga efter data.

  1. Använd spark.read för att läsa in OLTP-data i ett dataramobjekt. Använd samma konfiguration som du använde tidigare i den här självstudien. spark.cosmos.read.inferSchema.enabled Ange också så att true Spark-anslutningsappen kan härleda schemat genom att sampling av befintliga objekt.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Rendera schemat för data som läses in i dataramen med hjälp printSchemaav .

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Rendera datarader där quantity kolumnen är mindre än 20. where Använd funktionerna och show för att utföra den här frågan.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Rendera den första dataraden clearance där kolumnen är true. filter Använd funktionen för att utföra den här frågan.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Rendera fem rader med data utan filter eller trunkering. show Använd funktionen för att anpassa utseendet och antalet rader som återges.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Fråga dina data med hjälp av den här råa NoSQL-frågesträngen: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Utföra vanliga åtgärder

När du arbetar med API för NoSQL-data i Spark kan du utföra partiella uppdateringar eller arbeta med data som rå JSON.

  1. Så här utför du en partiell uppdatering av ett objekt:

    1. Kopiera den befintliga config konfigurationsvariabeln och ändra egenskaperna i den nya kopian. Mer specifikt konfigurerar du skrivstrategin till ItemPatch. Inaktivera sedan massstöd. Ange kolumner och mappade åtgärder. Slutligen anger du standardåtgärdstypen till Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Skapa variabler för objektpartitionsnyckeln och den unika identifierare som du tänker rikta in dig på som en del av den här korrigeringsåtgärden.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Skapa en uppsättning korrigeringsobjekt för att ange målobjektet och ange fält som ska ändras.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Skapa en dataram med hjälp av uppsättningen med korrigeringsobjekt. Använd write för att utföra korrigeringsåtgärden.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Kör en fråga för att granska resultatet av korrigeringsåtgärden. Objektet bör nu namnges Yamba New Surfboard utan några andra ändringar.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Så här arbetar du med råa JSON-data:

    1. Kopiera den befintliga config konfigurationsvariabeln och ändra egenskaperna i den nya kopian. Mer specifikt ändrar du målcontainern till employees. Konfigurera contacts sedan kolumnen/fältet för att använda råa JSON-data.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Skapa en uppsättning anställda som ska matas in i containern.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Skapa en dataram och använd write för att mata in de anställdas data.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Rendera data från dataramen med hjälp showav . Observera att contacts kolumnen är rå JSON i utdata.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Gå vidare