パイプライン コードの開発とテストは、他の Apache Spark ワークロードとは異なります。 この記事では、パイプライン コードを開発する際にサポートされる機能、ベスト プラクティス、および考慮事項の概要について説明します。 その他の推奨事項とベスト プラクティスについては、「 ソフトウェア開発と DevOps のベスト プラクティスをパイプラインに適用する」を参照してください。
注
コードを検証したり、更新を実行したりするには、ソース コードをパイプライン構成に追加する必要があります。 「 パイプラインの構成」を参照してください。
パイプライン ソース コードに対して有効なファイルは何ですか?
パイプライン コードには、Python または SQL を使用できます。 1 つのパイプラインをバックアップする Python ファイルと SQL ソース コード ファイルを混在させることができますが、各ファイルに含めることができる言語は 1 つだけです。 Python を使用したパイプライン コードの開発と SQL を使用した Lakeflow Spark 宣言型パイプライン コードの開発に関するページを参照してください。
パイプラインのソース ファイルは、ワークスペースに格納されます。 ワークスペース ファイルは、Lakeflow Pipelines エディターで作成された Python または SQL スクリプトを表します。 また、任意の IDE でファイルをローカルで編集し、ワークスペースに同期することもできます。 ワークスペース内のファイルの詳細については、「ワークスペース ファイルとは」を参照してください。 Lakeflow パイプライン エディターを使用した編集の詳細については、「Lakeflow パイプライン エディターを使用した ETL パイプラインの開発とデバッグ」を参照してください。 ローカル IDE でのコードの作成については、「ローカル 開発環境でのパイプライン コードの開発」を参照してください。
Python コードをモジュールまたはライブラリとして開発する場合は、コードをインストールしてインポートし、ソース コードとして構成された Python ファイルからメソッドを呼び出す必要があります。 パイプラインの Python 依存関係の管理に関するページを参照してください。
注
Python ノートブックで任意の SQL コマンドを使用する必要がある場合は、構文パターン spark.sql("<QUERY>") を使用して、SQL を Python コードとして実行できます。
Unity カタログ関数を使用すると、SQL で使用する任意の Python ユーザー定義関数を登録できます。 Unity カタログのユーザー定義関数 (UDF) を参照してください。
パイプライン開発機能の概要
パイプラインは、多くの Azure Databricks 開発機能を拡張して活用し、新しい機能と概念を導入します。 次の表に、パイプライン コード開発をサポートする概念と機能の概要を示します。
| 特徴 | Description |
|---|---|
| 開発モード | パイプラインを対話形式で実行する (Lakeflow Pipelines エディターを使用して更新することを選択する) と、 開発モードが使用されます。 スケジュールまたは自動トリガーを使用して自動的に実行すると、新しいパイプラインは開発モードをオフにして実行されます。 開発モードを参照してください。 |
| ドライラン | ドライラン更新では、テーブルに対して更新を実行することなく、パイプライン ソース コードの正確性が検証されます。 「テーブルが更新されるのを待たずにパイプラインでエラーを確認する」を参照してください。 |
| Lakeflow パイプライン エディター | パイプラインのソース コードとして構成された Python ファイルと SQL ファイルには、コードの検証と更新の実行のための対話型オプションが用意されています。 Lakeflow パイプライン エディターを使用した ETL パイプラインの開発とデバッグを参照してください。 |
| パラメーター | ソース コードとパイプライン構成のパラメーターを活用して、テストと拡張性を簡素化します。 「パイプラインでパラメーターを使用する」を参照してください。 |
| Databricks アセット バンドル | Databricks アセット バンドルを使用すると、パイプライン構成とソース コードをワークスペース間で移動できます。 「パイプラインを Databricks アセット バンドル プロジェクトに変換する」を参照してください。 |
開発とテスト用のサンプル データセットを作成する
Databricks では、開発データセットとテスト データセットを作成して、予想されるデータと、形式が正しくないレコードや破損している可能性のあるレコードを含むパイプライン ロジックをテストすることをお勧めします。 次のような、開発とテストに役立つデータセットを作成する方法は複数あります。
- 運用データセットからデータのサブセットを選択します。
- PII を含むソースには、匿名化または人為的に生成されたデータを使用します。
fakerライブラリを使用してテスト用のデータを生成するチュートリアルについては、「チュートリアル: 変更データ キャプチャを使用して ETL パイプラインを構築する」を参照してください。 - ダウンストリーム変換ロジックに基づいて、明確に定義された結果を含むテスト データを作成します。
- データ スキーマの期待を損なうレコードを作成して、潜在的なデータ破損、形式が正しくないレコード、アップストリームのデータ変更を予測します。
たとえば、次のコードを使用してデータセットを定義するファイルがある場合です。
CREATE OR REFRESH STREAMING TABLE input_data
AS SELECT * FROM STREAM read_files(
"/production/data",
format => "json")
次のようなクエリを使用して、レコードのサブセットを含むサンプル データセットを作成できます。
CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading
次の例では、公開されたデータをフィルター処理して、開発またはテスト用の実稼働データのサブセットを作成する方法を示します。
CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY
これらの異なるデータセットを使用するには、変換ロジックを実装するソース コードで複数のパイプラインを作成します。 各パイプラインは、 input_data データセットからデータを読み取ることができますが、環境に固有のデータセットを作成するファイルを含むように構成されています。
パイプライン データセットでデータを処理する方法
次の表では、具体化されたビュー、ストリーミング テーブル、ビューがデータを処理する方法について説明します。
| データセットの型 | 定義されたクエリによってレコードが処理される方法 |
|---|---|
| ストリーミング テーブル | 1 件のレコードは 1 回だけ処理されます。 この種類では追加専用のソースが想定されています。 |
| マテリアライズド・ビュー | その時点のデータ状態における正確な結果を返すために、必要なときにレコードが処理されます。 具体化されたビューは、変換、集計、低速クエリや頻繁に使用される計算の事前計算などのデータ処理タスクに使用する必要があります。 結果は更新の間にキャッシュされます。 |
| 表示 | ビューへのクエリが実行されるたびにレコードが処理されます。 ビューは、パブリック データセットに発行すべきでない中間変換やデータ品質チェックのために使用します。 |
パイプラインで最初のデータセットを宣言する
パイプラインでは、Python と SQL の新しい構文が導入されています。 パイプライン構文の基本については、「 Python を使用したパイプライン コードの開発 」および「SQL を使用した Lakeflow Spark 宣言パイプライン コードの開発」を参照してください。
注
パイプラインはデータセット定義を更新処理から分離し、パイプライン ソースは対話型の実行を意図していません。
パイプラインを構成する方法
パイプラインの設定は、次の 2 つの大きなカテゴリに分類されます。
- パイプライン構文を使用してデータセットを宣言するファイルのコレクション ( ソース コードと呼ばれます) を定義する構成。
- パイプライン インフラストラクチャ、依存関係管理、更新プログラムの処理方法、テーブルをワークスペースに保存する方法を制御する構成。
大半の構成内容は必須ではありませんが、一部については注意が必要です (特に、運用パイプラインの構成時)。 これには以下が含まれます。
- データをパイプラインの外部へ提供できるようにするには、Hive メタストアに発行するためのターゲット スキーマを宣言するか、Unity カタログに発行するためのターゲット カタログとターゲット スキーマを宣言する必要があります。
- データ アクセス許可は実行用のクラスターを介して構成されます。 クラスターにデータ ソース用に構成された適切なアクセス許可と、ターゲット ストレージの場所(指定されている場合) があることを確認します。
Python と SQL を使用してパイプラインのソース コードを記述する方法の詳細については、「 パイプライン SQL 言語リファレンス 」および 「Lakeflow Spark 宣言型パイプライン Python 言語リファレンス」を参照してください。
パイプラインの設定と構成の詳細については、「 パイプラインの構成」を参照してください。
最初のパイプラインをデプロイし、更新をトリガーする
SDP でデータを処理するには、パイプラインを構成します。 パイプラインを構成したら、更新をトリガーして、パイプライン内の各データセットの結果を計算できます。 パイプラインの使用を開始するには、「 チュートリアル: 変更データ キャプチャを使用して ETL パイプラインを構築する」を参照してください。
パイプラインの更新とは
パイプラインにはインフラストラクチャがデプロイされ、ユーザーが更新を開始すると、パイプラインでデータの状態が再計算されます。 更新では、以下の処理が実行されます。
- 正しい構成でクラスターを起動します。
- 定義されているすべてのテーブルとビューを検出し、無効な列名、不足している依存関係、構文エラーなどの分析エラーがないか確認します。
- 使用可能な最新のデータを含むテーブルおよびビューを作成または更新します。
パイプラインは、ユース ケースのコストと待機時間の要件に応じて、継続的またはスケジュールに従って実行できます。 「パイプラインの更新を実行する」を参照してください。
パイプラインを使用してデータを取り込む
パイプラインでは、Azure Databricks で使用できるすべてのデータ ソースがサポートされます。
Databricks では、大半のユース ケースにストリーミング テーブルを使用することを推奨しています。 また、クラウド オブジェクト ストレージに送られてくるファイルの読み込み方法としては自動ローダーを推奨しています。 ほとんどのメッセージ バスからパイプラインを使用してデータを直接取り込むことができます。
クラウド ストレージへのアクセスの構成の詳細については、「クラウド ストレージの構成」を参照してください。
自動ローダーの対応していない形式については、Python または SQL を使用することで、Apache Spark でサポートされているすべての形式をクエリできます。 「パイプラインにデータを読み込む」を参照してください。
データ品質の監視と適用
データセットの内容にデータ品質の制限を適用するには、期待定義を使用します。 制約に違反したレコードの追加を防ぐ従来のデータベースでの CHECK 制約とは異なり、期待値では、データ品質の要件を満たさないデータを処理するときに柔軟性が得られます。 この柔軟性により、煩雑になると予想されるデータと、厳密な品質要件を満たす必要があるデータを処理し保存できます。 パイプラインの期待を使用してデータ品質を管理する方法については、を参照してください。
Lakeflow Spark 宣言パイプラインと Delta Lake はどのように関連していますか?
SDP は Delta Lake の機能を拡張します。 パイプラインによって作成および管理されるテーブルは Delta テーブルであるため、Delta Lake によって提供される保証と機能は同じです。 「Azure Databricks の Delta Lake とは」を参照してください。
パイプラインでは、Delta Lake で設定できる多数のテーブル プロパティに加えて、いくつかのテーブル プロパティが追加されます。 パイプライン プロパティのリファレンスと Delta テーブルのプロパティリファレンスを参照してください。
パイプラインによってテーブルを作成および管理する方法
Azure Databricks は、パイプラインによって作成されたテーブルを自動的に管理し、テーブルの現在の状態を正しく計算するために更新を処理する必要がある方法を決定し、多数のメンテナンスタスクと最適化タスクを実行します。
ほとんどの操作では、パイプラインでターゲット テーブルに対するすべての更新、挿入、および削除を処理できるようにする必要があります。 詳細情報と制限事項については、「削除または更新を手動で実行する手段の確保」を参照してください。
パイプラインによって実行されるメンテナンス タスク
Azure Databricks は、 予測最適化を使用して、最適な周期でパイプラインによって管理されるテーブルに対してメンテナンス タスクを実行します。 メンテナンスによって古いバージョンのテーブルを削除することで、クエリのパフォーマンスを向上させ、コストを削減することができます。 これには、完全な OPTIMIZE 操作と VACUUM 操作が含まれます。 メンテナンス タスクは、予測最適化によって決定されたスケジュールに従って実行されます。これは、前のメンテナンス以降にパイプラインの更新が実行された場合に限られます。
予測最適化が実行される頻度を理解し、メンテナンス コストを理解するには、 予測最適化システムテーブルのリファレンスを参照してください。
制限事項
制限事項の一覧については、「 パイプラインの制限事項」を参照してください。
Unity カタログでのパイプラインの使用に固有の要件と制限事項の一覧については、「パイプラインでの Unity カタログの使用」を参照してください。
その他のリソース
- パイプラインは、Databricks REST API で完全にサポートされています。 パイプライン REST API に関する説明を参照してください。
- パイプラインとテーブルの設定については、「 パイプラインのプロパティのリファレンス」を参照してください。
- パイプライン SQL 言語リファレンス。
- Lakeflow Spark 宣言型パイプラインの Python 言語リファレンス。