次の方法で共有


プライベート パッケージをインストールする

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

注意

この機能はパブリック プレビュー段階にあります。 ワークフロー オーケストレーション マネージャーは Apache Airflow を利用しています。

Python パッケージは、関連する Python モジュールを単一のディレクトリ階層に整理する方法です。 パッケージは通常、__init__.py という特殊なファイルを含むディレクトリとして表されます。 パッケージ ディレクトリ内には、関数、クラス、変数を定義した複数の Python モジュール ファイル (.py ファイル) を含めることができます。 ワークフロー オーケストレーション マネージャーのコンテキストでは、カスタム コードを追加するパッケージを作成できます。

このガイドでは、Python パッケージのバイナリ配布形式として機能する .whl (Wheel) ファイルをワークフロー オーケストレーション マネージャーにインストールする手順について説明します。

説明のために、dags ファイル内のモジュールとしてインポートできる Python パッケージとして単純なカスタム演算子を作成します。

手順 1: カスタム演算子とそれをテストするファイルを作成します。

  • sample_operator.py ファイルを作成します
from airflow.models.baseoperator import BaseOperator


class SampleOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        return message
from datetime import datetime
from airflow import DAG

from airflow_operator.sample_operator import SampleOperator


with DAG(
   "test-custom-package",
   tags=["example"]
   description="A simple tutorial DAG",
   schedule_interval=None,
   start_date=datetime(2021, 1, 1),
) as dag:
    task = SampleOperator(task_id="sample-task", name="foo_bar")

    task

手順 2: ストレージ コンテナーを作成する。

Azure portal を使用して BLOB コンテナーを管理する」の手順に従って、dag ファイルとパッケージ ファイルをアップロードするためのストレージ アカウントを作成します。

手順 3: プライベート パッケージをストレージ アカウントにアップロードする。

  1. Airflow DAG とプラグイン ファイルを格納することにした所定のコンテナーに移動します。
  2. このコンテナーにプライベート パッケージ ファイルをアップロードします。 一般的なファイル形式には、zip.whltar.gz があります。 適切な "Dags" フォルダーまたは "Plugins" フォルダー内にファイルを配置します。

手順 4: プライベート パッケージを要件として追加する。

requirements.txt ファイルに、要件の 1 つとしてプライベート パッケージを追加します。 このファイルがまだ存在しない場合は、新たに追加します。 Git 同期の場合は、UI 自体にすべての要件を追加する必要があります。

  • Blob Storage - パッケージ パスの前には必ずプレフィックス "/opt/airflow" を付けてください。 たとえば、プライベート パッケージが "/dags/test/private.whl" にある場合、requirements.txt ファイルには "/opt/airflow/dags/test/private.whl" という要件を指定します。

  • Git 同期 - すべての Git サービスについて、"/opt/airflow/git/<repoName>.git/" をパッケージ パスの先頭に付加します。 たとえば、プライベート パッケージが GitHub リポジトリの "/dags/test/private.whl" にある場合は、要件 "/opt/airflow/git/<repoName>.git/dags/test/private.whl" を Airflow 環境に追加する必要があります。

  • ADO - ADO の場合は、パッケージ パスの先頭に "/opt/airflow/git/<repoName>/" を付加します。

手順 5: フォルダーを Airflow 統合ランタイム (IR) 環境にインポートする。

フォルダーを Airflow IR 環境にインポートする処理を実行する場合は、[インポートの要件] チェック ボックスをオンにして、要件を Airflow 環境に読み込ませてください。

Airflow 統合ランタイム環境に対し、インポート ダイアログで [インポートの要件] チェック ボックスをオンにした状態のスクリーンショット。

Airflow 統合ランタイム環境でインポートされた要件ダイアログを示すスクリーンショット。

手順 6: Airflow UI 内で、手順 1 で作成した dag ファイルを実行して、インポートが成功したかどうかを確認する。

次のステップ