Kopírování dat z Azure Cosmos DB do vyhrazeného fondu SQL pomocí Apache Sparku
Azure Synapse Link pro Azure Cosmos DB umožňuje uživatelům spouštět analýzy téměř v reálném čase nad provozními daty ve službě Azure Cosmos DB. Existují ale chvíle, kdy je potřeba některá data agregovat a rozšířit, aby bylo možné obsluhovat uživatele datového skladu. Ke kurátorování a exportu dat Azure Synapse Link můžete použít jenom několik buněk v poznámkovém bloku.
Požadavky
- Zřízení pracovního prostoru Synapse pomocí:
- Zřízení účtu služby Azure Cosmos DB s kontejnerem HTAP s daty
- Připojení kontejneru HTAP služby Azure Cosmos DB k pracovnímu prostoru
- Správné nastavení pro import dat do vyhrazeného fondu SQL ze Sparku
Postup
V tomto kurzu se připojíte k analytickému úložišti, aby to nemělo žádný vliv na transakční úložiště (nebude spotřebovávat žádné jednotky žádostí). Provedeme následující kroky:
- Načtení kontejneru HTAP služby Azure Cosmos DB do datového rámce Sparku
- Agregace výsledků v novém datovém rámci
- Ingestování dat do vyhrazeného fondu SQL
Data
V tomto příkladu použijeme kontejner HTAP s názvem RetailSales. Je součástí propojené služby s názvem ConnectedData a má následující schéma:
- _rid: řetězec (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: řetězec (nullable = true)
Pro účely vytváření sestav agregujeme prodej (množství, výnosy (cena x množství) podle kódu productCode a weekStarting . Nakonec tato data vyexportujeme do tabulky vyhrazeného fondu SQL s názvem dbo.productsales
.
Konfigurace poznámkového bloku Sparku
Vytvořte poznámkový blok Sparku s hlavním jazykem Scala (Scala). Použijeme výchozí nastavení poznámkového bloku pro relaci.
Čtení dat ve Sparku
Načtěte kontejner HTAP služby Azure Cosmos DB se Sparkem do datového rámce v první buňce.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Agregace výsledků v novém datovém rámci
Ve druhé buňce spustíme transformaci a agregace potřebné pro nový datový rámec před jeho načtením do databáze vyhrazeného fondu SQL.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Načtení výsledků do vyhrazeného fondu SQL
Ve třetí buňce načteme data do vyhrazeného fondu SQL. Automaticky vytvoří dočasnou externí tabulku, externí zdroj dat a formát externího souboru, které se po dokončení úlohy odstraní.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Dotazování výsledků pomocí SQL
Výsledek můžete dotazovat pomocí jednoduchého dotazu SQL, jako je například následující skript SQL:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Dotaz zobrazí následující výsledky v režimu grafu:
Další kroky
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro