Compartir a través de


Patrones CI/CD con el Administrador de orquestación de flujo de trabajo

Nota:

El Administrador de orquestación de flujo de trabajo cuenta con la tecnología de Apache Airflow.

El Administrador de orquestación de flujo de trabajo proporciona una manera sencilla y eficaz de crear y administrar entornos de Apache Airflow. El servicio te permite ejecutar canalizaciones de datos a gran escala con facilidad. Hay dos métodos principales para ejecutar DAG en el Administrador de orquestación de flujo de trabajo. Puedes cargar los archivos DAG en el almacenamiento de blobs y vincularlos con el entorno de Airflow. Como alternativa, puedes usar la característica de sincronización de Git para sincronizar automáticamente el repositorio de Git con el entorno de Airflow.

Trabajar con canalizaciones de datos en Airflow requiere la creación o actualización de los DAG, los complementos y los archivos de requisitos con frecuencia, según las necesidades del flujo de trabajo. Aunque los desarrolladores pueden cargar o editar manualmente archivos DAG en Blob Storage, muchas organizaciones prefieren usar un enfoque de integración continua y entrega continua (CI/CD) para la implementación de código. Este artículo te guía por los patrones de implementación recomendados para integrar e implementar sin problemas los DAG de Apache Airflow con el Administrador de orquestación de flujo de trabajo.

Descripción de CI/CD

Integración continua

La integración continua es una práctica de desarrollo de software que enfatiza la integración frecuente y automatizada de los cambios de código en un repositorio compartido. Involucra a los desarrolladores para que confirmen periódicamente su código y, después de cada confirmación, una canalización de CI automatizada compila el código, ejecuta pruebas y realiza comprobaciones de validación. Los objetivos principales son detectar y solucionar problemas de integración al principio del proceso de desarrollo y proporcionar comentarios rápidos a los desarrolladores.

La integración continua (CI) garantiza que el código base permanezca en un estado que se puede probar e implementar constantemente. Esta práctica lleva a mejorar la calidad del código, la colaboración y la capacidad de detectar y corregir errores antes de que se conviertan en problemas importantes.

Entrega continua

La entrega continua es una extensión de la integración continua que lleva la automatización un paso más allá. Aunque la integración continua se centra en automatizar las fases de integración y pruebas, la implementación continua automatiza la implementación de los cambios de código en producción u otros entornos de destino. Esta práctica ayuda a las organizaciones a publicar actualizaciones de software de forma rápida y fiable. Reduce los errores en la implementación manual y garantiza que los cambios de código aprobados se entregan a los usuarios rápidamente.

Flujo de trabajo de CI/CD en el Administrador de orquestación de flujo de trabajo

Captura de pantalla que muestra el patrón de CI/CD que se puede usar en el Administrador de orquestación de flujo de trabajo.

Sincronización de Git con el entorno de ejecución de integración de desarrollo y control de calidad

Asigna el entorno del Administrador de orquestación de flujo de trabajo con la rama de desarrollo y control de calidad del repositorio de Git.

Canalización de integración continua con el entorno de ejecución de integración de desarrollo y control de calidad

Cuando se envía una solicitud de incorporación de cambios (PR) desde una rama de características a la rama de desarrollo, desencadena una canalización de PR. Esta canalización está diseñada para realizar comprobaciones de calidad de forma eficaz en las ramas de características, lo que garantiza la integridad y la confiabilidad del código. En la canalización se pueden incluir los siguientes tipos de inserciones en el repositorio:

  • Pruebas de dependencias de Python: estas pruebas instalan las dependencias de Python y comprueban si son correctas, para asegurarse de que las dependencias del proyecto están configuradas correctamente.
  • Análisis de código y linting: se aplican herramientas para el análisis de código estático y linting, con el fin de evaluar la calidad del código y el cumplimiento de los estándares de programación.
  • Pruebas de DAG de Airflow: ejecutan pruebas de validación, incluidas pruebas para la definición de DAG y pruebas unitarias diseñadas para los DAG de Airflow.
  • Pruebas unitarias para enlaces, sensores, desencadenadores y operadores personalizados de Airflow

Si se produce un error en alguna de estas comprobaciones, la canalización finaliza. Después, debes solucionar los problemas identificados.

Sincronización de Git con el entorno de ejecución de integración

Asigna el entorno del Administrador de orquestación de flujo de trabajo con la rama de producción del repositorio de Git.

Canalización de PR con el entorno de ejecución de integración

Se considera un procedimiento recomendado mantener un entorno de producción aparte con el fin de evitar que todas las características de desarrollo estén accesibles públicamente.

Cuando la rama de características se combine correctamente en la rama de desarrollo, puedes crear una solicitud de incorporación de cambios en la rama de producción para que la característica recién combinada sea pública. Esta solicitud de incorporación de cambios desencadena la canalización de PR que realiza comprobaciones rápidas de calidad en la rama de desarrollo. Las comprobaciones de calidad garantizan que todas las características se integraron correctamente y que no hay errores en el entorno de producción.

