Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Ke čtení, zápisu a dotazování dat z účtu Azure Cosmos DB for NoSQL můžete použít Spark a konektor Azure Cosmos DB Spark. Kromě toho můžete pomocí konektoru vytvářet a spravovat kontejnery Cosmos DB.
Použití Sparku a konektoru se liší od použití Sparku ke čtení dat ze služby Cosmos DB v zrcadlených datech uložených v OneLake, protože k provádění operací se připojuje přímo k koncovému bodu Cosmos DB.
Konektor Spark služby Cosmos DB se dá použít k podpoře reverzních scénářů ETL, kdy potřebujete obsluhovat data z koncového bodu Cosmos DB s nízkou latencí nebo vysokou souběžností.
Poznámka:
Reverse ETL (extrakce, transformace, načtení) odkazuje na proces přebírání transformovaných analytických dat z analytického systému a jejich načítání zpět do operačních systémů (jako jsou CRM, ERP, POS nebo marketingové nástroje), aby obchodní týmy mohly pracovat na přehledech přímo v aplikacích, které používají každý den.
Návod
Stáhněte si kompletní ukázku z práce se službou Cosmos DB v Microsoft Fabric pomocí konektoru Spark služby Cosmos DB na GitHubu.
Požadavky
Existující kapacita Fabric
- Pokud kapacitu Fabric nemáte, spusťte zkušební verzi Fabric.
Existující databáze Cosmos DB v systému Fabric
- Pokud ji ještě nemáte, vytvořte novou databázi Cosmos DB ve Fabric.
Existující kontejner s daty
- Pokud ho ještě nemáte, doporučujeme načíst ukázkový kontejner dat.
Poznámka:
Tento článek používá integrovanou ukázku Cosmos DB vytvořenou s názvem databáze CosmosSampleDatabase a názvem kontejneru SampleData.
Načtení koncového bodu Cosmos DB
Nejprve najděte koncový bod pro databázi Cosmos DB ve Fabricu. Tento koncový bod se vyžaduje pro připojení pomocí konektoru Spark služby Cosmos DB.
Otevřete portál Fabric (https://app.fabric.microsoft.com).
Přejděte do existující databáze Cosmos DB.
V řádku nabídek databáze vyberte možnost Nastavení .
V dialogovém okně nastavení přejděte do části Připojení . Pak zkopírujte hodnotu koncového bodu pro databázi NoSQL služby Cosmos DB. Tuto hodnotu použijete v pozdějším kroku.
Konfigurace Spark v notebooku Fabric
Pokud se chcete připojit ke službě Cosmos DB pomocí konektoru Spark, musíte nakonfigurovat vlastní prostředí Spark. Tato část vás provede vytvořením vlastního prostředí Spark a nahráním knihoven konektorů Spark pro Cosmos DB.
Stáhněte si nejnovější soubory knihovny konektoru Spark služby Cosmos DB z úložiště Maven (ID skupiny: com.azure.cosmos.spark) pro Spark 3.5.
Vytvořte nový poznámkový blok.
Jako jazyk, který chcete použít, vyberte Spark (Scala).
Vyberte rozbalovací nabídku prostředí.
Zkontrolujte nastavení pracovního prostoru a ujistěte se, že používáte Runtime 1.3 (Spark 3.5).
Vyberte Nové prostředí.
Zadejte název nového prostředí.
Ujistěte se, že je runtime nakonfigurován pro Runtime 1.3 (Spark 3.5).
V levém panelu zvolte Vlastní knihovna ze složky Knihovny .
Nahrajte dva soubory knihovny
.jar, které jste si předtím stáhli.Vyberte Uložit.
Vyberte Publikovat, pak Publikovat vše a nakonec Publikovat.
Po publikování by měly mít uživatelské knihovny stav úspěšného dokončení.
Vraťte se do poznámkového bloku a vyberte nově nakonfigurované prostředí kliknutím na rozevírací seznam prostředí, výběrem možnosti Změnit prostředí a zvolením názvu nově vytvořeného prostředí.
Připojení pomocí Sparku
Pokud se chcete připojit ke Cosmos DB v databázi a kontejneru Fabric, zadejte konfiguraci připojení, která se má použít při čtení z kontejneru a zápisu do něj.
V poznámkovém bloku vložte dříve zachovaný název koncového bodu, databáze a kontejneru cosmos DB a pak nastavte nastavení konfigurace online zpracování transakcí (OLTP) pro koncový bod účtu NoSQL, název databáze a název kontejneru.
// User values for Cosmos DB val ENDPOINT = "https://{YourAccountEndpoint....cosmos.fabric.microsoft.com:443/}" val DATABASE = "{your-cosmos-artifact-name}" val CONTAINER = "{your-container-name}" // Set configuration settings val config = Map( "spark.cosmos.accountendpoint" -> ENDPOINT, "spark.cosmos.database" -> DATABASE, "spark.cosmos.container" -> CONTAINER, // auth config options "spark.cosmos.accountDataResolverServiceName" -> "com.azure.cosmos.spark.fabric.FabricAccountDataResolver", "spark.cosmos.auth.type" -> "AccessToken", "spark.cosmos.useGatewayMode" -> "true", "spark.cosmos.auth.aad.audience" -> "https://cosmos.azure.com/" )
Dotazování dat z kontejneru
Načtením dat OLTP do datového rámce provedete některé základní operace Sparku.
Slouží
spark.readk načtení dat OLTP do objektu DataFrame. Použijte konfiguraci vytvořenou v předchozím kroku. Nastavte takéspark.cosmos.read.inferSchema.enablednatrue, aby konektor Spark mohl odvodit schéma vzorkováním existujících položek.// Read Cosmos DB container into a dataframe val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Zobrazí prvních pět řádků dat v datovém rámci.
// Show the first 5 rows of the dataframe df.show(5)Poznámka:
Kontejner SampleData, který jste vytvořili dříve, obsahuje dvě různé entity se dvěma samostatnými schématy, produktem a recenzí. Možnost odvozeníSchema detekuje dvě různá schémata v rámci tohoto kontejneru Cosmos DB a kombinuje je.
Zobrazte schéma dat načtených do datového rámce pomocí
printSchemaa ujistěte se, že schéma odpovídá ukázkové struktuře dokumentu.// Render schema df.printSchema()Výsledek by měl vypadat podobně jako v následujícím příkladu:
root |-- inventory: integer (nullable = true) |-- name: string (nullable = true) |-- priceHistory: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- date: string (nullable = true) | | |-- price: double (nullable = true) |-- stars: integer (nullable = true) |-- description: string (nullable = true) |-- currentPrice: double (nullable = true) |-- reviewDate: string (nullable = true) |-- countryOfOrigin: string (nullable = true) |-- id: string (nullable = false) |-- categoryName: string (nullable = true) |-- productId: string (nullable = true) |-- firstAvailable: string (nullable = true) |-- userName: string (nullable = true) |-- docType: string (nullable = true)Tato dvě schémata a jejich data lze filtrovat pomocí vlastnosti docType v kontejneru. Vyfiltrujte datový rámec jenom pro produkty, které používají
wherefunkci.// Render filtered rows by specific document type val productsDF = df.where("docType = 'product'") productsDF.show(5)Zobrazí schéma filtrovaných entit produktu .
// Render schema productsDF.printSchema()Vyfiltrujte datový rámec pomocí
filterfunkce tak, aby zobrazoval pouze produkty v určité kategorii.// Render filtered rows by specific document type and categoryName val filteredDF = df .where("docType = 'product'") .filter($"categoryName" === "Computers, Laptops") filteredDF.show(10)
Dotazování cosmos DB v Microsoft Fabric pomocí Spark SQL
Nakonfigurujte rozhraní API katalogu, abyste mohli odkazovat na službu Cosmos DB v prostředcích infrastruktury a spravovat ji pomocí dotazů Sparku s použitím dříve definované hodnoty koncového bodu.
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", ENDPOINT) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "AccessToken") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.useGatewayMode", "true") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountDataResolverServiceName", "com.azure.cosmos.spark.fabric.FabricAccountDataResolver")Dotazování dat pomocí informací o katalogu a řetězce dotazu SQL pomocí funkce Spark SQL
// Show results of query val queryDF = spark.sql( " SELECT " + " categoryName, " + " productId, " + " docType, " + " name, " + " currentPrice, " + " stars " + " FROM cosmosCatalog." + DATABASE + "." + CONTAINER ) queryDF.show(10)Výsledek ukazuje, že vlastnosti chybějící v jednotlivých dokumentech se vrátí jako hodnoty NULL a měly by vypadat podobně jako v následujícím příkladu:
+------------------+--------------------+-------+--------------------+------------+-----+ | categoryName| productId|docType| name|currentPrice|stars| +------------------+--------------------+-------+--------------------+------------+-----+ |Computers, Laptops|77be013f-4036-431...|product|TechCorp SwiftEdg...| 2655.33| NULL| |Computers, Laptops|77be013f-4036-431...| review| NULL| NULL| 4| |Computers, Laptops|77be013f-4036-431...| review| NULL| NULL| 1| |Computers, Laptops|d4df3f4e-5a90-41e...|product|AeroTech VortexBo...| 2497.71| NULL| |Computers, Laptops|d4df3f4e-5a90-41e...| review| NULL| NULL| 1| |Computers, Laptops|d4df3f4e-5a90-41e...| review| NULL| NULL| 2| |Computers, Laptops|d4df3f4e-5a90-41e...| review| NULL| NULL| 1| |Computers, Laptops|d4df3f4e-5a90-41e...| review| NULL| NULL| 2| |Computers, Laptops|d4df3f4e-5a90-41e...| review| NULL| NULL| 5| |Computers, Laptops|e8b100f0-166d-43d...|product|NovaTech EdgeBook...| 1387.45| NULL| +------------------+--------------------+-------+--------------------+------------+-----+Tento příklad ukazuje, jak pracovat s vloženým polem v dokumentu JSON uloženém ve službě Cosmos DB. Nejprve zadejte dotaz na kontejner a potom pomocí operátoru
explodepriceHistoryrozbalte prvky pole na řádky a pak vypočítejte nejnižší cenu pro každý produkt uložený v historii produktů.// Retrieve the product data from the SampleData container val productPriceMinDF = spark.sql( "SELECT " + " productId, " + " categoryName, " + " name, " + " currentPrice, " + " priceHistory " + "FROM cosmosCatalog." + DATABASE + "." + CONTAINER + " " + "WHERE " + CONTAINER + ".docType = 'product'" ) // Prepare an exploded result set containing one row for every member of the priceHistory array val explodedDF = productPriceMinDF .withColumn("priceHistory", explode(col("priceHistory"))) .withColumn("priceDate", col("priceHistory").getField("date")) .withColumn("newPrice", col("priceHistory").getField("price")) // Aggregate just the lowest price ever recorded in the priceHistory val lowestPriceDF = explodedDF .filter(col("docType") === "product") .groupBy("productId", "categoryName", "name") .agg(min("newPrice").as("lowestPrice")) // Show 10 rows of the result data lowestPriceDF.show(10)Výsledky by měly vypadat takto.
+--------------------+--------------------+--------------------+-----------+ | productId| categoryName| name|lowestPrice| +--------------------+--------------------+--------------------+-----------+ |5d81221f-79ad-4ae...|Accessories, High...|PulseCharge Pro X120| 79.99| |9173595c-2b5c-488...|Accessories, Desi...| Elevate ProStand X2| 117.16| |a5d1be8f-ef18-484...|Computers, Gaming...|VoltStream Enigma...| 1799.0| |c9e3a6ce-432f-496...|Peripherals, Keyb...|HyperKey Pro X77 ...| 117.12| |f786eb9e-de01-45f...| Devices, Tablets|TechVerse TabPro X12| 469.93| |59f21059-e9d4-492...|Peripherals, Moni...|GenericGenericPix...| 309.77| |074d2d7a-933e-464...|Devices, Smartwat...| PulseSync Orion X7| 170.43| |dba39ca4-f94a-4b6...|Accessories, Desi...|Elevate ProStand ...| 129.0| |4775c430-1470-401...|Peripherals, Micr...|EchoStream Pro X7...| 119.65| |459a191a-21d1-42f...|Computers, Workst...|VertexPro Ultima ...| 3750.4| +--------------------+--------------------+--------------------+-----------+
Použití Cosmos DB pro implementaci zpětného ETL pomocí Spark
Cosmos DB je výjimečnou obslužnou vrstvou pro analytické úlohy kvůli své architektuře. Následující příklad ukazuje, jak provést reverzní ETL s analytickými daty a obsluhovat je pomocí Cosmos DB.
Vytvořte Cosmos DB v kontejneru Fabric pomocí Spark
Vytvořte
MinPricePerProductkontejner pomocí rozhraní API katalogu Spark aCREATE TABLE IF NOT EXISTS. Nastavte cestu klíče oddílu na/ida nakonfigurujte propustnost pro automatické škálování na minimální hodnotu1000RU/s, protože se očekává, že kontejner zůstane malý.// Create a MinPricePerProduct container by using the Catalog API val NEW_CONTAINER = "MinPricePerProduct" spark.sql( "CREATE TABLE IF NOT EXISTS cosmosCatalog." + DATABASE + "." + NEW_CONTAINER + " " + "USING cosmos.oltp " + "TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '1000')" )
Zápis dat do Cosmos DB v kontejneru Fabric pomocí Sparku
Pokud chcete zapisovat data přímo do cosmos DB v kontejneru Fabric, potřebujete:
- správně formátovaný datový rámec obsahující klíč oddílu kontejneru a
idsloupce. - správně zadanou konfiguraci kontejneru, do kterého chcete zapisovat.
Všechny dokumenty ve službě Cosmos DB vyžadují vlastnost ID , což je také klíč oddílu
Productszvolený pro kontejner. Vytvořteidsloupec v datovémProductsDFrámci s hodnotouproductId.// Create an id column and copy productId value into it val ProductsDF = lowestPriceDF.withColumn("id", col("productId")) ProductsDF.show(10)Vytvořte novou konfiguraci kontejneru
MinPricePerProduct, do kterého chcete zapisovat. Jespark.cosmos.write.strategynastaveno naItemOverwrite, což znamená, že všechny existující dokumenty se stejným ID a klíčem oddílu se přepíší.// Configure the Cosmos DB connection information for the database and the new container. val configWrite = Map( "spark.cosmos.accountendpoint" -> ENDPOINT, "spark.cosmos.database" -> DATABASE, "spark.cosmos.container" -> NEW_CONTAINER, "spark.cosmos.write.strategy" -> "ItemOverwrite", // auth config options "spark.cosmos.accountDataResolverServiceName" -> "com.azure.cosmos.spark.fabric.FabricAccountDataResolver", "spark.cosmos.auth.type" -> "AccessToken", "spark.cosmos.useGatewayMode" -> "true", "spark.cosmos.auth.aad.audience" -> "https://cosmos.azure.com/" )Zapište datový rámec do kontejneru.
ProductsDF.write .format("cosmos.oltp") .options(configWrite) .mode("APPEND") .save()Zadejte dotaz na kontejner a ověřte, že teď obsahuje správná data.
// Test that the write operation worked val queryString = s"SELECT * FROM cosmosCatalog.$DATABASE.$NEW_CONTAINER" val queryDF = spark.sql(queryString) queryDF.show(10)Výsledek by měl vypadat podobně jako v následujícím příkladu:
+--------------------+--------------------+-----------+--------------------+--------------------+ | name| categoryName|lowestPrice| id| productId| +--------------------+--------------------+-----------+--------------------+--------------------+ |PulseCharge Pro X120|Accessories, High...| 79.99|5d81221f-79ad-4ae...|5d81221f-79ad-4ae...| | Elevate ProStand X2|Accessories, Desi...| 117.16|9173595c-2b5c-488...|9173595c-2b5c-488...| |VoltStream Enigma...|Computers, Gaming...| 1799.0|a5d1be8f-ef18-484...|a5d1be8f-ef18-484...| |HyperKey Pro X77 ...|Peripherals, Keyb...| 117.12|c9e3a6ce-432f-496...|c9e3a6ce-432f-496...| |TechVerse TabPro X12| Devices, Tablets| 469.93|f786eb9e-de01-45f...|f786eb9e-de01-45f...| |GenericGenericPix...|Peripherals, Moni...| 309.77|59f21059-e9d4-492...|59f21059-e9d4-492...| | PulseSync Orion X7|Devices, Smartwat...| 170.43|074d2d7a-933e-464...|074d2d7a-933e-464...| |Elevate ProStand ...|Accessories, Desi...| 129.0|dba39ca4-f94a-4b6...|dba39ca4-f94a-4b6...| |EchoStream Pro X7...|Peripherals, Micr...| 119.65|4775c430-1470-401...|4775c430-1470-401...| |VertexPro Ultima ...|Computers, Workst...| 3750.4|459a191a-21d1-42f...|459a191a-21d1-42f...| +--------------------+--------------------+-----------+--------------------+--------------------+