適用対象: Azure Data Factory
Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。
注意
この機能はパブリック プレビュー段階にあります。 ワークフロー オーケストレーション マネージャーは Apache Airflow を利用しています。
Python パッケージは、関連する Python モジュールを単一のディレクトリ階層に整理する方法です。 パッケージは通常、__init__.py
という特殊なファイルを含むディレクトリとして表されます。 パッケージ ディレクトリ内には、関数、クラス、変数を定義した複数の Python モジュール ファイル (.py ファイル) を含めることができます。
ワークフロー オーケストレーション マネージャーのコンテキストでは、カスタム コードを追加するパッケージを作成できます。
このガイドでは、Python パッケージのバイナリ配布形式として機能する .whl
(Wheel) ファイルをワークフロー オーケストレーション マネージャーにインストールする手順について説明します。
説明のために、dags ファイル内のモジュールとしてインポートできる Python パッケージとして単純なカスタム演算子を作成します。
手順 1: カスタム演算子とそれをテストするファイルを作成します。
sample_operator.py
ファイルを作成します
from airflow.models.baseoperator import BaseOperator
class SampleOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = f"Hello {self.name}"
return message
このファイルの Python パッケージを作成するには、「ガイド: Python でのパッケージの作成」を参照してください
手順 1 で定義された演算子をテストする dag ファイル
sample_dag.py
を作成します。
from datetime import datetime
from airflow import DAG
from airflow_operator.sample_operator import SampleOperator
with DAG(
"test-custom-package",
tags=["example"]
description="A simple tutorial DAG",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
) as dag:
task = SampleOperator(task_id="sample-task", name="foo_bar")
task
手順 2: ストレージ コンテナーを作成する。
「Azure portal を使用して BLOB コンテナーを管理する」の手順に従って、dag ファイルとパッケージ ファイルをアップロードするためのストレージ アカウントを作成します。
手順 3: プライベート パッケージをストレージ アカウントにアップロードする。
- Airflow DAG とプラグイン ファイルを格納することにした所定のコンテナーに移動します。
- このコンテナーにプライベート パッケージ ファイルをアップロードします。 一般的なファイル形式には、
zip
、.whl
、tar.gz
があります。 適切な "Dags" フォルダーまたは "Plugins" フォルダー内にファイルを配置します。
手順 4: プライベート パッケージを要件として追加する。
requirements.txt ファイルに、要件の 1 つとしてプライベート パッケージを追加します。 このファイルがまだ存在しない場合は、新たに追加します。 Git 同期の場合は、UI 自体にすべての要件を追加する必要があります。
Blob Storage - パッケージ パスの前には必ずプレフィックス "/opt/airflow" を付けてください。 たとえば、プライベート パッケージが "/dags/test/private.whl" にある場合、requirements.txt ファイルには "/opt/airflow/dags/test/private.whl" という要件を指定します。
Git 同期 - すべての Git サービスについて、"/opt/airflow/git/
<repoName>
.git/" をパッケージ パスの先頭に付加します。 たとえば、プライベート パッケージが GitHub リポジトリの "/dags/test/private.whl" にある場合は、要件 "/opt/airflow/git/<repoName>
.git/dags/test/private.whl" を Airflow 環境に追加する必要があります。ADO - ADO の場合は、パッケージ パスの先頭に "/opt/airflow/git/
<repoName>
/" を付加します。
手順 5: フォルダーを Airflow 統合ランタイム (IR) 環境にインポートする。
フォルダーを Airflow IR 環境にインポートする処理を実行する場合は、[インポートの要件] チェック ボックスをオンにして、要件を Airflow 環境に読み込ませてください。