次の方法で共有


Lakeflow Spark 宣言型パイプラインの Python 言語リファレンス

このセクションでは、Lakeflow Spark 宣言パイプライン (SDP) Python プログラミング インターフェイスの詳細について説明します。

pipelines モジュールの概要

Lakeflow Spark 宣言パイプライン Python 関数は、 pyspark.pipelines モジュールで定義されます ( dpとしてインポートされます)。 Python API で実装されたパイプラインでは、次のモジュールをインポートする必要があります。

from pyspark import pipelines as dp

パイプライン モジュールは、パイプラインのコンテキストでのみ使用できます。 パイプラインの外部で実行されている Python では使用できません。 パイプライン コードの編集の詳細については、「 Lakeflow Pipelines Editor を使用した ETL パイプラインの開発とデバッグ」を参照してください。

Apache Spark™ パイプライン

Apache Spark には、 モジュールから入手できる Spark 4.1 以降のpyspark.pipelinesが含まれています。 Databricks ランタイムは、マネージド 運用で使用するための追加の API と統合によって、これらのオープン ソース機能を拡張します。

オープンソースの pipelines モジュールで記述されたコードは、Azure Databricks で変更なしで実行されます。 次の機能は Apache Spark の一部ではありません。

  • dp.create_auto_cdc_flow
  • dp.create_auto_cdc_from_snapshot_flow
  • @dp.expect(...)
  • @dp.temporary_view

pipelines モジュールは、以前は Azure Databricks でdltと呼ばれていました。 Apache Spark との違いの詳細と詳細については、「@dlt変更点」を参照してください。

データセット定義の関数

パイプラインでは、具体化されたビューやストリーミング テーブルなどのデータセットを定義するために Python デコレーターを使用します。 データセットを定義する関数を参照してください。

API リファレンス

Python パイプラインに関する考慮事項

Lakeflow Spark 宣言パイプライン (SDP) Python インターフェイスを使用してパイプラインを実装する場合の重要な考慮事項を次に示します。

  • SDP は、計画とパイプラインの実行中にパイプラインを複数回定義するコードを評価します。 データセットを定義する Python 関数には、テーブルまたはビューを定義するために必要なコードのみを含める必要があります。 データセット定義に含まれる任意の Python ロジックは、予期しない動作につながる可能性があります。
  • データセット定義にカスタム監視ロジックを実装しないでください。 イベント フックを使用したパイプラインのカスタム監視の定義を参照してください。
  • データセットの定義に使用する関数は、Spark DataFrame を返す必要があります。 返された DataFrame に関連しないロジックをデータセット定義に含めないでください。
  • パイプライン データセット コードの一部として、ファイルまたはテーブルを保存または書き込むメソッドは使用しないでください。

パイプライン コードで使用してはならない Apache Spark 操作の例を次に示します。

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

@dltはどうなりましたか?

以前は、Azure Databricks はパイプライン機能をサポートするために dlt モジュールを使用しました。 dlt モジュールは、pyspark.pipelines モジュールに置き換えられました。 dltは引き続き使用できますが、Databricks ではpipelinesを使用することをお勧めします。

DLT、SDP、Apache Spark の違い

次の表は、DLT、Lakeflow Spark 宣言パイプライン、Apache Spark 宣言パイプラインの構文と機能の違いを示しています。

Area DLT 構文 SDP 構文 (該当する場合は Lakeflow と Apache) Apache Spark で使用可能
インポート import dlt from pyspark import pipelines (as dp、必要に応じて) イエス
ストリーミング テーブル @dlt.table ストリーミング データフレームを使用する @dp.table イエス
マテリアライズド・ビュー @dlt.table バッチ データフレームを使用する @dp.materialized_view イエス
表示 @dlt.view @dp.temporary_view イエス
追加フロー @dlt.append_flow @dp.append_flow イエス
SQL – ストリーミング CREATE STREAMING TABLE ... CREATE STREAMING TABLE ... イエス
SQL – マテリアライズド CREATE MATERIALIZED VIEW ... CREATE MATERIALIZED VIEW ... イエス
SQL – フロー CREATE FLOW ... CREATE FLOW ... イエス
イベント ログ spark.read.table("event_log") spark.read.table("event_log") いいえ
変更の適用 (CDC) dlt.apply_changes(...) dp.create_auto_cdc_flow(...) いいえ
Expectations @dlt.expect(...) dp.expect(...) いいえ
連続モード 継続的トリガーを使用したパイプライン構成 (同じ) いいえ
シンク @dlt.create_sink(...) dp.create_sink(...) イエス