Share via


Zelfstudie: Verbinding maken met Azure Cosmos DB for NoSQL met behulp van Spark

VAN TOEPASSING OP: NoSQL

In deze zelfstudie gebruikt u de Azure Cosmos DB Spark-connector voor het lezen of schrijven van gegevens uit een Azure Cosmos DB for NoSQL-account. In deze zelfstudie wordt gebruikgemaakt van Azure Databricks en een Jupyter-notebook om te laten zien hoe u integreert met de API voor NoSQL vanuit Spark. Deze zelfstudie is gericht op Python en Scala, hoewel u elke taal of interface kunt gebruiken die wordt ondersteund door Spark.

In deze zelfstudie leert u het volgende:

  • Maak verbinding met een API voor NoSQL-account met behulp van Spark en een Jupyter-notebook.
  • Database- en containerbronnen maken.
  • Gegevens opnemen in de container.
  • Query's uitvoeren op gegevens in de container.
  • Voer algemene bewerkingen uit op items in de container.

Vereisten

  • Een bestaand Azure Cosmos DB for NoSQL-account.
  • Een bestaande Azure Databricks-werkruimte.

Verbinding maken met behulp van Spark en Jupyter

Gebruik uw bestaande Azure Databricks-werkruimte om een rekencluster te maken dat gereed is voor het gebruik van Apache Spark 3.4.x om verbinding te maken met uw Azure Cosmos DB for NoSQL-account.

  1. Open uw Azure Databricks-werkruimte.

  2. Maak een nieuw cluster in de werkruimte-interface. Configureer het cluster met deze instellingen minimaal:

    Versie Weergegeven als
    Runtime-versie 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Gebruik de werkruimte-interface om te zoeken naar Maven-pakketten van Maven Central met een groeps-id van com.azure.cosmos.spark. Installeer het pakket specifiek voor Spark 3.4 met een artefact-id die is voorafgegaan door azure-cosmos-spark_3-4 het cluster.

  4. Maak ten slotte een nieuw notitieblok.

    Tip

    Standaard wordt het notebook gekoppeld aan het onlangs gemaakte cluster.

  5. Stel in het notebook configuratie-instellingen voor online transaction processing (OLTP) in voor het NoSQL-accounteindpunt, de databasenaam en de containernaam.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Een database en een container maken

Gebruik de Catalogus-API om accountbronnen, zoals databases en containers, te beheren. Vervolgens kunt u OLTP gebruiken om gegevens binnen de containerbronnen te beheren.

  1. Configureer de Catalogus-API voor het beheren van API voor NoSQL-resources met behulp van Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Maak een nieuwe database met de naam cosmicworks met behulp van CREATE DATABASE IF NOT EXISTS.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Maak een nieuwe container met de naam products met behulp van CREATE TABLE IF NOT EXISTS. Zorg ervoor dat u het pad /category naar de partitiesleutel instelt en doorvoer voor automatische schaalaanpassing inschakelt met een maximale doorvoer van 1000 aanvraageenheden (RU's) per seconde.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Maak een andere container met de naam employees met behulp van een hiƫrarchische partitiesleutelconfiguratie. Gebruik /organization, /departmenten /team als de set partitiesleutelpaden. Volg die specifieke volgorde. Stel de doorvoer ook in op een handmatige 400 hoeveelheid RU's.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Voer de notebookcellen uit om te controleren of uw database en containers zijn gemaakt in uw API voor NoSQL-account.

Gegevens opnemen

Maak een voorbeeldgegevensset. Gebruik vervolgens OLTP om die gegevens op te nemen in de API voor NoSQL-container.

  1. Maak een voorbeeldgegevensset.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Gebruik spark.createDataFrame en de eerder opgeslagen OLTP-configuratie om voorbeeldgegevens toe te voegen aan de doelcontainer.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Querygegevens

LAAD OLTP-gegevens in een gegevensframe om algemene query's uit te voeren op de gegevens. U kunt verschillende syntaxis gebruiken om gegevens te filteren of op te vragen.

  1. Gebruik spark.read dit om de OLTP-gegevens in een gegevensframeobject te laden. Gebruik dezelfde configuratie die u eerder in deze zelfstudie hebt gebruikt. Stel ook in spark.cosmos.read.inferSchema.enabled dat true de Spark-connector het schema kan afleiden door bestaande items te bemonsteren.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Geef het schema weer van de gegevens die in het gegevensframe zijn geladen met behulp van printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Gegevensrijen weergeven waarin de quantity kolom kleiner is dan 20. Gebruik de where en show functies om deze query uit te voeren.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Geef de eerste gegevensrij weer waarin de clearance kolom zich bevindt true. Gebruik de filter functie om deze query uit te voeren.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Geef vijf rijen met gegevens weer zonder filter of afkapping. Gebruik de show functie om het uiterlijk en het aantal rijen aan te passen dat wordt weergegeven.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Voer een query uit op uw gegevens met behulp van deze onbewerkte NoSQL-queryreeks: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Algemene bewerkingen uitvoeren

Wanneer u werkt met API voor NoSQL-gegevens in Spark, kunt u gedeeltelijke updates uitvoeren of met gegevens werken als onbewerkte JSON.

  1. Een gedeeltelijke update van een item uitvoeren:

    1. Kopieer de bestaande configuratievariabele config en wijzig de eigenschappen in de nieuwe kopie. Configureer met name de schrijfstrategie naar ItemPatch. Schakel vervolgens bulkondersteuning uit. Stel de kolommen en toegewezen bewerkingen in. Stel ten slotte het standaardbewerkingstype Setin op .

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Maak variabelen voor de partitiesleutel van het item en de unieke id die u wilt targeten als onderdeel van deze patchbewerking.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Maak een set patchobjecten om het doelitem op te geven en geef velden op die moeten worden gewijzigd.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Maak een gegevensframe met behulp van de set patchobjecten. Gebruik write dit om de patchbewerking uit te voeren.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Voer een query uit om de resultaten van de patchbewerking te bekijken. Het item moet nu worden benoemd Yamba New Surfboard zonder andere wijzigingen.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Werken met onbewerkte JSON-gegevens:

    1. Kopieer de bestaande configuratievariabele config en wijzig de eigenschappen in de nieuwe kopie. Wijzig met name de doelcontainer in employees. Configureer vervolgens de kolom/het contacts veld voor het gebruik van onbewerkte JSON-gegevens.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Maak een set werknemers die u wilt opnemen in de container.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Maak een gegevensframe en gebruik write deze om de werknemersgegevens op te nemen.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Geef de gegevens uit het gegevensframe weer met behulp van show. U ziet dat de contacts kolom onbewerkte JSON in de uitvoer is.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Volgende stap