演習: Azure Synapse Analytics で SQL プールと Spark プールを統合する
次の演習では、Azure Synapse Analytics での SQL プールと Apache Spark プールの統合について説明します。
Azure Synapse Analytics で SQL プールと Apache Spark プールを統合する
あなたは、Spark で Data Engineering タスクを実行した後に専用 SQL プールに書き込んでから、他のファイルからのデータを含む Apache Spark DataFrame と結合するためのソースとして SQL プール データを参照したいと考えています。
Azure Synapse Apache Spark to Synapse SQL コネクタを使用することを決定して、Azure Synapse の Spark プールと SQL プールの間でデータを効率的に転送できるようにします。
Apache Spark プールと SQL プール間のデータ転送は、JavaDataBaseConnectivity (JDBC) を使用して行うことができます。 ただし、Apache Spark プールや SQL プールといった 2 つの分散システムでは、JDBC はシリアル データ転送のボトルネックになる傾向があります。
Synapse SQL コネクタへの Azure Synapse の Apache Spark プールは、Apache Spark 用のデータ ソースの実装です。 これにより、Azure Data Lake Storage Gen2 と SQL プールの PolyBase が使用され、Spark クラスターと Synapse SQL インスタンスの間でデータが効率的に転送されます。
Apache Spark pool to Synapse SQL コネクタ (
sqlanalytics
) を使用する場合、DataFrame 内でデータの一時ビューを作成する方法があります。 新しいセルで次のコードを実行して、top_purchases
という名前のビューを作成します。Python# Create a temporary view for top purchases topPurchases.createOrReplaceTempView("top_purchases")
フラット化された JSON ユーザー購入データが格納されている、以前に作成した
topPurchases
データフレームから新しい一時ビューを作成しました。Apache Spark pool to Synapse SQL コネクタを使用するコードを Scala で実行する必要があります。 これを行うには、セルに
%%spark
マジックを追加します。 新しいセルで次のコードを実行し、top_purchases
ビューから読み取ります。Java%%spark // Make sure the name of the SQL pool (SQLPool01 below) matches the name of your SQL pool. val df = spark.sqlContext.sql("select * from top_purchases") df.write.sqlanalytics("SQLPool01.wwi.TopPurchases", Constants.INTERNAL)
注意
セルの実行には 1 分以上かかる場合があります。 この前にこのコマンドを実行すると、テーブルが既に存在しているため、"... という名前のオブジェクトが既に存在します" というエラーが表示されます。
セルの実行が完了したら、SQL プールのテーブルの一覧を見て、テーブルが正常に作成されたことを確認します。
ノートブックを開いたままにして、[データ] ハブに移動します (まだ選択されていない場合)。
[ワークスペース] タブ (1) を選択し、SQL プールを展開して、[テーブル] (2) の省略記号 (...) を選択し、[更新] (3) を選択します。
wwi.TopPurchases
テーブルと [列]wwi.TopPurchases
を展開します。ご覧のとおり、Apache Spark DataFrame の派生スキーマに基づいて、
wwi.TopPurchases
テーブルが自動的に作成されました。 Apache Spark pool to Synapse SQL コネクタは、テーブルを作成し、そこへデータを効率的に読み込む役割を担っていました。ノートブックに戻り、次を新しいセルで次のコードを実行して、
sale-small/Year=2019/Quarter=Q4/Month=12/
フォルダーにあるすべての Parquet ファイルから売上データを読み取ります。Pythondfsales = spark.read.load('abfss://wwi-02@' + datalake + '.dfs.core.windows.net/sale-small/Year=2019/Quarter=Q4/Month=12/*/*.parquet', format='parquet') display(dfsales.limit(10))
注意
このセルの実行には 3 分以上かかることがあります。
最初のセルで作成した
datalake
変数は、ファイル パスの一部としてここで使用されます。上のセルのファイル パスを最初のセルのファイル パスと比較します。 ここでは、
sale-small
にある Parquet ファイルから、2010 年 12 月 31 日の売上データではなく、すべての 2019 年 12 月の売上データを読み込むために、相対パスを使用しています。次に、前の手順で作成した SQL プールのテーブルから
TopSales
データを新しい Apache Spark DataFrame に読み込み、この新しいdfsales
DataFrame と結合してみましょう。 これを行うには、Apache Spark pool to Synapse SQL コネクタを使用して SQL プールからデータを取得する必要があるため、新しいセルに対して%%spark
マジックを再度使用する必要があります。 次に、Python からデータにアクセスできるように、DataFrame の内容を新しい一時ビューに追加する必要があります。新しいセルで次のコードを実行して、
TopSales
SQL プールのテーブルから読み取りを行い、その内容を一時ビューに保存します。Java%%spark // Make sure the name of the SQL pool (SQLPool01 below) matches the name of your SQL pool. val df2 = spark.read.sqlanalytics("SQLPool01.wwi.TopPurchases") df2.createTempView("top_purchases_sql") df2.head(10)
セルの言語は、セルの上部で
%%spark
マジックを使用することによってScala
に設定されますScala
。spark.read.sqlanalytics
メソッドによって作成された新しいデータフレームとして、df2
という名前の新しい変数を宣言しました。この変数は、SQL プールのTopPurchases
テーブルから読み取りを行いますdf2
。 次に、top_purchases_sql
という名前の新しい一時ビューを作成しました (3)。 最後に、df2.head(10))
行で最初の 10 個のレコードを表示しましたdf2.head(10))
。 セルの出力には、DataFrame 値が表示されます (5)。新しいセルで次のコードを実行して、
top_purchases_sql
一時ビューから Python で新しい DataFrame を作成してから、最初の 10 件の結果を表示します。PythondfTopPurchasesFromSql = sqlContext.table("top_purchases_sql") display(dfTopPurchasesFromSql.limit(10))
新しいセルで次のコードを実行して、売上の Parquet ファイルと
TopPurchases
SQL プールのデータを結合します。Pythoninner_join = dfsales.join(dfTopPurchasesFromSql, (dfsales.CustomerId == dfTopPurchasesFromSql.visitorId) & (dfsales.ProductId == dfTopPurchasesFromSql.productId)) inner_join_agg = (inner_join.select("CustomerId","TotalAmount","Quantity","itemsPurchasedLast12Months","top_purchases_sql.productId") .groupBy(["CustomerId","top_purchases_sql.productId"]) .agg( sum("TotalAmount").alias("TotalAmountDecember"), sum("Quantity").alias("TotalQuantityDecember"), sum("itemsPurchasedLast12Months").alias("TotalItemsPurchasedLast12Months")) .orderBy("CustomerId") ) display(inner_join_agg.limit(100))
このクエリでは、
dfsales
とdfTopPurchasesFromSql
の DataFrame を結合し、CustomerId
およびProductId
と照合しました。 この結合では、TopPurchases
SQL プールのテーブル データと 2019 年 12 月の売上に関する Parquet データを結合していますTopPurchases
。CustomerId
フィールドとProductId
フィールドでグループ化を行いました。ProductId
フィールドの名前があいまいである (両方の DataFrame に存在する) ため、ProductId
の名前を完全に修飾して、その名前をTopPurchases
DataFrame で参照する必要がありましたProductId
。次に、各製品への 12 月の支払いの合計金額、12 月の製品項目の合計数、過去 12 か月間に購入した製品項目の合計数の集計を作成しました (3)。
最後に、結合および集計されたデータをテーブル ビューで表示しました。
[テーブル] ビューの列ヘッダーを選択して、結果セットを並べ替えてみてください。