Lakeflow Spark 宣言型パイプライン (SDP) には、パイプラインで具体化されたビューとストリーミング テーブルを定義するための新しい Python コードコンストラクトがいくつか導入されています。 パイプライン開発のための Python サポートは、PySpark DataFrame API と Structured Streaming API の基本に基づいています。
Python と DataFrame に慣れていないユーザーの場合は、SQL インターフェイスを使うことをお勧めします。 SQL を使用した Lakeflow Spark 宣言パイプライン コードの開発に関するページを参照してください。
Lakeflow SDP Python 構文の完全なリファレンスについては、「 Lakeflow Spark 宣言パイプライン Python 言語リファレンス」を参照してください。
パイプライン開発のための Python の基本
pipline データセットを作成する Python コードは、DataFrames を返す必要があります。
すべての Lakeflow Spark 宣言パイプライン Python API は、 pyspark.pipelines モジュールに実装されます。 Python で実装されたパイプライン コードでは、Python ソースの先頭にある pipelines モジュールを明示的にインポートする必要があります。 この例では、次の import コマンドを使用し、例の dp を使用して pipelinesを参照します。
from pyspark import pipelines as dp
注
Apache Spark には、™ モジュールから入手できる Spark 4.1 以降のpyspark.pipelinesが含まれています。 Databricks ランタイムは、マネージド 運用で使用するための追加の API と統合によって、これらのオープン ソース機能を拡張します。
オープン ソース pipelines モジュールで記述されたコードは、Azure Databricks で変更なしで実行されます。 次の機能は Apache Spark の一部ではありません。
dp.create_auto_cdc_flowdp.create_auto_cdc_from_snapshot_flow@dp.expect(...)@dp.temporary_view
パイプラインの読み取りと書き込みの既定値は、パイプラインの構成時に指定されたカタログとスキーマです。 「ターゲット カタログとスキーマのを設定する」を参照してください。
パイプライン固有の Python コードは、1 つの重要な方法で他の種類の Python コードとは異なります。Python パイプライン コードでは、データセットを作成するためにデータ インジェストと変換を実行する関数は直接呼び出されません。 代わりに、SDP は、パイプラインで構成されているすべてのソース コード ファイル内の dp モジュールのデコレーター関数を解釈し、データフロー グラフを構築します。
Important
パイプラインの実行時に予期しない動作が発生しないよう、データセットを定義する関数に、副作用がある可能性のあるコードを含めないでください。 詳しくは、Python のリファレンスに関する記事をご覧ください。
Python を使用して具体化されたビューまたはストリーミング テーブルを作成する
@dp.tableを使用して、ストリーミング読み取りの結果からストリーミング テーブルを作成します。
@dp.materialized_viewを使用して、バッチ読み取りの結果から具体化されたビューを作成します。
既定では、具体化されたビュー名とストリーミング テーブル名は関数名から推論されます。 次のコード例は、具体化されたビューとストリーミング テーブルを作成するための基本的な構文を示しています。
注
どちらの関数も、 samples カタログ内の同じテーブルを参照し、同じデコレーター関数を使用します。 これらの例では、具体化されたビューとストリーミング テーブルの基本的な構文の唯一の違いは、 spark.read と spark.readStreamを使用していることです。
すべてのデータ ソースがストリーミング読み取りをサポートしているわけではありません。 一部のデータ ソースは、常にストリーミング セマンティクスで処理する必要があります。
from pyspark import pipelines as dp
@dp.materialized_view()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
必要に応じて、name デコレーターの @dp.table 引数を使用してテーブル名を指定できます。 次の例は、具体化されたビューとストリーミング テーブルのこのパターンを示しています。
from pyspark import pipelines as dp
@dp.materialized_view(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
オブジェクト ストレージからデータを読み込む
パイプラインでは、Azure Databricks でサポートされているすべての形式からのデータの読み込みがサポートされています。 「データ形式のオプション」を参照してください。
注
これらの例では、ワークスペースに自動的にマウントされた /databricks-datasets で使用可能なデータを使用します。 Databricks では、クラウド オブジェクト ストレージに格納されているデータを参照するために、ボリューム パスまたはクラウド URI を使用することをお勧めします。
「Unity カタログ ボリュームとは」を参照してください。
Databricks では、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成するときに、自動ローダーとストリーミング テーブルを使用することをお勧めします。 「自動ローダーとは」を参照してください。
次の例では、自動ローダーを使用して JSON ファイルからストリーミング テーブルを作成します。
from pyspark import pipelines as dp
@dp.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
次の例では、バッチ セマンティクスを使用して JSON ディレクトリを読み取り、具体化されたビューを作成します。
from pyspark import pipelines as dp
@dp.materialized_view()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
期待値を使用してデータを検証する
期待値を使用して、データ品質の制約を設定および適用できます。 パイプラインの期待を使用してデータ品質を管理する方法については、を参照してください。
次のコードでは、 @dp.expect_or_drop を使用して、データ インジェスト中に null レコードを削除する valid_data という名前の期待値を定義します。
from pyspark import pipelines as dp
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
パイプラインで定義された具体化されたビューとストリーミング テーブルに対してクエリを実行する
次の例では、4 つのデータセットを定義します。
- JSON データを読み込む
ordersという名前のストリーミング テーブル。 - CSV データを読み込む
customersという名前の具体化されたビュー。 -
customer_ordersという名前のマテリアライズドビューは、ordersおよびcustomersデータセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_id、order_number、state、およびorder_dateフィールドを選択します。 - 各状態の注文の日次数を集計する
daily_orders_by_stateという名前の具体化されたビュー。
注
パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで構成された既定値を使用することもできます。 この例では、orders、customers、および customer_orders テーブルが書き込まれ、パイプライン用に構成された既定のカタログとスキーマから読み取られます。
従来の発行モードでは、LIVE スキーマを使用して、パイプラインで定義されている他の具体化されたビューとストリーミング テーブルに対してクエリを実行します。 新しいパイプラインでは、LIVE スキーマの構文は暗黙的に無視されます。
LIVE スキーマ (レガシー)を参照してください。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dp.materialized_view()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dp.materialized_view()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dp.materialized_view()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
for ループでテーブルを作成する
Python for ループを使用して、複数のテーブルをプログラムで作成できます。 これは、少数のパラメーターによって異なるデータ ソースやターゲット データセットが多数ある場合に便利です。その結果、コードの合計が少なくなり、コードの冗長性が低下します。
for ループはロジックをシリアル順に評価しますが、データセットの計画が完了すると、パイプラインはロジックを並列で実行します。
Important
このパターンを使用してデータセットを定義する場合は、 for ループに渡される値の一覧が常に加算型であることを確認します。 パイプラインで以前に定義されたデータセットが将来のパイプライン実行から省略された場合、そのデータセットはターゲット スキーマから自動的に削除されます。
次の例では、リージョン別に顧客の注文をフィルター処理する 5 つのテーブルを作成します。 ここでは、リージョン名を使用して、ターゲットの具体化されたビューの名前を設定し、ソース データをフィルター処理します。 一時ビューは、最終的な具体化されたビューの構築に使用されるソース テーブルからの結合を定義するために使用されます。
from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col
@dp.temporary_view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dp.temporary_view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
このパイプラインのデータ フロー グラフの例を次に示します。
トラブルシューティング: for ループによって、同じ値を持つ多数のテーブルが作成される
パイプラインが Python コードの評価に使用する遅延実行モデルでは、 @dp.materialized_view() によって修飾された関数が呼び出されたときに、ロジックが個々の値を直接参照する必要があります。
次の例では、 for ループを使用してテーブルを定義する 2 つの正しい方法を示します。 どちらの例でも、 tables リストの各テーブル名は、 @dp.materialized_view()によって修飾された関数内で明示的に参照されます。
from pyspark import pipelines as dp
# Create a parent function to set local variables
def create_table(table_name):
@dp.materialized_view(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dp.materialized_view()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized_view(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
次の例は 参照を正しくしていません。 この例では、個別の名前を持つテーブルを作成しますが、すべてのテーブルは、 for ループの最後の値からデータを読み込みます。
from pyspark import pipelines as dp
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized(name=t_name)
def create_table():
return spark.read.table(t_name)
具体化されたビューまたはストリーミング テーブルからレコードを完全に削除する
GDPR コンプライアンスの場合など、削除ベクトルが有効になっている具体化されたビューまたはストリーミング テーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。 具体化されたビューからレコードを確実に削除するには、「削除ベクターを有効にしてマテリアライズドビューからレコードを完全に削除する」を参照してください。 ストリーミング テーブルからレコードを確実に削除するには、「ストリーミング テーブル からレコードを完全に削除する」を参照してください。