Kurz: Připojení do služby Azure Cosmos DB for NoSQL pomocí Sparku

PLATÍ PRO: NoSQL

V tomto kurzu použijete konektor Spark služby Azure Cosmos DB ke čtení nebo zápisu dat z účtu Azure Cosmos DB for NoSQL. Tento kurz používá Azure Databricks a poznámkový blok Jupyter k ilustraci integrace s rozhraním API pro NoSQL ze Sparku. Tento kurz se zaměřuje na Python a Scala, i když můžete použít libovolný jazyk nebo rozhraní podporované Sparkem.

V tomto kurzu se naučíte:

  • Připojení k účtu ROZHRANÍ API pro NoSQL pomocí Sparku a poznámkového bloku Jupyter
  • Vytvoření prostředků databáze a kontejneru
  • Příjem dat do kontejneru
  • Dotazování dat v kontejneru
  • Provádění běžných operací s položkami v kontejneru

Požadavky

  • Existující účet Azure Cosmos DB for NoSQL.
  • Existující pracovní prostor Azure Databricks.

Připojení s využitím Sparku a Jupyteru

Pomocí stávajícího pracovního prostoru Azure Databricks vytvořte výpočetní cluster připravený k použití Apache Sparku 3.4.x pro připojení k vašemu účtu Azure Cosmos DB for NoSQL.

  1. Otevřete pracovní prostor Azure Databricks.

  2. V rozhraní pracovního prostoru vytvořte nový cluster. Nakonfigurujte cluster s těmito nastaveními minimálně:

    Hodnota
    Verze modulu runtime 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Pomocí rozhraní pracovního prostoru vyhledejte balíčky Maven z Maven Central s IDcom.azure.cosmos.sparkskupiny . Nainstalujte balíček specifický pro Spark 3.4 s ID artefaktu s azure-cosmos-spark_3-4 předponou clusteru.

  4. Nakonec vytvořte nový poznámkový blok.

    Tip

    Ve výchozím nastavení se poznámkový blok připojí k nedávno vytvořenému clusteru.

  5. V poznámkovém bloku nastavte nastavení konfigurace OLTP pro koncový bod účtu NoSQL, název databáze a název kontejneru.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Vytvoření databáze a kontejneru

Rozhraní API katalogu slouží ke správě prostředků účtu, jako jsou databáze a kontejnery. Pak můžete použít OLTP ke správě dat v rámci prostředků kontejneru.

  1. Nakonfigurujte rozhraní API katalogu pro správu prostředků NoSQL pomocí Sparku.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Vytvořte novou databázi s názvem cosmicworks using CREATE DATABASE IF NOT EXISTS.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Vytvořte nový kontejner s názvem products using CREATE TABLE IF NOT EXISTS. Ujistěte se, že jste nastavili cestu /category ke klíči oddílu a povolili propustnost automatického 1000 škálování s maximální propustností jednotek žádostí za sekundu (RU/s).

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Vytvořte jiný kontejner pojmenovaný employees pomocí hierarchické konfigurace klíče oddílu s parametrem /organization, /departmenta /team jako sadu cest ke klíči oddílu v daném konkrétním pořadí. Také nastavte propustnost na ruční množství 400 RU/s.

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Spusťte buňku poznámkového bloku a ověřte, že se vaše databáze a kontejnery vytvářejí v rámci vašeho účtu ROZHRANÍ API pro NoSQL.

Ingestace dat

Vytvořte ukázkovou datovou sadu a pak pomocí OLTP ingestujte tato data do kontejneru API for NoSQL.

  1. Vytvořte ukázkovou datovou sadu.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. K přidání ukázkových dat do cílového kontejneru použijte spark.createDataFrame dříve uloženou konfiguraci OLTP.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Zadávání dotazů na data

Načtěte data OLTP do datového rámce, abyste mohli provádět běžné dotazy na data. Můžete použít různé syntaxe filtrování nebo dotazování dat.

  1. Slouží spark.read k načtení dat OLTP do objektu datového rámce. Použijte stejnou konfiguraci, jakou jste použili dříve v tomto kurzu. Pokud chcete konektor Sparku povolit odvození schématu vzorkováním existujících položek, nastavte spark.cosmos.read.inferSchema.enabled na hodnotu true.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Vykreslení schématu dat načtených v datovém rámci pomocí printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Vykreslujte řádky dat, ve kterých quantity je sloupec menší než 20. where K provedení tohoto dotazu použijte funkce a show funkce.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Vykreslí první řádek dat, ve clearance kterém je sloupec pravdivý. filter K provedení tohoto dotazu použijte funkci.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Vykreslí pět řádků dat bez filtru nebo zkrácení. show Pomocí funkce můžete přizpůsobit vzhled a počet vykreslených řádků.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Dotazujte se na data pomocí tohoto nezpracovaného řetězce dotazu NoSQL: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Provádění běžných operací

Při práci s daty API for NoSQL ve Sparku můžete provádět částečné aktualizace nebo pracovat s daty jako nezpracovaným json.

  1. Pokud chcete provést částečnou aktualizaci položky, proveďte tyto kroky:

    1. Zkopírujte existující config konfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně; nakonfigurujte strategii zápisu na ItemPatch, zakažte hromadnou podporu, nastavte sloupce a mapované operace a nakonec nastavte výchozí typ operace na Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Vytvořte proměnné pro klíč oddílu položky a jedinečný identifikátor, na který chcete cílit jako součást této operace opravy.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Vytvořte sadu objektů oprav, které určí cílovou položku a určí pole, která se mají upravit.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Vytvořte datový rámec pomocí sady objektů oprav a použijte write k provedení operace opravy.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Spuštěním dotazu zkontrolujte výsledky operace opravy. Položka by teď měla být pojmenovaná Yamba New Surfboard bez dalších změn.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Pokud chcete pracovat s nezpracovanými daty JSON, postupujte takto:

    1. Zkopírujte existující config konfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně; změňte cílový kontejner na sloupec nebo pole tak, employees aby používal nezpracovaná data JSON a nakonfigurovali ho contacts .

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Vytvořte sadu zaměstnanců, kteří se mají ingestovat do kontejneru.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Vytvořte datový rámec a použijte write ingestování dat zaměstnanců.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Vykreslení dat z datového rámce pomocí show. Všimněte si, že ve výstupu contacts je nezpracovaný JSON.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Další krok