Sdílet prostřednictvím


Práce se službou Cosmos DB v Microsoft Fabric s využitím konektoru Spark služby Cosmos DB

Ke čtení, zápisu a dotazování dat z účtu Azure Cosmos DB for NoSQL můžete použít Spark a konektor Azure Cosmos DB Spark. Kromě toho můžete pomocí konektoru vytvářet a spravovat kontejnery Cosmos DB.

Použití Sparku a konektoru se liší od použití Sparku ke čtení dat ze služby Cosmos DB v zrcadlených datech uložených v OneLake, protože k provádění operací se připojuje přímo k koncovému bodu Cosmos DB.

Konektor Spark služby Cosmos DB se dá použít k podpoře reverzních scénářů ETL, kdy potřebujete obsluhovat data z koncového bodu Cosmos DB s nízkou latencí nebo vysokou souběžností.

Poznámka:

Reverse ETL (extrakce, transformace, načtení) odkazuje na proces přebírání transformovaných analytických dat z analytického systému a jejich načítání zpět do operačních systémů (jako jsou CRM, ERP, POS nebo marketingové nástroje), aby obchodní týmy mohly pracovat na přehledech přímo v aplikacích, které používají každý den.

Požadavky

Poznámka:

Tento článek používá integrovanou ukázku Cosmos DB vytvořenou s názvem databáze CosmosSampleDatabase a názvem kontejneru SampleData.

Načtení koncového bodu Cosmos DB

