使用工作流编排管理器的 CI/CD 模型

重要

2026 年 1 月 1 日,你将无法再使用 ADF 的工作流业务流程管理器创建新的 Airflow 实例。 建议在 2025 年 12 月 31 日之前将所有工作流业务流程管理器(Azure 数据工厂 中的 Apache Airflow)工作负荷迁移到 Microsoft Fabric 中的 Apache Airflow 作业

有关在使用 Microsoft Fabric 迁移到 Apache Airflow 期间获取更多信息或技术支持,请联系微软技术支持。

工作流业务流程管理器提供了一种简单高效的方法来创建和管理 Apache Airflow 环境。 借助该服务,可以轻松大规模运行数据管道。 在工作流业务流程管理器中运行 DAG 有两个主要方法。 可以在 Blob 存储中上传 DAG 文件,并将其链接到 Airflow 环境。 或者,可以使用 Git 同步功能自动将 Git 存储库与 Airflow 环境同步。

在 Airflow 中使用数据管道时,需要根据工作流需求频繁创建或更新 DAG、插件和要求文件。 尽管开发人员可以在 Blob 存储中手动上传或编辑 DAG 文件,但许多组织更喜欢使用持续集成和持续交付(CI/CD)方法进行代码部署。 本文将引导你完成建议的部署模式,以便将 Apache Airflow DAG 与工作流业务流程管理器无缝集成和部署。

了解 CI/CD

持续集成

持续集成是一种软件开发实践,它强调将代码更改频繁自动集成到共享存储库中。 它涉及开发人员定期提交其代码,每次提交时,自动化 CI 管道都会生成代码、运行测试并执行验证检查。 主要目标是在开发过程中尽早检测和解决集成问题,并向开发人员提供快速反馈。

CI 可确保代码库保持持续可测试且可部署的状态。 这种做法能够提升代码质量、协作,并提高在 bug 成为重大问题之前捕获和修复它们的能力。

持续交付

持续交付是持续集成的扩展,它更进一步推动了自动化。 虽然 CI 侧重于自动化集成和测试阶段,但 CD 会自动将代码更改部署到生产环境或其他目标环境。 这种做法可帮助组织快速可靠地发布软件更新。 它减少了手动部署中的错误,并确保将批准的代码更改快速交付给用户。

工作流编排管理器中的 CI/CD 工作流

显示可在工作流业务流程管理器中使用的 CI/CD 模式的屏幕截图。

Git 与 Dev/QA 集成运行时同步

使用 Git 存储库的开发/QA 分支映射工作流业务流程管理器环境。

使用 Dev/QA 集成环境的 CI 管道

当从功能分支向开发分支发出拉取请求(PR)时,会触发 PR 流水线。 此管道旨在有效地对功能分支执行质量检查,确保代码完整性和可靠性。 可以在管道中包含以下类型的检查:

  • Python 依赖项测试: 这些测试安装并验证 Python 依赖项的正确性,以确保正确配置项目的依赖项。
  • 代码分析和 Lint 分析:应用用于静态代码分析和 Lint 分析的工具来评估代码质量和遵守编码标准。
  • Airflow DAG 测试: 这些测试执行验证测试,包括 DAG 定义的测试,以及针对 Airflow DAG 设计的单元测试。
  • Airflow 自定义操作器、钩子、传感器和触发器的单元测试

如果其中任一检查失败,管道将终止。 接下来,需要解决识别出的问题。

Git 同步与生产集成运行时相结合

将工作流编排管理器环境与 Git 存储库的生产分支进行映射。

PR 管道与生产集成运行时相结合

最佳做法是维护单独的生产环境,以防止每个开发功能公开访问。

在功能分支成功合并到开发分支后,可以向生产分支创建拉取请求,使新合并的功能公开。 此拉取请求会触发 PR 管道,对开发分支执行快速质量检查。 质量检查可确保所有功能都正确集成,并且生产环境中没有错误。

