Patrones CI/CD con el Administrador de orquestación de flujo de trabajo

Importante

El 1 de enero de 2026 ya no podrás crear nuevas instancias de Airflow usando el Workflow Orchestration Manager de ADF. Recomendamos que migres todas las cargas de trabajo de Workflow Orchestration Manager (Apache Airflow en Azure Data Factory) a trabajos de Apache Airflow en Microsoft Fabric antes del 31 de diciembre de 2025.

Para obtener más información o para obtener soporte técnico durante la migración a Apache Airflow en Microsoft Fabric, póngase en contacto con el soporte técnico de Microsoft.

Workflow Orchestration Manager proporciona una manera sencilla y eficaz de crear y administrar entornos de Apache Airflow. El servicio le permite ejecutar canalizaciones de datos a escala con facilidad. Hay dos métodos principales para ejecutar DAG en el Administrador de orquestaciones de flujo de trabajo. Puede cargar los archivos DAG en el almacenamiento de blobs y vincularlos con el entorno de Airflow. Como alternativa, puede usar la característica de sincronización de Git para sincronizar automáticamente el repositorio de Git con el entorno de Airflow.

Trabajar con canalizaciones de datos en Airflow requiere que cree o actualice los DAG, los complementos y los archivos de requisitos con frecuencia, en función de sus necesidades de flujo de trabajo. Aunque los desarrolladores pueden cargar o editar manualmente archivos DAG en Blob Storage, muchas organizaciones prefieren usar un enfoque de integración continua y entrega continua (CI/CD) para la implementación de código. Este artículo le guía por los patrones de implementación recomendados para integrar e implementar sin problemas los DAG de Apache Airflow con Workflow Orchestration Manager.

Comprender CI/CD

Integración continua

La integración continua es una práctica de desarrollo de software que enfatiza la integración frecuente y automatizada de los cambios de código en un repositorio compartido. Implica a los desarrolladores confirmar periódicamente su código y, tras cada confirmación, una canalización de CI automatizada compila el código, ejecuta pruebas y realiza comprobaciones de validación. Los objetivos principales son detectar y solucionar problemas de integración al principio del proceso de desarrollo y proporcionar comentarios rápidos a los desarrolladores.

CI garantiza que el código base permanece en un estado que se puede probar e implementar constantemente. Esta práctica lleva a mejorar la calidad del código, la colaboración y la capacidad de detectar y corregir errores antes de que se conviertan en problemas importantes.

Entrega continua

La entrega continua es una extensión de CI que lleva la automatización un paso más allá. Aunque CI se centra en automatizar las fases de integración y pruebas, CD automatiza la implementación de cambios de código en producción u otros entornos de destino. Esta práctica ayuda a las organizaciones a publicar actualizaciones de software de forma rápida y confiable. Reduce los errores en la implementación manual y garantiza que los cambios de código aprobados se entregan a los usuarios rápidamente.

Flujo de trabajo de CI/CD en el Administrador de orquestación de flujo de trabajo

Captura de pantalla que muestra el patrón de CI/CD que se puede usar en el Administrador de orquestaciones de flujo de trabajo.

Sincronización de Git con Dev/QA Integration Runtime

Relacione el entorno del Gestor de Orquestación de Flujos de Trabajo con la rama de desarrollo/QA del repositorio de Git.

Canalización de integración continua con el entorno de ejecución de integración de desarrollo y control de calidad