Nejprve najděte koncový bod pro databázi Cosmos DB ve Fabricu. Tento koncový bod se vyžaduje pro připojení pomocí konektoru Spark služby Cosmos DB.

  1. Otevřete portál Fabric (https://app.fabric.microsoft.com).

  2. Přejděte do existující databáze Cosmos DB.

  3. V řádku nabídek databáze vyberte možnost Nastavení .

    Snímek obrazovky s možností nabídky Nastavení pro databázi na portálu Fabric

  4. V dialogovém okně nastavení přejděte do části Připojení . Pak zkopírujte hodnotu koncového bodu pro databázi NoSQL služby Cosmos DB. Tuto hodnotu použijete v pozdějším kroku.

    Snímek obrazovky s částí Připojení v dialogovém okně Nastavení pro databázi na portálu Fabric

Konfigurace Spark v notebooku Fabric

Pokud se chcete připojit ke službě Cosmos DB pomocí konektoru Spark, musíte nakonfigurovat vlastní prostředí Spark. Tato část vás provede vytvořením vlastního prostředí Spark a nahráním knihoven konektorů Spark pro Cosmos DB.

  1. Stáhněte si nejnovější soubory knihovny konektoru Spark služby Cosmos DB z úložiště Maven (ID skupiny: com.azure.cosmos.spark) pro Spark 3.5.

  2. Vytvořte nový poznámkový blok.

  3. Jako jazyk, který chcete použít, vyberte Spark (Scala).

    Snímek obrazovky poznámkového bloku znázorňující výběr Sparku (Scala) jako upřednostňovaného jazyka

  4. Vyberte rozbalovací nabídku prostředí.

  5. Zkontrolujte nastavení pracovního prostoru a ujistěte se, že používáte Runtime 1.3 (Spark 3.5).

    Snímek obrazovky poznámkového bloku s rozevírací nabídkou nastavení pracovního prostoru

  6. Vyberte Nové prostředí.

  7. Zadejte název nového prostředí.

  8. Ujistěte se, že je runtime nakonfigurován pro Runtime 1.3 (Spark 3.5).

  9. V levém panelu zvolte Vlastní knihovna ze složky Knihovny .

    Snímek obrazovky prostředí s možností vlastní knihovny

  10. Nahrajte dva soubory knihovny .jar , které jste si předtím stáhli.

  11. Vyberte Uložit.

  12. Vyberte Publikovat, pak Publikovat vše a nakonec Publikovat.

  13. Po publikování by měly mít uživatelské knihovny stav úspěšného dokončení.

    Snímek obrazovky prostředí s potvrzenými soubory vlastní knihovny

  14. Vraťte se do poznámkového bloku a vyberte nově nakonfigurované prostředí kliknutím na rozevírací seznam prostředí, výběrem možnosti Změnit prostředí a zvolením názvu nově vytvořeného prostředí.

Připojení pomocí Sparku

Pokud se chcete připojit ke Cosmos DB v databázi a kontejneru Fabric, zadejte konfiguraci připojení, která se má použít při čtení z kontejneru a zápisu do něj.

  1. V poznámkovém bloku vložte dříve zachovaný název koncového bodu, databáze a kontejneru cosmos DB a pak nastavte nastavení konfigurace online zpracování transakcí (OLTP) pro koncový bod účtu NoSQL, název databáze a název kontejneru.

    // User values for Cosmos DB
    val ENDPOINT = "https://{YourAccountEndpoint....cosmos.fabric.microsoft.com:443/}"
    val DATABASE = "{your-cosmos-artifact-name}"
    val CONTAINER = "{your-container-name}"
    
    // Set configuration settings
    val config = Map(
          "spark.cosmos.accountendpoint" -> ENDPOINT,
          "spark.cosmos.database" -> DATABASE,
          "spark.cosmos.container" -> CONTAINER,
          // auth config options
          "spark.cosmos.accountDataResolverServiceName" -> "com.azure.cosmos.spark.fabric.FabricAccountDataResolver",
          "spark.cosmos.auth.type" -> "AccessToken",
          "spark.cosmos.useGatewayMode" -> "true",
          "spark.cosmos.auth.aad.audience" -> "https://cosmos.azure.com/"
    )
    

Dotazování dat z kontejneru

Načtením dat OLTP do datového rámce provedete některé základní operace Sparku.

  1. Slouží spark.read k načtení dat OLTP do objektu DataFrame. Použijte konfiguraci vytvořenou v předchozím kroku. Nastavte také spark.cosmos.read.inferSchema.enabled na true, aby konektor Spark mohl odvodit schéma vzorkováním existujících položek.

    // Read Cosmos DB container into a dataframe
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Zobrazí prvních pět řádků dat v datovém rámci.

    // Show the first 5 rows of the dataframe
    df.show(5)
    

    Poznámka:

    Kontejner SampleData, který jste vytvořili dříve, obsahuje dvě různé entity se dvěma samostatnými schématy, produktem a recenzí. Možnost odvozeníSchema detekuje dvě různá schémata v rámci tohoto kontejneru Cosmos DB a kombinuje je.

  3. Zobrazte schéma dat načtených do datového rámce pomocí printSchema a ujistěte se, že schéma odpovídá ukázkové struktuře dokumentu.

    // Render schema    
    df.printSchema()
    

    Výsledek by měl vypadat podobně jako v následujícím příkladu:

    
        root
         |-- inventory: integer (nullable = true)
         |-- name: string (nullable = true)
         |-- priceHistory: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- date: string (nullable = true)
         |    |    |-- price: double (nullable = true)
         |-- stars: integer (nullable = true)
         |-- description: string (nullable = true)
         |-- currentPrice: double (nullable = true)
         |-- reviewDate: string (nullable = true)
         |-- countryOfOrigin: string (nullable = true)
         |-- id: string (nullable = false)
         |-- categoryName: string (nullable = true)
         |-- productId: string (nullable = true)
         |-- firstAvailable: string (nullable = true)
         |-- userName: string (nullable = true)
         |-- docType: string (nullable = true)
    
  4. Tato dvě schémata a jejich data lze filtrovat pomocí vlastnosti docType v kontejneru. Vyfiltrujte datový rámec jenom pro produkty, které používají where funkci.

    // Render filtered rows by specific document type
    val productsDF = df.where("docType = 'product'")
    productsDF.show(5)
    
  5. Zobrazí schéma filtrovaných entit produktu .

    // Render schema    
    productsDF.printSchema()
    
  6. Vyfiltrujte datový rámec pomocí filter funkce tak, aby zobrazoval pouze produkty v určité kategorii.

    // Render filtered rows by specific document type and categoryName
    val filteredDF = df
    .where("docType = 'product'")
    .filter($"categoryName" === "Computers, Laptops")
    
    filteredDF.show(10)
    

Dotazování cosmos DB v Microsoft Fabric pomocí Spark SQL

  1. Nakonfigurujte rozhraní API katalogu, abyste mohli odkazovat na službu Cosmos DB v prostředcích infrastruktury a spravovat ji pomocí dotazů Sparku s použitím dříve definované hodnoty koncového bodu.

    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", ENDPOINT)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "AccessToken")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.useGatewayMode", "true")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountDataResolverServiceName", "com.azure.cosmos.spark.fabric.FabricAccountDataResolver")
    
  2. Dotazování dat pomocí informací o katalogu a řetězce dotazu SQL pomocí funkce Spark SQL

     // Show results of query   
      val queryDF = spark.sql(
      " SELECT " + 
      "  categoryName, " + 
      "  productId, " + 
      "  docType, " + 
      "  name, " + 
      "  currentPrice, " + 
      "  stars " + 
      " FROM cosmosCatalog." + DATABASE + "." + CONTAINER
      )
      queryDF.show(10)
    

    Výsledek ukazuje, že vlastnosti chybějící v jednotlivých dokumentech se vrátí jako hodnoty NULL a měly by vypadat podobně jako v následujícím příkladu:

    +------------------+--------------------+-------+--------------------+------------+-----+
    |      categoryName|           productId|docType|                name|currentPrice|stars|
    +------------------+--------------------+-------+--------------------+------------+-----+
    |Computers, Laptops|77be013f-4036-431...|product|TechCorp SwiftEdg...|     2655.33| NULL|
    |Computers, Laptops|77be013f-4036-431...| review|                NULL|        NULL|    4|
    |Computers, Laptops|77be013f-4036-431...| review|                NULL|        NULL|    1|
    |Computers, Laptops|d4df3f4e-5a90-41e...|product|AeroTech VortexBo...|     2497.71| NULL|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    1|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    2|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    1|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    2|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    5|
    |Computers, Laptops|e8b100f0-166d-43d...|product|NovaTech EdgeBook...|     1387.45| NULL|
    +------------------+--------------------+-------+--------------------+------------+-----+
    
  3. Tento příklad ukazuje, jak pracovat s vloženým polem v dokumentu JSON uloženém ve službě Cosmos DB. Nejprve zadejte dotaz na kontejner a potom pomocí operátoru explodepriceHistory rozbalte prvky pole na řádky a pak vypočítejte nejnižší cenu pro každý produkt uložený v historii produktů.

    // Retrieve the product data from the SampleData container
    val productPriceMinDF = spark.sql(
    "SELECT " +
    "  productId, " +
    "  categoryName, " +
    "  name, " +
    "  currentPrice, " +
    "  priceHistory " +
    "FROM cosmosCatalog." + DATABASE + "." + CONTAINER + " " +
    "WHERE " + CONTAINER + ".docType = 'product'"
    )
    
    // Prepare an exploded result set containing one row for every member of the priceHistory array
    val explodedDF = productPriceMinDF
       .withColumn("priceHistory", explode(col("priceHistory")))
       .withColumn("priceDate", col("priceHistory").getField("date"))
       .withColumn("newPrice", col("priceHistory").getField("price"))
    
    // Aggregate just the lowest price ever recorded in the priceHistory
    val lowestPriceDF = explodedDF
       .filter(col("docType") === "product")
       .groupBy("productId", "categoryName", "name")
       .agg(min("newPrice").as("lowestPrice"))
    
    // Show 10 rows of the result data
    lowestPriceDF.show(10)
    

    Výsledky by měly vypadat takto.

       +--------------------+--------------------+--------------------+-----------+
       |           productId|        categoryName|                name|lowestPrice|
       +--------------------+--------------------+--------------------+-----------+
       |5d81221f-79ad-4ae...|Accessories, High...|PulseCharge Pro X120|      79.99|
       |9173595c-2b5c-488...|Accessories, Desi...| Elevate ProStand X2|     117.16|
       |a5d1be8f-ef18-484...|Computers, Gaming...|VoltStream Enigma...|     1799.0|
       |c9e3a6ce-432f-496...|Peripherals, Keyb...|HyperKey Pro X77 ...|     117.12|
       |f786eb9e-de01-45f...|    Devices, Tablets|TechVerse TabPro X12|     469.93|
       |59f21059-e9d4-492...|Peripherals, Moni...|GenericGenericPix...|     309.77|
       |074d2d7a-933e-464...|Devices, Smartwat...|  PulseSync Orion X7|     170.43|
       |dba39ca4-f94a-4b6...|Accessories, Desi...|Elevate ProStand ...|      129.0|
       |4775c430-1470-401...|Peripherals, Micr...|EchoStream Pro X7...|     119.65|
       |459a191a-21d1-42f...|Computers, Workst...|VertexPro Ultima ...|     3750.4|
       +--------------------+--------------------+--------------------+-----------+
    

