Zelfstudie: Verbinding maken naar Azure Cosmos DB for NoSQL met behulp van Spark

VAN TOEPASSING OP: NoSQL

In deze zelfstudie gebruikt u de Azure Cosmos DB Spark-connector voor het lezen of schrijven van gegevens uit een Azure Cosmos DB for NoSQL-account. In deze zelfstudie wordt gebruikgemaakt van Azure Databricks en een Jupyter-notebook om te laten zien hoe u integreert met de API voor NoSQL vanuit Spark. Deze zelfstudie is gericht op Python en Scala, ook al kunt u elke taal of interface gebruiken die wordt ondersteund door Spark.

In deze zelfstudie leert u het volgende:

  • Verbinding maken naar een API voor NoSQL-account met behulp van Spark en een Jupyter-notebook
  • Database- en containerbronnen maken
  • Gegevens opnemen in de container
  • Query's uitvoeren op gegevens in de container
  • Algemene bewerkingen uitvoeren op items in de container

Vereisten

  • Een bestaand Azure Cosmos DB for NoSQL-account.
  • Een bestaande Azure Databricks-werkruimte.

Verbinding maken spark en Jupyter gebruiken

Gebruik uw bestaande Azure Databricks-werkruimte om een rekencluster te maken dat gereed is voor het gebruik van Apache Spark 3.4.x om verbinding te maken met uw Azure Cosmos DB for NoSQL-account.

  1. Open uw Azure Databricks-werkruimte.

  2. Maak een nieuw cluster in de werkruimte-interface. Configureer het cluster met deze instellingen minimaal:

    Value
    Runtime-versie 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Gebruik de werkruimte-interface om te zoeken naar Maven-pakketten van Maven Central met een groeps-id vancom.azure.cosmos.spark. Installeer het pakket dat specifiek is voor Spark 3.4 met een artefact-id voorafgegaan door azure-cosmos-spark_3-4 het cluster.

  4. Maak ten slotte een nieuw notitieblok.

    Tip

    Standaard wordt het notebook gekoppeld aan het onlangs gemaakte cluster.

  5. Stel in het notebook OLTP-configuratie-instellingen in voor noSQL-accounteindpunt, databasenaam en containernaam.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Een database en container maken

Gebruik de Catalogus-API om accountbronnen, zoals databases en containers, te beheren. Vervolgens kunt u OLTP gebruiken om gegevens te beheren binnen de containerresource[s].

  1. Configureer de Catalogus-API voor het beheren van API voor NoSQL-resources met behulp van Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Maak een nieuwe database met de naam cosmicworks .CREATE DATABASE IF NOT EXISTS

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Maak een nieuwe container met de naam products .CREATE TABLE IF NOT EXISTS Zorg ervoor dat u het pad /category naar de partitiesleutel instelt en doorvoer voor automatische schaalaanpassing inschakelt met een maximale doorvoer van 1000 aanvraageenheden per seconde (RU/s).

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Maak een andere container met de naam employees met behulp van een hiƫrarchische partitiesleutelconfiguratie met /organization, /departmenten /team als de set partitiesleutelpaden in die specifieke volgorde. Stel de doorvoer ook in op een handmatige 400 hoeveelheid RU/s

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Voer de notebookcel[s] uit om te controleren of uw database en containers zijn gemaakt in uw API voor NoSQL-account.

Gegevens opnemen

Maak een voorbeeldgegevensset en gebruik vervolgens OLTP om die gegevens op te nemen in de API voor NoSQL-container.

  1. Maak een voorbeeldgegevensset.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Gebruik spark.createDataFrame en de eerder opgeslagen OLTP-configuratie om voorbeeldgegevens toe te voegen aan de doelcontainer.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Querygegevens

LAAD OLTP-gegevens in een gegevensframe om algemene query's uit te voeren op de gegevens. U kunt verschillende syntaxisfilters gebruiken of gegevens opvragen.

  1. Gebruik spark.read deze functie om de OLTP-gegevens in een dataframeobject te laden. Gebruik dezelfde configuratie die eerder in deze zelfstudie is gebruikt. spark.cosmos.read.inferSchema.enabled Stel deze eigenschap ook in op true zodat de Spark-connector het schema kan afleiden door bestaande items te bemonsteren.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Geef het schema weer van de gegevens die in het dataframe zijn geladen met behulp van printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Gegevensrijen weergeven waarin de quantity kolom kleiner is dan 20. Gebruik de where en show functies om deze query uit te voeren.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Geef de eerste gegevensrij weer waarin de clearance kolom waar is. Gebruik de filter functie om deze query uit te voeren.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Geef vijf rijen met gegevens weer zonder filter of afkapping. Gebruik de show functie om het uiterlijk en het aantal rijen aan te passen dat wordt weergegeven.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Voer een query uit op uw gegevens met behulp van deze onbewerkte NoSQL-queryreeks: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Algemene bewerkingen uitvoeren

Wanneer u werkt met API voor NoSQL-gegevens in Spark, kunt u gedeeltelijke updates uitvoeren of met gegevens werken als onbewerkte JSON.

  1. Voer de volgende stappen uit om een gedeeltelijke update van een item uit te voeren:

    1. Kopieer de bestaande configuratievariabele config en wijzig de eigenschappen in de nieuwe kopie. Specifiek; configureer de schrijfstrategie om bulkondersteuning uit te ItemPatchschakelen, de kolommen en toegewezen bewerkingen in te stellen en ten slotte het standaardbewerkingstype in te Setstellen op .

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Maak variabelen voor de partitiesleutel van het item en de unieke id die u wilt targeten als onderdeel van deze patchbewerking.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Maak een set patchobjecten om het doelitem op te geven en geef velden op die moeten worden gewijzigd.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Maak een gegevensframe met behulp van de set patchobjecten en gebruik write deze om de patchbewerking uit te voeren.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Voer een query uit om de resultaten van de patchbewerking te bekijken. Het item moet nu worden benoemd Yamba New Surfboard zonder andere wijzigingen.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Voer de volgende stappen uit om te werken met onbewerkte JSON-gegevens:

    1. Kopieer de bestaande configuratievariabele config en wijzig de eigenschappen in de nieuwe kopie. Specifiek; wijzig de doelcontainer in employees en configureer de kolom/het contacts veld om onbewerkte JSON-gegevens te gebruiken.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Maak een set werknemers die u wilt opnemen in de container.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Maak een gegevensframe en gebruik write deze om de werknemersgegevens op te nemen.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Geef de gegevens uit het gegevensframe weer met behulp van show. U ziet dat de contacts kolom onbewerkte JSON in de uitvoer is.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Volgende stap