你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
注意
工作流编排管理器由 Apache Airflow 提供支持。
工作流编排管理器提供了一种简单高效的方法来创建和管理 Apache Airflow 环境。 借助该服务,可以轻松大规模运行数据管道。 在工作流编排管理器中运行 DAG 有两个主要方法。 可以在 Blob 存储中上传 DAG 文件,并将其链接到 Airflow 环境。 或者,可以使用 Git 同步功能自动将 Git 存储库与 Airflow 环境同步。
要在 Airflow 中使用数据管道,需要根据工作流需求频繁创建或更新 DAG、插件和要求文件。 尽管开发人员可以在 Blob 存储中手动上传或编辑 DAG 文件,但许多组织更喜欢使用持续集成和持续交付 (CI/CD) 方法进行代码部署。 本文将引导你完成建议的部署模式,以便将 Apache Airflow DAG 与工作流编排管理器无缝集成和部署。
了解 CI/CD
持续集成
持续集成是一种软件开发实践,着重于将代码更改频繁自动地集成到共享存储库中。 它涉及到开发人员定期提交其代码,每次提交时,自动化 CI 管道都会生成代码、运行测试并执行验证检查。 主要目标是在开发过程中尽早检测和解决集成问题,并向开发人员提供快速反馈。
CI 可确保代码库保持持续可测试且可部署的状态。 此做法可提高代码质量和协作,还能在 bug 成为重大问题之前捕获和修复这些 bug。
持续交付
持续交付是 CI 的延伸,它将自动化向前推进了一步。 CI 侧重于自动执行集成和测试阶段,而 CD 自动将代码更改部署到生产环境或其他目标环境。 此做法可帮助组织快速可靠地发布软件更新。 它减少了手动部署中的错误,并确保将批准的代码更改快速交付给用户。
工作流编排管理器中的 CI/CD 工作流
Git 同步与 Dev/QA 集成运行时相结合
使用 Git 存储库的开发/QA 分支映射工作流编排管理器环境。
使用 Dev/QA 集成运行时的 CI 管道
从功能分支向开发分支发出拉取请求 (PR) 时,会触发 PR 管道。 此管道旨在有效地对功能分支执行质量检查,确保代码的完整性和可靠性。 可以在管道中包含以下类型的检查:
- Python 依赖项测试:这些测试会安装 Python 依赖项并测试其正确性,确保适当配置项目的依赖项。
- 代码分析和 Lint 分析:应用用于静态代码分析和 Lint 分析的工具来评估代码质量和遵守编码标准。
- Airflow DAG 测试:这些测试会执行验证测试,包括 DAG 定义的测试以及专为 Airflow DAG 设计的单元测试。
- Airflow 自定义操作器、钩子、传感器和触发器的单元测试
如果其中任一检查失败,则管道将终止。 然后,你需要解决发现的问题。
Git 同步与生产集成运行时相结合
使用 Git 存储库的生产分支映射工作流编排管理器环境。
PR 管道与生产集成运行时相结合
最佳做法是维护单独的生产环境,防止每个开发功能公开访问。
功能分支成功合并到开发分支后,可创建对生产分支的拉取请求,将新合并的功能公开。 此拉取请求会触发 PR 管道,对开发分支执行快速质量检查。 质量检查可确保所有功能都正确集成,并且生产环境中没有错误。
在工作流编排管理器中使用 CI/CD 工作流的好处
- 快速失败方法:如果没有集成 CI/CD 进程,则当 DAG 被推送到 GitHub、与工作流编排管理器同步和引发
Import Error
时,可能是你第一次知道 DAG 包含错误。 与此同时,其他开发人员可能会在不知情的情况下从存储库中拉取有问题的代码,这可能会导致效率低下。 - 代码质量改进:如果忽略语法验证、必要的导入和其他最佳编码做法检查等基本检查,则会增加交付次佳代码的可能性。
工作流编排管理器中的部署模式
建议使用两种部署模式。
模式 1:直接在工作流编排管理器中开发数据管道
使用模式 1 时,可以直接在工作流编排管理器中开发数据管道。
先决条件
- 如果没有 Azure 订阅,请在开始之前创建一个免费帐户。 在支持工作流编排管理器预览版的区域中创建一个数据工厂实例或选择一个现有的实例。
- 你需要能够访问 GitHub 存储库。
优点
- 不需要本地开发环境:工作流编排管理器会处理底层基础结构、更新和维护,从而减少了管理 Airflow 群集的操作开销。 有了该服务,你可专注构建和管理工作流,而不是管理基础结构。
- 可伸缩性:工作流编排管理器提供自动缩放功能来按需缩放资源,确保数据管道可处理不断增加的工作负载或突发活动,无需手动干预。
- 监视和日志记录:工作流编排管理器包括诊断日志和监视,可帮助你跟踪工作流的执行、诊断问题、设置警报和优化性能。
Workflow
使用 Git 同步功能。
在此工作流中,你无需建立自己的本地环境。 相反,你可以立即使用工作流编排管理器提供的 Git 同步功能。 此功能会自动将 DAG 文件与 Airflow Web 服务器、计划程序和辅助角色同步。 现在,可以直接通过工作流编排管理器 UI 开发、测试和执行数据管道。
详细了解如何使用工作流编排管理器的 Git 同步功能。
创建单独的功能分支环境。
可以从存储库中选择分支,以便与工作流编排管理器同步。 此功能允许为每个功能分支创建单独的 Airflow 环境。 这样,开发人员就可以处理数据管道的特定任务。
创建拉取请求。
在专用 Airflow 环境中全面开发和测试功能后,继续向 Airflow 开发环境集成运行时提交拉取请求。
模式 2:在本地开发 DAG 并在工作流编排管理器上部署
使用模式 2 时,可以在本地开发 DAG 并将其部署到工作流编排管理器上。
先决条件
- 你需要能够访问 GitHub 存储库。
- 确保将代码存储库的至少一个分支与工作流编排管理器同步,以查看服务上的代码更改。
优点
可限制只能由管理员访问 Azure 资源。
Workflow
设置本地环境。
首先,在开发计算机上为 Apache Airflow 设置本地开发环境。 在此环境中,可以开发和测试 Airflow 代码(包括 DAG 和任务)。 通过此方法可开发管道,而无需直接访问 Azure 资源。
使用 Git 同步功能。
将 GitHub 存储库的分支与工作流编排管理器同步。
详细了解如何使用工作流编排管理器的 Git 同步功能。
使用工作流编排管理器作为生产环境。
在本地设置上成功开发和测试数据管道后,可以向与工作流编排管理器同步的分支提出拉取请求。 合并分支后,请在生产级别使用工作流编排管理器功能,例如自动缩放和监视和日志记录。
示例 CI/CD 管道
有关详细信息,请参阅:
使用 Git 同步功能复制工作流编排管理器集成运行时中部署的 DAG 的代码。
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator with DAG( dag_id="airflow-ci-cd-tutorial", start_date=datetime(2023, 8, 15), schedule="0 0 * * *", tags=["tutorial", "CI/CD"] ) as dag: # Tasks are represented as operators task1 = BashOperator(task_id="task1", bash_command="echo task1") task2 = BashOperator(task_id="task2", bash_command="echo task2") task3 = BashOperator(task_id="task3", bash_command="echo task3") task4 = BashOperator(task_id="task4", bash_command="echo task4") # Set dependencies between tasks task1 >> task2 >> task3 >> task4
创建 CI/CD 管道。 你有两个选项:Azure DevOps 或 GitHub 操作。
Azure DevOps 选项:创建文件
azure-devops-ci-cd.yaml
并复制以下代码。 该管道会在对开发分支的拉取请求或推送请求上触发:trigger: - dev pr: - dev pool: vmImage: ubuntu-latest strategy: matrix: Python3.11: python.version: '3.11.5' steps: - task: UsePythonVersion@0 inputs: versionSpec: '$(python.version)' displayName: 'Use Python $(python.version)' - script: | python -m pip install --upgrade pip pip install -r requirements.txt displayName: 'Install dependencies' - script: | airflow webserver & airflow db init airflow scheduler & pytest displayName: 'Pytest'
有关详细信息,请参阅 Azure Pipelines。
GitHub 操作选项:在 GitHub 存储库中创建一个
.github/workflows
目录。在
.github/workflows
目录中,创建名为github-actions-ci-cd.yml
的文件。复制以下代码。 每当有对开发分支的拉取请求或推送请求时,管道就会触发:
name: GitHub Actions CI/CD on: pull_request: branches: - "dev" push: branches: - "dev" jobs: flake8: strategy: matrix: python-version: [3.11.5] runs-on: ubuntu-latest steps: - name: Check out source repository uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 with: python-version: ${{matrix.python-version}} - name: flake8 Lint uses: py-actions/flake8@v1 with: max-line-length: 120 tests: strategy: matrix: python-version: [3.11.5] runs-on: ubuntu-latest needs: [flake8] steps: - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 with: python-version: ${{matrix.python-version}} - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt - name: Pytest run: | airflow webserver & airflow db init airflow scheduler & pytest tests/
在测试文件夹中,为 Airflow DAG 创建测试。 以下是一些示例:
至少,使用
import_errors
进行初始测试来确保 DAG 的完整性和正确性至关重要。 此测试可确保:DAG 不包含循环性:循环性可能会导致意外和无限的执行循环,其中任务在工作流中形成循环依赖。
没有导入错误:导入错误可能是由于缺少依赖项、模块路径不正确或编码错误等问题引起的。
正确定义任务:确认已正确定义 DAG 中的任务。
@pytest.fixture() def dagbag(): return DagBag(dag_folder="dags") def test_no_import_errors(dagbag): """ Test Dags to contain no import errors. """ assert not dagbag.import_errors
测试以确保功能分支中存在特定的 DAG ID,然后再将它们合并到开发分支中。
def test_expected_dags(dagbag): """ Test whether expected dag Ids are present. """ expected_dag_ids = ["airflow-ci-cd-tutorial"] for dag_id in expected_dag_ids: dag = dagbag.get_dag(dag_id) assert dag is not None assert dag_id == dag.dag_id
测试以确保只有已批准的标记与 DAG 相关联。 此测试有助于强制实施批准的标记用法。
def test_requires_approved_tag(dagbag): """ Test if DAGS contain one or more tags from list of approved tags only. """ Expected_tags = {"tutorial", "CI/CD"} dagIds = dagbag.dag_ids for id in dagIds: dag = dagbag.get_dag(id) assert dag.tags if Expected_tags: assert not set(dag.tags) - Expected_tags
现在,向开发分支提出拉取请求时,可以看到 GitHub 操作会触发 CI 管道来运行所有测试。