Sdílet prostřednictvím


Kurz: Připojení ke službě Azure Cosmos DB for NoSQL pomocí Sparku

PLATÍ PRO: NoSQL

V tomto kurzu použijete konektor Spark služby Azure Cosmos DB ke čtení nebo zápisu dat z účtu Azure Cosmos DB for NoSQL. Tento kurz používá Azure Databricks a poznámkový blok Jupyter k ilustraci integrace s rozhraním API pro NoSQL ze Sparku. Tento kurz se zaměřuje na Python a Scala, i když můžete použít libovolný jazyk nebo rozhraní podporované Sparkem.

V tomto kurzu se naučíte:

  • Připojte se k účtu ROZHRANÍ API pro NoSQL pomocí Sparku a poznámkového bloku Jupyter.
  • Vytvořte prostředky databáze a kontejneru.
  • Příjem dat do kontejneru
  • Dotazování dat v kontejneru
  • Provádění běžných operací s položkami v kontejneru

Požadavky

  • Existující účet Azure Cosmos DB for NoSQL.
  • Existující pracovní prostor Azure Databricks.

Připojení pomocí Sparku a Jupyteru

Pomocí stávajícího pracovního prostoru Azure Databricks vytvořte výpočetní cluster připravený k použití Apache Sparku 3.4.x pro připojení k vašemu účtu Azure Cosmos DB for NoSQL.

  1. Otevřete pracovní prostor Azure Databricks.

  2. V rozhraní pracovního prostoru vytvořte nový cluster. Nakonfigurujte cluster s těmito nastaveními minimálně:

    Verze Hodnota
    Verze modulu runtime 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Pomocí rozhraní pracovního prostoru vyhledejte balíčky Maven z Maven Central s ID com.azure.cosmos.sparkskupiny . Nainstalujte balíček speciálně pro Spark 3.4 s ID artefaktu předponou azure-cosmos-spark_3-4 clusteru.

  4. Nakonec vytvořte nový poznámkový blok.

    Tip

    Ve výchozím nastavení je poznámkový blok připojený k nedávno vytvořenému clusteru.

  5. V poznámkovém bloku nastavte nastavení konfigurace online zpracování transakcí (OLTP) pro koncový bod účtu NoSQL, název databáze a název kontejneru.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Vytvoření databáze a kontejneru

Rozhraní API katalogu slouží ke správě prostředků účtu, jako jsou databáze a kontejnery. Pak můžete použít OLTP ke správě dat v rámci prostředků kontejneru.

  1. Nakonfigurujte rozhraní API katalogu pro správu prostředků API pro NoSQL pomocí Sparku.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Vytvořte novou databázi s názvem cosmicworks pomocí příkazu CREATE DATABASE IF NOT EXISTS.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Vytvořte nový kontejner pojmenovaný products pomocí .CREATE TABLE IF NOT EXISTS Ujistěte se, že jste nastavili cestu /category k klíči oddílu a povolili propustnost automatického 1000 škálování s maximální propustností jednotek žádostí (RU) za sekundu.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Vytvořte jiný kontejner s názvem employees pomocí hierarchie konfigurace klíče oddílu. Použijte /organization, /departmenta /team jako sadu cest klíče oddílu. Postupujte podle tohoto konkrétního pořadí. Také nastavte propustnost na ruční množství 400 RU.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Spuštěním buněk poznámkového bloku ověřte, že se vaše databáze a kontejnery vytvářejí v rámci účtu rozhraní API pro NoSQL.

Ingestace dat

Vytvořte ukázkovou datovou sadu. Pak pomocí OLTP ingestujte tato data do kontejneru API for NoSQL.

  1. Vytvořte ukázkovou datovou sadu.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. K přidání ukázkových dat do cílového kontejneru použijte spark.createDataFrame dříve uloženou konfiguraci OLTP.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Zadávání dotazů na data

Načtěte data OLTP do datového rámce, abyste mohli provádět běžné dotazy na data. K filtrování nebo dotazování dat můžete použít různé syntaxe.

  1. Slouží spark.read k načtení dat OLTP do objektu datového rámce. Použijte stejnou konfiguraci, kterou jste použili dříve v tomto kurzu. Také nastavte spark.cosmos.read.inferSchema.enabled , aby true konektor Spark mohl odvodit schéma vzorkováním existujících položek.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Vykreslení schématu dat načtených v datovém rámci pomocí printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Vykreslujte řádky dat, ve kterých quantity je sloupec menší než 20. where K provedení tohoto dotazu použijte funkce a show funkce.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Vykreslí první řádek dat, ve clearance kterém je truesloupec . filter K provedení tohoto dotazu použijte funkci.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Vykreslí pět řádků dat bez filtru nebo zkrácení. show Pomocí funkce můžete přizpůsobit vzhled a počet vykreslených řádků.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Dotazování dat pomocí tohoto nezpracovaného řetězce dotazu NoSQL: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Provádění běžných operací

Při práci s daty API for NoSQL ve Sparku můžete provádět částečné aktualizace nebo pracovat s daty jako nezpracovaným kódem JSON.

  1. Provedení částečné aktualizace položky:

    1. Zkopírujte existující config konfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně nakonfigurujte strategii zápisu na ItemPatch. Pak hromadnou podporu zakažte. Nastavte sloupce a mapované operace. Nakonec nastavte výchozí typ operace na Sethodnotu .

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Vytvořte proměnné pro klíč oddílu položky a jedinečný identifikátor, na který chcete cílit jako součást této operace opravy.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Vytvořte sadu objektů oprav, které určí cílovou položku a určí pole, která se mají upravit.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Vytvořte datový rámec pomocí sady objektů patch. Slouží write k provedení operace opravy.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Spuštěním dotazu zkontrolujte výsledky operace opravy. Položka by teď měla být pojmenovaná Yamba New Surfboard bez dalších změn.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Práce s nezpracovanými daty JSON:

    1. Zkopírujte existující config konfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně změňte cílový kontejner na employees. Potom nakonfigurujte contacts sloupec nebo pole tak, aby používal nezpracovaná data JSON.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Vytvořte sadu zaměstnanců, kteří se mají ingestovat do kontejneru.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Vytvořte datový rámec a použijte write ingestování dat zaměstnanců.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Vykreslení dat z datového rámce pomocí show. Všimněte si, že ve výstupu contacts je nezpracovaný JSON.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Další krok