Cuando se envía una solicitud de incorporación de cambios (PR) desde una rama de características a la rama de desarrollo, desencadena una canalización de PR. Esta canalización está diseñada para realizar comprobaciones de calidad de forma eficaz en las ramas de características, lo que garantiza la integridad y la confiabilidad del código. Puede incluir los siguientes tipos de comprobaciones en la canalización:

  • Pruebas de dependencias de Python: Estas pruebas instalan y comprueban la exactitud de las dependencias de Python para asegurarse de que las dependencias del proyecto están configuradas correctamente.
  • Análisis de código y linting: Las herramientas para el análisis de código estático y el linting se aplican para evaluar la calidad del código y el cumplimiento de los estándares de codificación.
  • Pruebas DAG de flujo de aire: Estas pruebas ejecutan pruebas de validación, incluidas las pruebas para la definición de DAG y las pruebas unitarias diseñadas para los DAG de Airflow.
  • Pruebas unitarias para enlaces, sensores, desencadenadores y operadores personalizados de Airflow

Si se produce un error en alguna de estas comprobaciones, la canalización finaliza. A continuación, debe solucionar los problemas identificados.

Sincronización de Git con entorno de ejecución de integración de producción

Mapee el entorno de Workflow Orchestration Manager a la rama de producción del repositorio de Git.

Flujo de PR con entorno de ejecución integrado en producción

Un procedimiento recomendado es mantener un entorno de producción independiente para evitar que todas las características de desarrollo sean accesibles públicamente.

Cuando la rama de características se combine correctamente en la rama de desarrollo, puedes crear una solicitud de incorporación de cambios en la rama de producción para que la característica recién combinada sea pública. Esta solicitud de incorporación de cambios desencadena la canalización de PR que realiza comprobaciones rápidas de calidad en la rama de desarrollo. Las comprobaciones de calidad garantizan que todas las características se integraron correctamente y que no hay errores en el entorno de producción.

Ventajas de usar el flujo de trabajo de CI/CD en el Administrador de orquestaciones de flujo de trabajo

  • Enfoque de fallo rápido: Sin la integración del proceso de CI/CD, es probable que la primera vez que note que el DAG contiene errores sea cuando se suba a GitHub y se sincronice con el Administrador de Orquestación de Flujos de Trabajo, e inicie un Import Error. Mientras tanto, otro desarrollador puede descargar sin darse cuenta el código defectuoso del repositorio, lo que podría provocar ineficiencias en procesos futuros.
  • Mejora de la calidad del código: Si olvida comprobaciones fundamentales como la comprobación de sintaxis, las importaciones necesarias y las comprobaciones de otros procedimientos recomendados de codificación, aumenta la probabilidad de entregar código subpar.

Patrones de implementación en el Administrador de orquestaciones de flujo de trabajo

Se recomiendan dos patrones de implementación.

Patrón 1: Desarrollo de canalizaciones de datos directamente en Workflow Orchestration Manager

Puede desarrollar canalizaciones de datos directamente en El Administrador de orquestaciones de flujo de trabajo cuando se usa el patrón 1.

Prerrequisitos

  • Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar. Cree o seleccione una instancia de Data Factory existente en la región donde se admite la versión preliminar de Workflow Orchestration Manager.
  • Necesita acceso a un repositorio de GitHub.

Ventajas

  • No se requiere ningún entorno de desarrollo local: Workflow Orchestration Manager controla la infraestructura subyacente, las actualizaciones y el mantenimiento, lo que reduce la sobrecarga operativa de la administración de clústeres de Airflow. El servicio le permite centrarse en la creación y administración de flujos de trabajo en lugar de administrar la infraestructura.
  • Escalabilidad: Workflow Orchestration Manager proporciona capacidad de escalado automático para escalar los recursos según sea necesario, lo que garantiza que las canalizaciones de datos puedan controlar el aumento de las cargas de trabajo o las ráfagas de actividad sin intervención manual.
  • Supervisión y registro: Workflow Orchestration Manager incluye registros de diagnóstico y supervisión para ayudarle a realizar un seguimiento de la ejecución de los flujos de trabajo, diagnosticar problemas, configurar alertas y optimizar el rendimiento.

