英語で読む

演習: Azure Synapse Analytics で SQL プールと Spark プールを統合する

完了

次の演習では、Azure Synapse Analytics での SQL プールと Apache Spark プールの統合について説明します。

Azure Synapse Analytics で SQL プールと Apache Spark プールを統合する

あなたは、Spark で Data Engineering タスクを実行した後に専用 SQL プールに書き込んでから、他のファイルからのデータを含む Apache Spark DataFrame と結合するためのソースとして SQL プール データを参照したいと考えています。

Azure Synapse Apache Spark to Synapse SQL コネクタを使用することを決定して、Azure Synapse の Spark プールと SQL プールの間でデータを効率的に転送できるようにします。

Apache Spark プールと SQL プール間のデータ転送は、JavaDataBaseConnectivity (JDBC) を使用して行うことができます。 ただし、Apache Spark プールや SQL プールといった 2 つの分散システムでは、JDBC はシリアル データ転送のボトルネックになる傾向があります。

Synapse SQL コネクタへの Azure Synapse の Apache Spark プールは、Apache Spark 用のデータ ソースの実装です。 これにより、Azure Data Lake Storage Gen2 と SQL プールの PolyBase が使用され、Spark クラスターと Synapse SQL インスタンスの間でデータが効率的に転送されます。

  1. Apache Spark pool to Synapse SQL コネクタ (sqlanalytics) を使用する場合、DataFrame 内でデータの一時ビューを作成する方法があります。 新しいセルで次のコードを実行して、top_purchases という名前のビューを作成します。

    Python
    # Create a temporary view for top purchases 
    topPurchases.createOrReplaceTempView("top_purchases")
    

    フラット化された JSON ユーザー購入データが格納されている、以前に作成した topPurchases データフレームから新しい一時ビューを作成しました。

  2. Apache Spark pool to Synapse SQL コネクタを使用するコードを Scala で実行する必要があります。 これを行うには、セルに %%spark マジックを追加します。 新しいセルで次のコードを実行し、top_purchases ビューから読み取ります。

    Java
    %%spark
    // Make sure the name of the SQL pool (SQLPool01 below) matches the name of your SQL pool.
    val df = spark.sqlContext.sql("select * from top_purchases")
    df.write.sqlanalytics("SQLPool01.wwi.TopPurchases", Constants.INTERNAL)
    

    注意

    セルの実行には 1 分以上かかる場合があります。 この前にこのコマンドを実行すると、テーブルが既に存在しているため、"... という名前のオブジェクトが既に存在します" というエラーが表示されます。

    セルの実行が完了したら、SQL プールのテーブルの一覧を見て、テーブルが正常に作成されたことを確認します。

  3. ノートブックを開いたままにして[データ] ハブに移動します (まだ選択されていない場合)。

    [データ] ハブが強調表示されています

  4. [ワークスペース] タブ (1) を選択し、SQL プールを展開して、[テーブル] (2)省略記号 (...) を選択し、[更新] (3) を選択します。 wwi.TopPurchases テーブルと [列] wwi.TopPurchases を展開します。

    テーブルが表示されています。

    ご覧のとおり、Apache Spark DataFrame の派生スキーマに基づいて、wwi.TopPurchases テーブルが自動的に作成されました。 Apache Spark pool to Synapse SQL コネクタは、テーブルを作成し、そこへデータを効率的に読み込む役割を担っていました。

  5. ノートブックに戻り、次を新しいセルで次のコードを実行して、sale-small/Year=2019/Quarter=Q4/Month=12/ フォルダーにあるすべての Parquet ファイルから売上データを読み取ります。

    Python
    dfsales = spark.read.load('abfss://wwi-02@' + datalake + '.dfs.core.windows.net/sale-small/Year=2019/Quarter=Q4/Month=12/*/*.parquet', format='parquet')
    display(dfsales.limit(10))
    

    注意

    このセルの実行には 3 分以上かかることがあります。

    最初のセルで作成した datalake 変数は、ファイル パスの一部としてここで使用されます。

    セルの出力が表示されています。

    上のセルのファイル パスを最初のセルのファイル パスと比較します。 ここでは、sale-small にある Parquet ファイルから、2010 年 12 月 31 日の売上データではなく、すべての 2019 年 12 月の売上データを読み込むために、相対パスを使用しています。

    次に、前の手順で作成した SQL プールのテーブルから TopSales データを新しい Apache Spark DataFrame に読み込み、この新しい dfsales DataFrame と結合してみましょう。 これを行うには、Apache Spark pool to Synapse SQL コネクタを使用して SQL プールからデータを取得する必要があるため、新しいセルに対して %%spark マジックを再度使用する必要があります。 次に、Python からデータにアクセスできるように、DataFrame の内容を新しい一時ビューに追加する必要があります。

  6. 新しいセルで次のコードを実行して、TopSales SQL プールのテーブルから読み取りを行い、その内容を一時ビューに保存します。

    Java
    %%spark
    // Make sure the name of the SQL pool (SQLPool01 below) matches the name of your SQL pool.
    val df2 = spark.read.sqlanalytics("SQLPool01.wwi.TopPurchases")
    df2.createTempView("top_purchases_sql")
    
    df2.head(10)
    

    セルとその出力が、説明のとおりに表示されています。

    セルの言語は、セルの上部で %%spark マジックを使用することによって Scala に設定されます Scalaspark.read.sqlanalytics メソッドによって作成された新しいデータフレームとして、df2 という名前の新しい変数を宣言しました。この変数は、SQL プールの TopPurchases テーブルから読み取りを行います df2。 次に、top_purchases_sql という名前の新しい一時ビューを作成しました (3)。 最後に、df2.head(10)) 行で最初の 10 個のレコードを表示しました df2.head(10))。 セルの出力には、DataFrame 値が表示されます (5)

  7. 新しいセルで次のコードを実行して、top_purchases_sql 一時ビューから Python で新しい DataFrame を作成してから、最初の 10 件の結果を表示します。

    Python
    dfTopPurchasesFromSql = sqlContext.table("top_purchases_sql")
    
    display(dfTopPurchasesFromSql.limit(10))
    

    DataFrame のコードと出力が表示されています。

  8. 新しいセルで次のコードを実行して、売上の Parquet ファイルと TopPurchases SQL プールのデータを結合します。

    Python
    inner_join = dfsales.join(dfTopPurchasesFromSql,
        (dfsales.CustomerId == dfTopPurchasesFromSql.visitorId) & (dfsales.ProductId == dfTopPurchasesFromSql.productId))
    
    inner_join_agg = (inner_join.select("CustomerId","TotalAmount","Quantity","itemsPurchasedLast12Months","top_purchases_sql.productId")
        .groupBy(["CustomerId","top_purchases_sql.productId"])
        .agg(
            sum("TotalAmount").alias("TotalAmountDecember"),
            sum("Quantity").alias("TotalQuantityDecember"),
            sum("itemsPurchasedLast12Months").alias("TotalItemsPurchasedLast12Months"))
        .orderBy("CustomerId") )
    
    display(inner_join_agg.limit(100))
    

    このクエリでは、dfsalesdfTopPurchasesFromSql の DataFrame を結合し、CustomerId および ProductId と照合しました。 この結合では、TopPurchases SQL プールのテーブル データと 2019 年 12 月の売上に関する Parquet データを結合しています TopPurchases

    CustomerId フィールドと ProductId フィールドでグループ化を行いました。 ProductId フィールドの名前があいまいである (両方の DataFrame に存在する) ため、ProductId の名前を完全に修飾して、その名前を TopPurchases DataFrame で参照する必要がありました ProductId

    次に、各製品への 12 月の支払いの合計金額、12 月の製品項目の合計数、過去 12 か月間に購入した製品項目の合計数の集計を作成しました (3)

    最後に、結合および集計されたデータをテーブル ビューで表示しました。

    [テーブル] ビューの列ヘッダーを選択して、結果セットを並べ替えてみてください。

    セルの内容と出力が表示されています。