Použití Cosmos DB pro implementaci zpětného ETL pomocí Spark

Cosmos DB je výjimečnou obslužnou vrstvou pro analytické úlohy kvůli své architektuře. Následující příklad ukazuje, jak provést reverzní ETL s analytickými daty a obsluhovat je pomocí Cosmos DB.

Vytvořte Cosmos DB v kontejneru Fabric pomocí Spark

  • Vytvořte MinPricePerProduct kontejner pomocí rozhraní API katalogu Spark a CREATE TABLE IF NOT EXISTS. Nastavte cestu klíče oddílu na /id a nakonfigurujte propustnost pro automatické škálování na minimální hodnotu 1000 RU/s, protože se očekává, že kontejner zůstane malý.

    // Create a MinPricePerProduct container by using the Catalog API
      val NEW_CONTAINER = "MinPricePerProduct"
    
      spark.sql(
      "CREATE TABLE IF NOT EXISTS cosmosCatalog." + DATABASE + "." + NEW_CONTAINER + " " +
      "USING cosmos.oltp " + 
      "TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '1000')"
      )
    

Zápis dat do Cosmos DB v kontejneru Fabric pomocí Sparku

Pokud chcete zapisovat data přímo do cosmos DB v kontejneru Fabric, potřebujete:

  • správně formátovaný datový rámec obsahující klíč oddílu kontejneru a id sloupce.
  • správně zadanou konfiguraci kontejneru, do kterého chcete zapisovat.
  1. Všechny dokumenty ve službě Cosmos DB vyžadují vlastnost ID , což je také klíč oddílu Products zvolený pro kontejner. Vytvořte id sloupec v datovém ProductsDF rámci s hodnotou productId.

    // Create an id column and copy productId value into it
    val ProductsDF = lowestPriceDF.withColumn("id", col("productId"))
    ProductsDF.show(10)
    
  2. Vytvořte novou konfiguraci kontejneru MinPricePerProduct , do kterého chcete zapisovat. Je spark.cosmos.write.strategy nastaveno na ItemOverwrite, což znamená, že všechny existující dokumenty se stejným ID a klíčem oddílu se přepíší.

    // Configure the Cosmos DB connection information for the database and the new container.
    val configWrite = Map(
       "spark.cosmos.accountendpoint" -> ENDPOINT,
       "spark.cosmos.database" -> DATABASE,
       "spark.cosmos.container" -> NEW_CONTAINER,
       "spark.cosmos.write.strategy" -> "ItemOverwrite",
       // auth config options
       "spark.cosmos.accountDataResolverServiceName" -> "com.azure.cosmos.spark.fabric.FabricAccountDataResolver",
       "spark.cosmos.auth.type" -> "AccessToken",
       "spark.cosmos.useGatewayMode" -> "true",
       "spark.cosmos.auth.aad.audience" -> "https://cosmos.azure.com/"
    )
    
  3. Zapište datový rámec do kontejneru.

    ProductsDF.write
      .format("cosmos.oltp")
      .options(configWrite)
      .mode("APPEND")
      .save()
    
  4. Zadejte dotaz na kontejner a ověřte, že teď obsahuje správná data.

    // Test that the write operation worked
    val queryString = s"SELECT * FROM cosmosCatalog.$DATABASE.$NEW_CONTAINER"
    val queryDF = spark.sql(queryString)
    queryDF.show(10)
    

    Výsledek by měl vypadat podobně jako v následujícím příkladu:

       +--------------------+--------------------+-----------+--------------------+--------------------+
       |                name|        categoryName|lowestPrice|                  id|           productId|
       +--------------------+--------------------+-----------+--------------------+--------------------+
       |PulseCharge Pro X120|Accessories, High...|      79.99|5d81221f-79ad-4ae...|5d81221f-79ad-4ae...|
       | Elevate ProStand X2|Accessories, Desi...|     117.16|9173595c-2b5c-488...|9173595c-2b5c-488...|
       |VoltStream Enigma...|Computers, Gaming...|     1799.0|a5d1be8f-ef18-484...|a5d1be8f-ef18-484...|
       |HyperKey Pro X77 ...|Peripherals, Keyb...|     117.12|c9e3a6ce-432f-496...|c9e3a6ce-432f-496...|
       |TechVerse TabPro X12|    Devices, Tablets|     469.93|f786eb9e-de01-45f...|f786eb9e-de01-45f...|
       |GenericGenericPix...|Peripherals, Moni...|     309.77|59f21059-e9d4-492...|59f21059-e9d4-492...|
       |  PulseSync Orion X7|Devices, Smartwat...|     170.43|074d2d7a-933e-464...|074d2d7a-933e-464...|
       |Elevate ProStand ...|Accessories, Desi...|      129.0|dba39ca4-f94a-4b6...|dba39ca4-f94a-4b6...|
       |EchoStream Pro X7...|Peripherals, Micr...|     119.65|4775c430-1470-401...|4775c430-1470-401...|
       |VertexPro Ultima ...|Computers, Workst...|     3750.4|459a191a-21d1-42f...|459a191a-21d1-42f...|
       +--------------------+--------------------+-----------+--------------------+--------------------+