Ventajas de usar el flujo de trabajo de CI/CD en el Administrador de orquestación de flujo de trabajo

  • Enfoque con respuesta rápida a errores: sin la integración del proceso de CI/CD, es probable que la primera vez que sepa que un DAG contiene errores sea cuando se inserta en GitHub, se sincroniza con el Administrador de orquestación de flujo de trabajo y genera un error Import Error. Mientras tanto, otro desarrollador puede extraer el error del repositorio sin saberlo, lo que puede dar lugar a deficiencias en la línea.
  • Mejora de la calidad del código: si no realizas las comprobaciones fundamentales, como la comprobación de la sintaxis, las importaciones necesarias y las comprobaciones de otros procedimientos recomendados de programación, puede aumentar la probabilidad de entregar código mediocre.

Implementación de patrones en el Administrador de orquestaciones de flujo de trabajo

Se recomiendan dos patrones de implementación.

Patrón 1: Desarrollo de canalizaciones de datos directamente en el Administrador de orquestación de flujo de trabajo

Puedes desarrollar canalizaciones de datos directamente en el Administrador de orquestación de flujo de trabajo cuando uses el patrón 1.

Requisitos previos

  • Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar. Crea o selecciona una instancia de Data Factory existente en la región donde se admita la versión preliminar del Administrador de orquestación de flujo de trabajo.
  • Necesitas acceso a un repositorio de GitHub.

Ventajas

  • No se requiere ningún entorno de desarrollo local: el Administrador de orquestación de flujo de trabajo controla la infraestructura subyacente, las actualizaciones y el mantenimiento, lo que reduce la sobrecarga operativa de la administración de clústeres de Airflow. El servicio le permite centrarse en la creación y administración de flujos de trabajo en lugar de administrar la infraestructura.
  • Escalabilidad: el Administrador de orquestación de flujo de trabajo proporciona capacidad de escalado automático para escalar los recursos según sea necesario, lo que garantiza que las canalizaciones de datos puedan controlar la ampliación de las cargas de trabajo o el aumento transitorio de la actividad sin intervención manual.
  • Supervisión y registro: el Administrador de orquestación de flujo de trabajo incluye registros de diagnóstico y supervisión para ayudarte a hacer un seguimiento de la ejecución de los flujos de trabajo, el diagnóstico de problemas, la configuración de alertas y la optimización del rendimiento.

Flujo de trabajo

  1. Utiliza la característica de sincronización de Git.

    En este flujo de trabajo, no es necesario que establezcas tu propio entorno local. En lugar de eso, puedes empezar usando la característica de sincronización de Git que ofrece el Administrador de orquestación de flujo de trabajo. Esta característica sincroniza automáticamente los archivos DAG con programadores, trabajadores y servidores web de Airflow. Ahora puedes desarrollar, probar y ejecutar las canalizaciones de datos directamente mediante la interfaz de usuario del Administrador de orquestación de flujo de trabajo.

    Obtén más información sobre cómo usar la característica de sincronización de Git del Administrador de orquestación de flujo de trabajo.

  2. Crea entornos de rama de características individual.

    Puedes elegir la rama del repositorio para sincronizarla con el Administrador de orquestación de flujo de trabajo. Esta capacidad te permite crear un entorno de Airflow individual para cada rama de características. De esta manera, los desarrolladores pueden trabajar en tareas específicas para canalizaciones de datos.

  3. Cree una solicitud de incorporación de cambios.

    Continúa con el envío de una solicitud de incorporación de cambios al entorno de ejecución de integración del entorno de desarrollo de Airflow después de desarrollar y probar exhaustivamente las características en tu entorno de Airflow dedicado.

Patrón 2: Desarrollo de DAG en modo local e implementación en el Administrador de orquestación de flujo de trabajo

Puedes desarrollar DAG en modo local e implementarlos en el Administrador de orquestación de flujo de trabajo cuando uses el patrón 2.

Requisitos previos

  • Necesitas acceso a un repositorio de GitHub.
  • Asegúrate de que al menos una sola rama del repositorio de código esté sincronizada con el Administrador de orquestación de flujo de trabajo para ver los cambios de código en el servicio.

Ventajas

Puedes limitar el acceso a los recursos de Azure solo a administradores.

Flujo de trabajo

  1. Configura un entorno local.

    Comience configurando un entorno de desarrollo local para Apache Airflow en la máquina de desarrollo. En este entorno, puede desarrollar y probar el código de Airflow, incluidos los DAG y las tareas. Este enfoque le permite desarrollar canalizaciones sin depender del acceso directo a los recursos de Azure.

  2. Utiliza la característica de sincronización de Git.

    Sincroniza la rama del repositorio de GitHub con el Administrador de orquestación de flujo de trabajo.

    Obtén más información sobre cómo usar la característica de sincronización de Git del Administrador de orquestación de flujo de trabajo.

  3. Usa el Administrador de orquestación de flujo de trabajo como entorno de producción.

    Después de desarrollar y probar correctamente las canalizaciones de datos en la configuración local, puedes generar una solicitud de incorporación de cambios a la rama sincronizada con el Administrador de orquestación de flujo de trabajo. Una vez combinada la rama, usa las características del Administrador de orquestación de flujo de trabajo, como el escalado automático, la supervisión y el registro en el nivel de producción.

