次の方法で共有


ワークフロー オーケストレーション マネージャーを使用した CI/CD パターン

ワークフロー オーケストレーション マネージャーは Apache Airflow を利用しています。

ワークフロー オーケストレーション マネージャーは、Apache Airflow 環境を簡単かつ効率的に作成および管理する方法を提供します。 このサービスを使用すると、大規模なデータ パイプラインを簡単に実行できます。 ワークフロー オーケストレーション マネージャーで DAG を実行するには、主に 2 つの方法があります。 BLOB ストレージに DAG ファイルをアップロードし、Airflow 環境にリンクすることができます。 または、Git 同期機能を使用して、Git リポジトリを Airflow 環境と自動的に同期することもできます。

Airflow のデータ パイプラインを操作するには、ワークフローのニーズに基づいて、DAG、プラグイン、要件ファイルを頻繁に作成または更新する必要があります。 開発者は BLOB ストレージ内の DAG ファイルを手動でアップロードしたり編集したりできますが、多くの組織では、コードの展開には継続的インテグレーションと継続的デリバリー (CI/CD) アプローチを使用することを好みます。 この記事では、Apache Airflow DAG をワークフロー オーケストレーション マネージャーとシームレスに統合して展開するための推奨される展開パターンについて説明します。

CI/CD について

継続的インテグレーション

継続的インテグレーション は、コード変更を共有リポジトリに頻繁かつ自動化して統合することを重視するソフトウェア開発プラクティスです。 これには、開発者が定期的にコードをコミットする操作と、各コミット時に、自動化された CI パイプラインがコードをビルドし、テストを実行し、検証チェックを実行する操作が含まれます。 主な目標は、開発プロセスの早い段階で統合の問題を検出して対処し、開発者に迅速なフィードバックを提供することです。

CI は、常にテスト可能で展開可能な状態にコードベースを維持します。 このプラクティスにより、コードの品質、コラボレーション、バグが重大な問題になる前にバグをキャッチして修正する機能が強化されます。

継続的デリバリー

継続的デリバリーは、自動化をさらに 1 ステップ進める CI の拡張機能です。 CI は統合とテストのフェーズの自動化に重点を置いていますが、CD は運用環境またはその他のターゲット環境へのコード変更の展開を自動化します。 このプラクティスは、組織がソフトウェア更新プログラムを迅速かつ確実にリリースするのに役立ちます。 手動展開の間違いを減らし、承認されたコード変更がユーザーに迅速に配信されるようにします。

ワークフロー オーケストレーション マネージャー内の CI/CD ワークフロー

ワークフロー オーケストレーション マネージャーで使用できる CI/CD パターンを示すスクリーンショット。

Dev/QA 統合ランタイムとの Git 同期

ワークフロー オーケストレーション マネージャー環境を Git リポジトリの開発/QA ブランチにマップします。

Dev/QA 統合ランタイムでの CI パイプライン

機能ブランチから開発ブランチへの pull request (PR) が作られると、PR パイプラインがトリガーされます。 このパイプラインは、機能ブランチで品質チェックを効率的に実行し、コードの整合性と信頼性を確保するように設計されています。 パイプラインには、次の種類のチェックを含めることができます。

  • Python 依存関係テスト: これらのテストでは、プロジェクトの依存関係が正しく構成されていることを確認するために、Python の依存関係の正確性をインストールして検証します。
  • コード分析とリンティング: コードの品質とコーディング標準への準拠を評価するために、静的コード分析とリンティング用のツールが適用されます。
  • Airflow DAG のテスト: これらのテストでは、DAG 定義のテストや、Airflow DAG 用に設計された単体テストを含む、検証テストが実行されます。
  • Airflow カスタムオペレータ、フック、センサー、トリガーの単体テスト

これらのチェックのいずれかが失敗した場合、パイプラインは終了します。 その場合、特定された問題に対処する必要があります。

運用統合ランタイムとの Git 同期

ワークフロー オーケストレーション マネージャー環境を Git リポジトリの運用ブランチにマップします。

運用統合ランタイムでの PR パイプライン

すべての開発機能がパブリックにアクセス可能にならないように、個別の運用環境を維持することがベスト プラクティスであると考えられています。

機能ブランチが開発ブランチと正常にマージされたら、新しくマージされた機能をパブリックにするために、運用ブランチへの pull request を作成できます。 この pull request により、開発ブランチで迅速な品質チェックを行う PR パイプラインがトリガーされます。 品質チェックにより、すべての機能が正しく統合され、運用環境にエラーがないことを確認します。