Flujo de trabajo

  1. Use la característica de sincronización de Git.

    En este flujo de trabajo, no es necesario establecer su propio entorno local. En su lugar, puede empezar con la característica de sincronización de Git que ofrece el Administrador de orquestaciones de flujo de trabajo. Esta característica sincroniza automáticamente los archivos DAG con servidores web, programadores y trabajadores de Airflow. Ahora puede desarrollar, probar y ejecutar las canalizaciones de datos directamente a través de la interfaz de usuario de Workflow Orchestration Manager.

    Obtenga más información sobre cómo usar la característica Git-sync del Administrador de orquestaciones de flujo de trabajo.

  2. Cree entornos de rama de características individuales.

    Puede elegir la rama del repositorio para sincronizarla con el Administrador de orquestaciones de flujo de trabajo. Esta funcionalidad le permite crear un entorno de Airflow individual para cada rama de características. De esta manera, los desarrolladores pueden trabajar en tareas específicas para canalizaciones de datos.

  3. Cree una solicitud de incorporación de cambios.

    Continúa con el envío de una solicitud de incorporación de cambios al entorno de ejecución de integración del entorno de desarrollo de Airflow después de desarrollar y probar exhaustivamente las características en tu entorno de Airflow dedicado.

Patrón 2: Desarrollo de DAG localmente e implementación en el Administrador de orquestaciones de flujo de trabajo

Puede desarrollar DAG localmente e implementarlos en el Administrador de orquestaciones de flujo de trabajo cuando use el patrón 2.

Prerrequisitos

  • Necesita acceso a un repositorio de GitHub.
  • Asegúrese de que al menos una sola rama del repositorio de código esté sincronizada con el Administrador de orquestaciones de flujo de trabajo para ver los cambios de código en el servicio.

Ventajas

Solo puede limitar el acceso a los recursos de Azure a los administradores.

Flujo de trabajo

  1. Configure un entorno local.

    Comience configurando un entorno de desarrollo local para Apache Airflow en la máquina de desarrollo. En este entorno, puede desarrollar y probar el código de Airflow, incluidos los DAG y las tareas. Este enfoque le permite desarrollar canalizaciones sin depender del acceso directo a los recursos de Azure.

  2. Use la característica de sincronización de Git.

    Sincronice la rama del repositorio de GitHub con el Administrador de orquestaciones de flujo de trabajo.

    Obtenga más información sobre cómo usar la característica Git-sync del Administrador de orquestaciones de flujo de trabajo.

  3. Utilice el Administrador de orquestaciones de flujo de trabajo como entorno de producción.

    Después de desarrollar y probar correctamente las canalizaciones de datos en la configuración local, puedes generar una solicitud de incorporación de cambios a la rama sincronizada con el Administrador de orquestación de flujo de trabajo. Una vez combinada la rama, use las características del Administrador de orquestaciones de flujo de trabajo, como el escalado automático y la supervisión y el registro en el nivel de producción.

Canalización de CI/CD de ejemplo