Canalización de CI/CD de ejemplo

Para más información, vea:

  1. Copia el código de un DAG implementado en el entorno de ejecución de integración del Administrador de orquestación de flujo de trabajo mediante la característica de sincronización de Git.

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    with DAG(
        dag_id="airflow-ci-cd-tutorial",
        start_date=datetime(2023, 8, 15),
        schedule="0 0 * * *",
        tags=["tutorial", "CI/CD"]
    ) as dag:
        # Tasks are represented as operators
        task1 = BashOperator(task_id="task1", bash_command="echo task1")
        task2 = BashOperator(task_id="task2", bash_command="echo task2")
        task3 = BashOperator(task_id="task3", bash_command="echo task3")
        task4 = BashOperator(task_id="task4", bash_command="echo task4")
    
        # Set dependencies between tasks
        task1 >> task2 >> task3 >> task4
    
  2. Crea una canalización de CI/CD. Tienes dos opciones: Azure DevOps o Acciones de GitHub.

    1. Opción de Azure DevOps: crea el archivo azure-devops-ci-cd.yaml y copia el código siguiente. La canalización se desencadena en una solicitud de incorporación de cambios o una solicitud de inserción en la rama de desarrollo:

      trigger:
      - dev
      
      pr:
      - dev
      
      pool:
        vmImage: ubuntu-latest
      strategy:
        matrix:
          Python3.11:
            python.version: '3.11.5'
      
      steps:
      - task: UsePythonVersion@0
        inputs:
          versionSpec: '$(python.version)'
        displayName: 'Use Python $(python.version)'
      
      - script: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
        displayName: 'Install dependencies'
      
      - script: |
          airflow webserver &
          airflow db init
          airflow scheduler &
          pytest
        displayName: 'Pytest'
      

      Para obtener más información, consulte Azure Pipelines.

    2. Opción Acciones de GitHub: crea un directorio .github/workflows en el repositorio de GitHub.

      1. En el directorio .github/workflows, cree un archivo denominado github-actions-ci-cd.yml.

      2. Copia el código siguiente. La canalización se desencadena siempre que haya una solicitud de incorporación de cambios o una solicitud de inserción en la rama de desarrollo:

        name: GitHub Actions CI/CD
        
        on:
          pull_request:
            branches:
              - "dev"
          push:
            branches:
              - "dev"
        
        jobs:
          flake8:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            steps:
              - name: Check out source repository
                uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: flake8 Lint
                uses: py-actions/flake8@v1
                with:
                  max-line-length: 120
          tests:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            needs: [flake8]
            steps:
              - uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: Install dependencies
                run: |
                  python -m pip install --upgrade pip
                  pip install -r requirements.txt
              - name: Pytest
                run: |
                  airflow webserver &
                  airflow db init
                  airflow scheduler &
                  pytest tests/
        
  3. En la carpeta de pruebas, crea las pruebas para los DAG de Airflow. Estos son algunos ejemplos:

    1. Como mínimo, es fundamental realizar pruebas iniciales usando import_errors para garantizar la integridad y la corrección del DAG. Esta prueba garantiza:

      • El DAG no contiene ciclicidad: la ciclicidad (cuando una tarea forma un bucle o una dependencia circular dentro del flujo de trabajo) puede dar lugar a bucles de ejecución inesperados e infinitos.

      • No hay errores de importación: pueden surgir errores de importación debido a problemas como dependencias que faltan, rutas de acceso de módulo incorrectas o errores de codificación.  

      • Las tareas se han definido correctamente: confirme que las tareas del DAG están definidas correctamente.

        @pytest.fixture()
        
        def dagbag():
            return DagBag(dag_folder="dags")
        
        def test_no_import_errors(dagbag):
            """
            Test Dags to contain no import errors.
            """
            assert not dagbag.import_errors
        
    2. Haz una prueba para asegurarte de que los id. de DAG específicos están presentes en la rama de características antes de combinarlos en la rama de desarrollo.

      def test_expected_dags(dagbag):
          """
          Test whether expected dag Ids are present.
          """
          expected_dag_ids = ["airflow-ci-cd-tutorial"]
      
          for dag_id in expected_dag_ids:
              dag = dagbag.get_dag(dag_id)
      
              assert dag is not None
              assert dag_id == dag.dag_id
      
    3. Haz una prueba para asegurarte de que solo las etiquetas aprobadas están asociadas a los DAG. Esta prueba ayuda a implementar el uso de etiquetas aprobadas.

      def test_requires_approved_tag(dagbag):
          """
          Test if DAGS contain one or more tags from list of approved tags only.
          """
          Expected_tags = {"tutorial", "CI/CD"}
          dagIds = dagbag.dag_ids
      
          for id in dagIds:
              dag = dagbag.get_dag(id)
              assert dag.tags
              if Expected_tags:
                  assert not set(dag.tags) - Expected_tags
      
  4. Ahora, cuando generes una solicitud de incorporación de cambios en la rama de desarrollo, podrás ver que Acciones de GitHub desencadena la canalización de CI para ejecutar todas las pruebas.