ワークフロー オーケストレーション マネージャーで CI/CD ワークフローを使用する利点

  • フェイルファスト アプローチ: CI/CD プロセスの統合なしで、DAG にエラーが含まれていることが初めてわかるのは、GitHub にプッシュされ、ワークフロー オーケストレーション マネージャーと同期され、Import Error をスローするときである可能性があります。 その間にも、このエラーを含むコードをリポジトリから他の開発者が知らずにプルする可能性があり、将来的には非効率性につながる可能性があります。
  • コード品質の向上: 構文検証、必要なインポート、その他のコーディングのベスト プラクティスのチェックなどの基本的なチェックを無視すると、基準に達しないコードを提供する可能性が高くなります。

ワークフロー オーケストレーション マネージャーでの展開パターン

2 つの展開パターンをお勧めします。

パターン 1: ワークフロー オーケストレーション マネージャーでデータ パイプラインを直接開発する

パターン 1 を使用する場合は、ワークフロー オーケストレーション マネージャーでデータ パイプラインを直接開発できます。

前提条件

  • Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。 既存の Data Factory インスタンスをワークフロー オーケストレーション マネージャー プレビューがサポートされているリージョンで作成または選択します。
  • GitHub リポジトリにアクセスする必要があります。

利点

  • ローカル開発環境は不要: ワークフロー オーケストレーション マネージャーは、基になるインフラストラクチャ、更新、メンテナンスを処理して、Airflow クラスターを管理する運用上のオーバーヘッドを削減します。 このサービスを使用すると、インフラストラクチャの管理ではなく、ワークフローの構築と管理に集中できます。
  • スケーラビリティ: ワークフロー オーケストレーション マネージャーは、必要に応じてリソースをスケーリングする自動スケーリング機能を提供し、増加するワークロードやアクティビティのバーストを手動介入なしでデータ パイプラインが確実に処理できるようにします。
  • 監視とログ記録: ワークフロー オーケストレーション マネージャーには、ワークフローの実行の追跡、問題の診断、アラートの設定、パフォーマンスの最適化に役立つ診断ログと監視が含まれています。

ワークフロー

  1. Git 同期機能を使用します。

    このワークフローでは、独自のローカル環境を確立する必要はありません。 代わりに、ワークフロー オーケストレーション マネージャーによって提供される Git 同期機能を使用することで開始できます。 この機能により、DAG ファイルは、Airflow Web サーバー、スケジューラ、ワーカーと自動的に同期されます。 ワークフロー オーケストレーション マネージャー UI を使用して、データ パイプラインを直接開発、テスト、実行できるようになりました。

    ワークフロー オーケストレーション マネージャーの Git 同期機能を使用する方法の詳細について説明します。

  2. 個々の機能ブランチ環境を作成します。

    リポジトリからブランチを選択して、ワークフロー オーケストレーション マネージャーと同期できます。 この機能を使用すると、機能ブランチごとに個別の Airflow 環境を作成できます。 これにより、開発者はデータ パイプラインの特定のタスクに取り組むことができます。

  3. pull request を作成する。

    専用の Airflow 環境内で機能を十分に開発してテストした後、pull request を Airflow 開発環境統合ランタイムに送信します。

パターン 2: ローカルで DAG を開発し、ワークフロー オーケストレーション マネージャーに展開する

パターン 2 を使用する場合は、ローカルで DAG を開発し、ワークフロー オーケストレーション マネージャーに展開できます。

前提条件

  • GitHub リポジトリにアクセスする必要があります。
  • コード リポジトリの少なくとも 1 つのブランチがワークフロー オーケストレーション マネージャーと同期されていることを確認して、サービスのコード変更を確認します。

利点

Azure リソースへのアクセスを管理者のみに制限できます。

ワークフロー

  1. ローカル環境を設定します。

    まず、開発マシンに Apache Airflow 用のローカル開発環境を設定します。 この環境では、DAG やタスクなど、Airflow コードを開発してテストできます。 このアプローチを使用すると、Azure リソースへの直接アクセスに依存せずにパイプラインを開発できます。

  2. Git 同期機能を使用します。

    GitHub リポジトリのブランチをワークフロー オーケストレーション マネージャーと同期します。

    ワークフロー オーケストレーション マネージャーの Git 同期機能を使用する方法の詳細について説明します。

  3. ワークフロー オーケストレーション マネージャーを運用環境として使用します。

    ローカル セットアップでデータ パイプラインの開発とテストに成功したら、ワークフロー オーケストレーション マネージャーと同期されたブランチに pull request を立ち上げることができます。 ブランチがマージされたら、実稼働レベルで、自動スケールや監視、ログ記録などのワークフロー オーケストレーション マネージャー機能を使用します。

CI/CD パイプラインのサンプル

