Kopírování dat z Azure Cosmos DB do vyhrazeného fondu SQL pomocí Apache Sparku

Azure Synapse Link pro Azure Cosmos DB umožňuje uživatelům spouštět analýzy téměř v reálném čase nad provozními daty ve službě Azure Cosmos DB. Existují ale chvíle, kdy je potřeba některá data agregovat a rozšířit, aby bylo možné obsluhovat uživatele datového skladu. Ke kurátorování a exportu dat Azure Synapse Link můžete použít jenom několik buněk v poznámkovém bloku.

Požadavky

Postup

V tomto kurzu se připojíte k analytickému úložišti, aby to nemělo žádný vliv na transakční úložiště (nebude spotřebovávat žádné jednotky žádostí). Provedeme následující kroky:

  1. Načtení kontejneru HTAP služby Azure Cosmos DB do datového rámce Sparku
  2. Agregace výsledků v novém datovém rámci
  3. Ingestování dat do vyhrazeného fondu SQL

Kroky 1 z Sparku do SQL

Data

V tomto příkladu použijeme kontejner HTAP s názvem RetailSales. Je součástí propojené služby s názvem ConnectedData a má následující schéma:

  • _rid: řetězec (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: řetězec (nullable = true)

Pro účely vytváření sestav agregujeme prodej (množství, výnosy (cena x množství) podle kódu productCode a weekStarting . Nakonec tato data vyexportujeme do tabulky vyhrazeného fondu SQL s názvem dbo.productsales.

Konfigurace poznámkového bloku Sparku

Vytvořte poznámkový blok Sparku s hlavním jazykem Scala (Scala). Použijeme výchozí nastavení poznámkového bloku pro relaci.

Čtení dat ve Sparku

Načtěte kontejner HTAP služby Azure Cosmos DB se Sparkem do datového rámce v první buňce.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agregace výsledků v novém datovém rámci

Ve druhé buňce spustíme transformaci a agregace potřebné pro nový datový rámec před jeho načtením do databáze vyhrazeného fondu SQL.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Načtení výsledků do vyhrazeného fondu SQL

Ve třetí buňce načteme data do vyhrazeného fondu SQL. Automaticky vytvoří dočasnou externí tabulku, externí zdroj dat a formát externího souboru, které se po dokončení úlohy odstraní.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Dotazování výsledků pomocí SQL

Výsledek můžete dotazovat pomocí jednoduchého dotazu SQL, jako je například následující skript SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Dotaz zobrazí následující výsledky v režimu grafu: Spark to SQL Steps 2

Další kroky