このクイックスタートでは、Spark Structured Streaming を使用する Python コードを含む Spark ジョブ定義を作成し、レイクハウスにデータを配置し、SQL 分析エンドポイントを介して提供する方法について説明します。 このクイックスタートを完了すると、継続的に実行される Spark ジョブ定義が作成され、SQL 分析エンドポイントで受信データを表示できるようになります。
Python スクリプトを作成する
Spark 構造化ストリーミングを利用する次の Python コードを使用して、レイクハウス テーブル内のデータを取得します。
import sys from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.appName("MyApp").getOrCreate() tableName = "streamingtable" deltaTablePath = "Tables/" + tableName df = spark.readStream.format("rate").option("rowsPerSecond", 1).load() query = df.writeStream.outputMode("append").format("delta").option("path", deltaTablePath).option("checkpointLocation", deltaTablePath + "/checkpoint").start() query.awaitTermination()
スクリプトを Python ファイル (.py) としてローカル コンピューターに保存します。
レイクハウスを作成する
次の手順を使用して、レイクハウスを作成します。
Microsoft Fabric ポータルにサインインします。
目的のワークスペースに移動するか、必要に応じて新しいワークスペースを作成します。
レイクハウスを作成するには、ワークスペースから [新しい項目
選択し、開いたパネルで Lakehouse を選択します。レイクハウスの名前を入力し、[作成] を選択します。
Spark ジョブ定義を作成する
ジョブ定義を作成するには、次の手順を実行します。
レイクハウスを作成したのと同じワークスペースから、[新しいアイテム]
選択します。 開いたパネルで、データの取得の下にある Spark ジョブ定義を選択します。
Spark ジョブ定義の名前を入力し、[作成] を選択します。
[アップロード] を選択し、前の手順で作成した Python ファイルを選択します。
[レイクハウス リファレンス] で、作成したレイクハウスを選択します。
Spark ジョブ定義の再試行ポリシーの設定
Spark ジョブ定義の再試行ポリシーを設定するには、次の手順に従います。
トップ メニューから、[設定] アイコンを選択します。
[最適化] タブを開き、[再試行ポリシー] のトリガーを [オン] に設定します。
最大再試行回数を定義するか、[無制限の試行を許可する] をチェックします。
再試行の間隔を指定し、[適用] を選択します。
Note
再試行ポリシーのセットアップでは、有効期間に 90 日の制限があります。 その再試行ポリシーが有効になると、ポリシーに従って 90 日以内にそのジョブは再起動されます。 この期間が過ぎると、その再試行ポリシーは自動的に機能しなくなり、そのジョブは終了します。 その後、ユーザーはそのジョブを手動で再起動する必要があります。これにより、今度は再試行ポリシーがもう一度有効になります。
Spark ジョブ定義の実行と監視
トップ メニューから、[実行] アイコンを選択します。
Spark ジョブ定義が正常に送信され、実行されているかどうかを確認します。
SQL 分析エンドポイントを使用してデータを表示する
ワークスペース ビューで、レイクハウスを選択します。
右側の [レイクハウス] を選択し、[SQL 分析エンドポイント] を選択します。
[テーブル] の [SQL 分析エンドポイント] ビューで、スクリプトがデータの配置に使用するテーブルを選択します。 その後、SQL 分析エンドポイントからデータをプレビューできます。