詳細については、以下を参照してください。

  1. Git 同期機能を使用して、ワークフロー オーケストレーション マネージャー統合ランタイムに展開された DAG のコードをコピーします。

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    with DAG(
        dag_id="airflow-ci-cd-tutorial",
        start_date=datetime(2023, 8, 15),
        schedule="0 0 * * *",
        tags=["tutorial", "CI/CD"]
    ) as dag:
        # Tasks are represented as operators
        task1 = BashOperator(task_id="task1", bash_command="echo task1")
        task2 = BashOperator(task_id="task2", bash_command="echo task2")
        task3 = BashOperator(task_id="task3", bash_command="echo task3")
        task4 = BashOperator(task_id="task4", bash_command="echo task4")
    
        # Set dependencies between tasks
        task1 >> task2 >> task3 >> task4
    
  2. CI/CD パイプラインを作成します。 Azure DevOps アクションと GitHub アクションの 2 つのオプションがあります。

    1. Azure DevOps オプション: ファイル azure-devops-ci-cd.yaml を作成し、次のコードをコピーします。 パイプラインは、開発ブランチへの pull request または push request に対しトリガーされます。

      trigger:
      - dev
      
      pr:
      - dev
      
      pool:
        vmImage: ubuntu-latest
      strategy:
        matrix:
          Python3.11:
            python.version: '3.11.5'
      
      steps:
      - task: UsePythonVersion@0
        inputs:
          versionSpec: '$(python.version)'
        displayName: 'Use Python $(python.version)'
      
      - script: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
        displayName: 'Install dependencies'
      
      - script: |
          airflow webserver &
          airflow db init
          airflow scheduler &
          pytest
        displayName: 'Pytest'
      

      詳細については、「Azure Pipelines」を参照してください。

    2. GitHub Actions オプション: GitHub リポジトリに .github/workflows ディレクトリを作成します。

      1. .github/workflows ディレクトリに github-actions-ci-cd.yml という名前のファイルを作成します。

      2. 次のコードをコピーします。 パイプラインは、開発ブランチへの pull request または push request があれはトリガーされます。

        name: GitHub Actions CI/CD
        
        on:
          pull_request:
            branches:
              - "dev"
          push:
            branches:
              - "dev"
        
        jobs:
          flake8:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            steps:
              - name: Check out source repository
                uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: flake8 Lint
                uses: py-actions/flake8@v1
                with:
                  max-line-length: 120
          tests:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            needs: [flake8]
            steps:
              - uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: Install dependencies
                run: |
                  python -m pip install --upgrade pip
                  pip install -r requirements.txt
              - name: Pytest
                run: |
                  airflow webserver &
                  airflow db init
                  airflow scheduler &
                  pytest tests/
        
  3. tests フォルダーに、Airflow DAG のテストを作成します。 次に例をいくつか示します。

    1. 少なくとも、DAG の整合性と正確性を確保するために、import_errors を使用して初期テストを実施することが重要です。 このテストでは、次のことを確認します:

      • DAG に循環性が含まれていない: 循環性では、タスクがワークフロー内でループまたは循環依存関係を形成します。この場合、予期しない無限の実行ループが発生する可能性があります。

      • インポート エラーがない: 依存関係の不足、モジュール パスの間違い、コーディング エラーなどの問題が原因で、インポート エラーが発生する可能性があります。  

      • タスクが正しく定義されている: DAG 内のタスクが正しく定義されていることを確認します。

        @pytest.fixture()
        
        def dagbag():
            return DagBag(dag_folder="dags")
        
        def test_no_import_errors(dagbag):
            """
            Test Dags to contain no import errors.
            """
            assert not dagbag.import_errors
        
    2. 開発ブランチにマージする前に、特定の DAG ID が機能ブランチに存在することをテストして確認します。

      def test_expected_dags(dagbag):
          """
          Test whether expected dag Ids are present.
          """
          expected_dag_ids = ["airflow-ci-cd-tutorial"]
      
          for dag_id in expected_dag_ids:
              dag = dagbag.get_dag(dag_id)
      
              assert dag is not None
              assert dag_id == dag.dag_id
      
    3. 承認されたタグのみが DAG に関連付けられていることテストして確認します。 このテストは、承認済みのタグの使用を強制するのに役立ちます。

      def test_requires_approved_tag(dagbag):
          """
          Test if DAGS contain one or more tags from list of approved tags only.
          """
          Expected_tags = {"tutorial", "CI/CD"}
          dagIds = dagbag.dag_ids
      
          for id in dagIds:
              dag = dagbag.get_dag(id)
              assert dag.tags
              if Expected_tags:
                  assert not set(dag.tags) - Expected_tags
      
  4. 開発ブランチに pull request を発行すると、GitHub アクションが CI パイプラインをトリガーして、すべてのテストを実行することがわかります。