在工作流业务流程管理器中使用 CI/CD 工作流的好处

  • 快速失败方法: 如果不集成 CI/CD 进程,你第一次发现 DAG 包含错误可能是在它被推送到 GitHub,并与工作流编排管理器同步后引发 Import Error。 与此同时,另一位开发人员可以不知不觉地从存储库中提取故障代码,这可能会导致行内效率低下。
  • 代码质量改进: 如果忽略了基本检查,如语法验证、必要的导入以及检查其他最佳编码实践,就会增加交付质量欠佳代码的可能性。

工作流业务流程管理器中的部署模式

建议使用两种部署模式。

模式 1:直接在工作流业务流程管理器中开发数据管道

使用模式 1 时,可以直接在工作流业务流程管理器中开发数据管道。

先决条件

  • 如果没有 Azure 订阅,请在开始之前先创建一个免费帐户。 在支持工作流业务流程管理器预览的区域中创建或选择现有的数据工厂实例。
  • 需要访问 GitHub 存储库

优点

  • 不需要本地开发环境: 工作流业务流程管理器处理底层基础结构、更新和维护,从而减少管理 Airflow 群集的作开销。 借助该服务,你可以专注于构建和管理工作流,而不是管理基础结构。
  • 可伸缩性: 工作流业务流程管理器提供按需缩放资源的自动缩放功能,确保数据管道可以处理不断增加的工作负荷或活动突发,而无需手动干预。
  • 监视和日志记录: 工作流业务流程管理器包括诊断日志和监视,可帮助你跟踪工作流的执行、诊断问题、设置警报和优化性能。

Workflow

  1. 使用 Git 同步功能。

    在此工作流中,无需建立自己的本地环境。 相反,你可以立即使用工作流编排管理器提供的 Git 同步功能。 此功能会自动将 DAG 文件与 Airflow Web 服务器、计划程序和辅助角色同步。 现在,可以直接通过工作流业务流程管理器 UI 开发、测试和执行数据管道。

    详细了解如何使用工作流业务流程管理器 Git 同步功能

  2. 创建单个功能分支环境。

    可以从存储库中选择分支,以便与工作流业务流程管理器同步。 此功能允许为每个功能分支创建单独的 Airflow 环境。 这样,开发人员就可以处理数据管道的特定任务。

  3. 创建拉取请求。

    在专用 Airflow 环境中全面开发和测试功能后,继续向 Airflow 开发环境集成运行时提交拉取请求。

模式 2:在本地开发 DAG 并在工作流业务流程管理器上部署

使用模式 2 时,可以在本地开发 DAG 并将其部署到工作流业务流程管理器上。

先决条件

  • 需要访问 GitHub 存储库
  • 确保至少将代码存储库的单个分支与工作流业务流程管理器同步,以查看服务上的代码更改。

优点

只能将对 Azure 资源的访问权限限制为管理员。

Workflow

  1. 设置本地环境。

    首先,在开发计算机上为 Apache Airflow 设置本地开发环境。 在此环境中,可以开发和测试 Airflow 代码,包括 DAG 和任务。 此方法允许你开发管道,而无需直接访问 Azure 资源。

  2. 使用 Git 同步功能。

    将 GitHub 存储库的分支与工作流业务流程管理器同步。

    详细了解如何使用工作流业务流程管理器 Git 同步功能

  3. 使用工作流业务流程管理器作为生产环境。

    在本地设置上成功开发和测试数据管道后,可以向与工作流业务流程管理器同步的分支提出拉取请求。 合并分支后,请在生产级别使用工作流业务流程管理器功能,例如自动缩放和监视和日志记录。

示例 CI/CD 管道

