Apache Spark を使用して Azure Cosmos DB から専用 SQL プールにデータをコピーする

Azure Synapse Link for Azure Cosmos DB を使用すると、ユーザーは Azure Cosmos DB のオペレーショナル データに対してほぼリアルタイムの分析を実行できます。 ただし、データ ウェアハウス ユーザーにサービスを提供するために、一部のデータを集計してエンリッチメント処理することが必要な場合があります。 Azure Synapse Link のデータのキュレーションとエクスポートは、ノートブックの少数のセルで実行できます。

前提条件

手順

このチュートリアルでは、トランザクション ストアに影響を与えないように分析ストアに接続します (要求ユニットを消費しません)。 ここでは、次の手順について説明します。

  1. Cosmos DB HTAP コンテナーを Spark データフレームに読み取る
  2. 新しいデータフレームで結果を集計する
  3. 専用 SQL プールにデータを取り込む

Spark to SQL Steps 1

Data

この例では、RetailSales という HTAP コンテナーを使用します。 これは、ConnectedData というリンクされたサービスの一部です。スキーマは次のとおりです。

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

レポート用に、売上 ("数量"、"収益" (価格 x 数量)) を productCodeweekStarting で集計します。 最後に、そのデータを dbo.productsales という専用 SQL プール テーブルにエクスポートします。

Spark ノートブックを構成する

Scala as Spark (Scala) を主要言語とした Spark ノートブックを作成します。 セッションにはノートブックのデフォルト設定を使用します。

Spark でデータを読み取る

Spark を使用した Cosmos DB HTAP コンテナーを最初のセルのデータフレームに読み取ります。

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

新しいデータフレームで結果を集計する

2 番目のセルでは、専用 SQL プール データベースに読み込む前に、新しいデータフレームに必要な変換と集計を実行します。

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

結果を専用 SQL プールに読み込む

3 番目のセルでは、データを専用 SQL プールに読み込みます。 これにより、ジョブが完了したら削除される一時的な外部テーブル、外部データ ソース、外部ファイル形式が自動的に作成されます。

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

SQL を使用して結果のクエリを実行する

次の SQL スクリプトのような単純な SQL クエリを使用して、結果のクエリを実行できます。

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

クエリでは、グラフ モードで次の結果を表示します。Spark to SQL Steps 2

次のステップ