Para obtener más información, consulte:

  1. Copie el código de un DAG implementado en el entorno de ejecución de integración de Workflow Orchestration Manager mediante la característica de sincronización de Git.

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    with DAG(
        dag_id="airflow-ci-cd-tutorial",
        start_date=datetime(2023, 8, 15),
        schedule="0 0 * * *",
        tags=["tutorial", "CI/CD"]
    ) as dag:
        # Tasks are represented as operators
        task1 = BashOperator(task_id="task1", bash_command="echo task1")
        task2 = BashOperator(task_id="task2", bash_command="echo task2")
        task3 = BashOperator(task_id="task3", bash_command="echo task3")
        task4 = BashOperator(task_id="task4", bash_command="echo task4")
    
        # Set dependencies between tasks
        task1 >> task2 >> task3 >> task4
    
  2. Crea una canalización de CI/CD. Tiene dos opciones: Acciones de Azure DevOps o GitHub.

    1. Opción de Azure DevOps: cree el archivo azure-devops-ci-cd.yaml y copie el código siguiente. La canalización se desencadena en una solicitud de incorporación de cambios o una solicitud de inserción en la rama de desarrollo:

      trigger:
      - dev
      
      pr:
      - dev
      
      pool:
        vmImage: ubuntu-latest
      strategy:
        matrix:
          Python3.11:
            python.version: '3.11.5'
      
      steps:
      - task: UsePythonVersion@0
        inputs:
          versionSpec: '$(python.version)'
        displayName: 'Use Python $(python.version)'
      
      - script: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
        displayName: 'Install dependencies'
      
      - script: |
          airflow webserver &
          airflow db init
          airflow scheduler &
          pytest
        displayName: 'Pytest'
      

      Para más información, consulte Azure Pipelines.

    2. Opción de acciones de GitHub: crear un .github/workflows directorio en el repositorio de GitHub.

      1. En el .github/workflows directorio , cree un archivo denominado github-actions-ci-cd.yml.

      2. Copie el código siguiente. La canalización se desencadena siempre que haya una solicitud de incorporación de cambios o una solicitud de inserción en la rama de desarrollo:

        name: GitHub Actions CI/CD
        
        on:
          pull_request:
            branches:
              - "dev"
          push:
            branches:
              - "dev"
        
        jobs:
          flake8:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            steps:
              - name: Check out source repository
                uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: flake8 Lint
                uses: py-actions/flake8@v1
                with:
                  max-line-length: 120
          tests:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            needs: [flake8]
            steps:
              - uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: Install dependencies
                run: |
                  python -m pip install --upgrade pip
                  pip install -r requirements.txt
              - name: Pytest
                run: |
                  airflow webserver &
                  airflow db init
                  airflow scheduler &
                  pytest tests/
        
  3. En la carpeta tests, cree las pruebas para los DAG de Airflow. Estos son algunos ejemplos:

    1. Al menos, es fundamental realizar pruebas iniciales mediante import_errors para garantizar la integridad y corrección del DAG. Esta prueba garantiza:

      • El DAG no contiene ciclicidad: La ciclicidad, donde una tarea forma un bucle o una dependencia circular dentro del flujo de trabajo, puede provocar bucles de ejecución inesperados e infinitos.

      • No hay errores de importación: Los errores de importación pueden surgir debido a problemas como dependencias que faltan, rutas de acceso de módulo incorrectas o errores de codificación.  

      • Las tareas se definen correctamente: Confirme que las tareas de su DAG están definidas correctamente.

        @pytest.fixture()
        
        def dagbag():
            return DagBag(dag_folder="dags")
        
        def test_no_import_errors(dagbag):
            """
            Test Dags to contain no import errors.
            """
            assert not dagbag.import_errors
        
    2. Pruebe para asegurarse de que hay identificadores DAG específicos en la rama de características antes de combinarlos en la rama de desarrollo.

      def test_expected_dags(dagbag):
          """
          Test whether expected dag Ids are present.
          """
          expected_dag_ids = ["airflow-ci-cd-tutorial"]
      
          for dag_id in expected_dag_ids:
              dag = dagbag.get_dag(dag_id)
      
              assert dag is not None
              assert dag_id == dag.dag_id
      
    3. Pruebe para asegurarse de que solo las etiquetas aprobadas están asociadas a los DAG. Esta prueba ayuda a aplicar el uso de etiquetas aprobadas.

      def test_requires_approved_tag(dagbag):
          """
          Test if DAGS contain one or more tags from list of approved tags only.
          """
          Expected_tags = {"tutorial", "CI/CD"}
          dagIds = dagbag.dag_ids
      
          for id in dagIds:
              dag = dagbag.get_dag(id)
              assert dag.tags
              if Expected_tags:
                  assert not set(dag.tags) - Expected_tags
      
  4. Ahora, cuando creas un pull request en la rama de desarrollo, puedes ver que las GitHub Actions desencadenan el pipeline de CI para ejecutar todas las pruebas.