pyspark.pipelines (ここではdpとしてエイリアス化) モジュールは、デコレーターを使用してコア機能の多くを実装します。 これらのデコレーターは、ストリーミング クエリまたはバッチ クエリを定義し、Apache Spark DataFrame を返す関数を受け入れます。 次の構文は、パイプライン データセットを定義するための簡単な例を示しています。
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
このページでは、パイプラインでデータセットを定義する関数とクエリの概要について説明します。 使用可能なデコレーターの完全な一覧については、「 パイプライン開発者向けリファレンス」を参照してください。
データセットの定義に使用する関数には、サードパーティの API の呼び出しなど、データセットに関係のない任意の Python ロジックを含めないようにしてください。 パイプラインは、計画、検証、更新中にこれらの関数を複数回実行します。 任意のロジックを含めると、予期しない結果になる可能性があります。
データを読み取ってデータセット定義を開始する
パイプライン データセットの定義に使用される関数は、通常、 spark.read または spark.readStream 操作で始まります。 これらの読み取り操作は、DataFrame を返す前に追加の変換を定義するために使用する静的またはストリーミング DataFrame オブジェクトを返します。 DataFrame を返す Spark 操作のその他の例としては、 spark.tableや spark.rangeなどがあります。
関数は、関数の外部で定義されている DataFrame を参照しないでください。 別のスコープで定義されている DataFrame を参照しようとすると、予期しない動作が発生する可能性があります。 複数のテーブルを作成するためのメタプログラミング パターンの例については、「for ループでのテーブルの作成」を参照してください。
次の例は、バッチまたはストリーミング ロジックを使用してデータを読み取るための基本的な構文を示しています。
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
外部 REST API からデータを読み取る必要がある場合は、Python カスタム データ ソースを使用してこの接続を実装します。 PySpark カスタム データ ソースを参照してください。
注
Pandas DataFrames、dict、リストなど、Python のデータ コレクションから任意の Apache Spark DataFrame を作成できます。 これらのパターンは、開発とテスト中に役立つ場合がありますが、ほとんどの運用パイプライン データセット定義は、ファイル、外部システム、または既存のテーブルまたはビューからデータを読み込むことで開始する必要があります。
変換の連鎖
パイプラインでは、ほぼすべての Apache Spark DataFrame 変換がサポートされています。 データセット定義関数には任意の数の変換を含めることができますが、使用するメソッドが常に DataFrame オブジェクトを返すようにする必要があります。
複数のダウンストリーム ワークロードを駆動する中間変換があるが、テーブルとして具体化する必要がない場合は、 @dp.temporary_view() を使用してパイプラインに一時ビューを追加します。 その後、複数のダウンストリーム データセット定義で spark.read.table("temp_view_name") を使用して、このビューを参照できます。 次の構文は、このパターンを示しています。
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
これにより、パイプラインの計画中にパイプラインがビュー内の変換を完全に認識し、データセット定義の外部で実行されている任意の Python コードに関連する潜在的な問題を防ぐことができます。
関数内では、次の例のように、DataFrame を連結して、増分結果をビュー、具体化されたビュー、ストリーミング テーブルとして書き込まずに新しい DataFrame を作成できます。
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
すべての DataFrame がバッチ ロジックを使用して最初の読み取りを実行する場合、返される結果は静的な DataFrame になります。 ストリーミングしているクエリがある場合、返される結果はストリーミング DataFrame になります。
DataFrame を返す
@dp.tableを使用して、ストリーミング読み取りの結果からストリーミング テーブルを作成します。
@dp.materialized_viewを使用して、バッチ読み取りの結果から具体化されたビューを作成します。 他のほとんどのデコレーターは、ストリーミングデータフレームと静的データフレームの両方で動作しますが、ストリーミングデータフレームが必要なデコレーターがいくつかあります。
データセットの定義に使用する関数は、Spark DataFrame を返す必要があります。 パイプライン データセット コードの一部として、ファイルまたはテーブルを保存または書き込むメソッドは使用しないでください。
パイプライン コードで使用してはならない Apache Spark 操作の例を次に示します。
collect()count()toPandas()save()saveAsTable()start()toTable()
注
パイプラインでは、データセット定義関数に対する Spark での Pandas の使用もサポートされています。 Spark の Pandas API を参照してください。
Python パイプラインで SQL を使用する
PySpark では、SQL を使用して DataFrame コードを記述する spark.sql 演算子がサポートされています。 パイプライン ソース コードでこのパターンを使用すると、具体化されたビューまたはストリーミング テーブルにコンパイルされます。
次のコード例は、データセット クエリ ロジックに spark.read.table("catalog_name.schema_name.table_name") を使用することと同じです。
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read および dlt.read_stream (レガシ)
以前の dlt モジュールには、レガシ パイプライン発行モードの機能をサポートするために導入された dlt.read() 関数と dlt.read_stream() 関数が含まれています。 これらのメソッドはサポートされていますが、Databricks では、次の理由から常に spark.read.table() 関数と spark.readStream.table() 関数を使用することをお勧めします。
-
dlt関数では、現在のパイプラインの外部で定義されたデータセットの読み取りが制限されています。 -
spark関数は、skipChangeCommitsなどの読み取り操作のオプションの指定をサポートしています。 オプションの指定は、dlt関数ではサポートされていません。 -
dltモジュール自体は、pyspark.pipelinesモジュールに置き換えられました。 Databricks では、Python でパイプライン コードを記述するときに使用するために、from pyspark import pipelines as dpを使用してpyspark.pipelinesをインポートすることをお勧めします。