有关详细信息,请参见:

  1. 使用 Git 同步功能复制工作流业务流程管理器集成运行时中部署的 DAG 的代码。

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    with DAG(
        dag_id="airflow-ci-cd-tutorial",
        start_date=datetime(2023, 8, 15),
        schedule="0 0 * * *",
        tags=["tutorial", "CI/CD"]
    ) as dag:
        # Tasks are represented as operators
        task1 = BashOperator(task_id="task1", bash_command="echo task1")
        task2 = BashOperator(task_id="task2", bash_command="echo task2")
        task3 = BashOperator(task_id="task3", bash_command="echo task3")
        task4 = BashOperator(task_id="task4", bash_command="echo task4")
    
        # Set dependencies between tasks
        task1 >> task2 >> task3 >> task4
    
  2. 创建 CI/CD 管道。 有两个选项:Azure DevOps 或 GitHub 操作。

    1. Azure DevOps 选项:创建文件 azure-devops-ci-cd.yaml 并复制以下代码。 该管道会在对开发分支的拉取请求或推送请求上触发:

      trigger:
      - dev
      
      pr:
      - dev
      
      pool:
        vmImage: ubuntu-latest
      strategy:
        matrix:
          Python3.11:
            python.version: '3.11.5'
      
      steps:
      - task: UsePythonVersion@0
        inputs:
          versionSpec: '$(python.version)'
        displayName: 'Use Python $(python.version)'
      
      - script: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
        displayName: 'Install dependencies'
      
      - script: |
          airflow webserver &
          airflow db init
          airflow scheduler &
          pytest
        displayName: 'Pytest'
      

      有关详细信息,请参阅 Azure Pipelines

    2. GitHub Actions 选项:在 GitHub 存储库中创建 .github/workflows 目录。

      1. .github/workflows 目录中,创建名为 github-actions-ci-cd.yml 的文件。

      2. 复制以下代码。 每当有对开发分支的拉取请求或推送请求时,管道就会触发:

        name: GitHub Actions CI/CD
        
        on:
          pull_request:
            branches:
              - "dev"
          push:
            branches:
              - "dev"
        
        jobs:
          flake8:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            steps:
              - name: Check out source repository
                uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: flake8 Lint
                uses: py-actions/flake8@v1
                with:
                  max-line-length: 120
          tests:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            needs: [flake8]
            steps:
              - uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: Install dependencies
                run: |
                  python -m pip install --upgrade pip
                  pip install -r requirements.txt
              - name: Pytest
                run: |
                  airflow webserver &
                  airflow db init
                  airflow scheduler &
                  pytest tests/
        
  3. 在测试文件夹中,为 Airflow DAG 创建测试。 以下是一些示例:

    1. 至少,使用 import_errors 以确保 DAG 的完整性和正确性进行初始测试至关重要。 此测试可确保:

      • 您的 DAG 不包含循环性: 循环性指的是任务在工作流中形成环或者循环依赖,可能会导致意外和无穷尽的执行循环。

      • 没有导入错误: 由于缺少依赖项、错误的模块路径或编码错误等问题,可能会出现导入错误。  

      • 正确定义了任务: 确认已正确定义 DAG 中的任务。

        @pytest.fixture()
        
        def dagbag():
            return DagBag(dag_folder="dags")
        
        def test_no_import_errors(dagbag):
            """
            Test Dags to contain no import errors.
            """
            assert not dagbag.import_errors
        
    2. 测试以确保功能分支中存在特定的 DAG ID,然后再将它们合并到开发分支中。

      def test_expected_dags(dagbag):
          """
          Test whether expected dag Ids are present.
          """
          expected_dag_ids = ["airflow-ci-cd-tutorial"]
      
          for dag_id in expected_dag_ids:
              dag = dagbag.get_dag(dag_id)
      
              assert dag is not None
              assert dag_id == dag.dag_id
      
    3. 测试以确保仅已批准的标签与 DAG 相关联。 此测试有助于强制实施批准的标记用法。

      def test_requires_approved_tag(dagbag):
          """
          Test if DAGS contain one or more tags from list of approved tags only.
          """
          Expected_tags = {"tutorial", "CI/CD"}
          dagIds = dagbag.dag_ids
      
          for id in dagIds:
              dag = dagbag.get_dag(id)
              assert dag.tags
              if Expected_tags:
                  assert not set(dag.tags) - Expected_tags
      
  4. 现在,当您向开发分支发起拉取请求时,可以看到 GitHub 操作会触发 CI 管道运行所有测试。