Modelli CI/CD con Workflow Orchestration Manager

Importante

Il 1° gennaio 2026 non potrai più creare nuove istanze Airflow utilizzando il Workflow Orchestration Manager di ADF. Ti consigliamo di migrare tutti i carichi di lavoro di Workflow Orchestration Manager (Apache Airflow in Azure Data Factory) ai lavori Apache Airflow in Microsoft Fabric prima del 31 dicembre 2025.

Per maggiori informazioni o per supporto durante la migrazione ad Apache Airflow in Microsoft Fabric, contatta il supporto Microsoft.

Workflow Orchestration Manager offre un modo semplice ed efficiente per creare e gestire ambienti Apache Airflow. Il servizio consente di eseguire pipeline di dati su larga scala con facilità. Esistono due metodi principali per eseguire i dag in Workflow Orchestration Manager. È possibile caricare i file DAG nell'archivio BLOB e collegarli all'ambiente Airflow. In alternativa, è possibile usare la funzionalità di sincronizzazione Git per sincronizzare automaticamente il repository Git con l'ambiente Airflow.

Lavorare con le pipeline di dati in Airflow richiede di creare o aggiornare frequentemente i DAG, plug-in e file di requisiti, in base alle esigenze del flusso di lavoro. Anche se gli sviluppatori possono caricare o modificare manualmente i file DAG nell'archiviazione BLOB, molte organizzazioni preferiscono usare un approccio di integrazione continua e recapito continuo (CI/CD) per la distribuzione del codice. Questo articolo illustra i modelli di distribuzione consigliati per integrare e distribuire facilmente i dag Apache Airflow con Workflow Orchestration Manager.

Comprendere CI/CD

Integrazione continua

L'integrazione continua è una pratica di sviluppo software che enfatizza l'integrazione frequente e automatizzata delle modifiche del codice in un repository condiviso. Implica che gli sviluppatori eseguano regolarmente il commit del codice e, a ogni commit, una pipeline ci automatizzata compila il codice, esegue test ed esegue controlli di convalida. Gli obiettivi principali sono rilevare e risolvere i problemi di integrazione nelle prime fasi del processo di sviluppo e fornire feedback rapido agli sviluppatori.

L'integrazione continua (CI) garantisce che la codebase rimanga in uno stato costantemente testabile e distribuibile. Questa pratica porta a una migliore qualità del codice, collaborazione e la possibilità di rilevare e correggere i bug prima che diventino problemi significativi.

Distribuzione continua

La distribuzione continua è un'estensione dell'integrazione continua che porta l'automazione a un livello successivo. Mentre l'IC (Integrazione Continua) è incentrata sull'automazione delle fasi di integrazione e test, la CD (Consegna Continua) automatizza la distribuzione delle modifiche del codice negli ambienti di produzione o in altri ambienti di destinazione. Questa procedura consente alle organizzazioni di rilasciare gli aggiornamenti software in modo rapido e affidabile. Riduce gli errori nella distribuzione manuale e garantisce che le modifiche al codice approvate vengano recapitate rapidamente agli utenti.

Flusso di lavoro CI/CD in Workflow Orchestration Manager

Screenshot che mostra il modello CI/CD che può essere usato in Workflow Orchestration Manager.

Sincronizzazione Git con il runtime di integrazione Dev/QA

Mappa l'ambiente Workflow Orchestration Manager con il ramo sviluppo/QA del repository Git.

Pipeline CI con il runtime di integrazione Dev/QA

Quando viene effettuata una pull request da un ramo di funzionalità al ramo di sviluppo, viene avviata una pipeline di pull request. Questa pipeline è progettata per eseguire in modo efficiente controlli qualitativi sui rami di funzionalità, garantendo l'integrità e l'affidabilità del codice. È possibile includere i tipi di controlli seguenti nella pipeline:

  • Test delle dipendenze python: Questi test installano e verificano la correttezza delle dipendenze python per assicurarsi che le dipendenze del progetto siano configurate correttamente.
  • Analisi del codice e linting: Gli strumenti per l'analisi del codice statico e il linting vengono applicati per valutare la qualità del codice e la conformità agli standard di codifica.
  • Test DAG di Airflow: Questi test eseguono test di convalida, inclusi i test per la definizione del DAG e gli unit test progettati per i DAG di Airflow.
  • Unit test per operatori personalizzati, hook, sensori e trigger Airflow

Se uno di questi controlli ha esito negativo, la pipeline termina. È quindi necessario risolvere i problemi identificati.

Sincronizzazione Git con il runtime di integrazione di produzione

Mappa il tuo ambiente Workflow Orchestration Manager con il branch di produzione del repository Git.

Pipeline di richiesta pull con il runtime di integrazione di produzione

Una procedura consigliata consiste nel mantenere un ambiente di produzione separato per impedire che ogni funzionalità di sviluppo diventi accessibile pubblicamente.

Dopo che il ramo di funzionalità viene unito correttamente nel ramo di sviluppo, è possibile creare una richiesta pull al ramo di produzione per rendere pubblica la nuova funzionalità unita. Questa richiesta pull attiva la pipeline di richiesta pull che esegue rapidi controlli di qualità nel ramo di sviluppo. I controlli qualitativi assicurano che tutte le funzionalità siano state integrate correttamente e che non siano presenti errori nell'ambiente di produzione.

Vantaggi dell'uso del flusso di lavoro CI/CD in Workflow Orchestration Manager

  • Approccio rapido con esito negativo: Senza l'integrazione del processo CI/CD, la prima volta che si sa che DAG contiene errori è probabile che venga eseguito il push in GitHub, sincronizzato con Workflow Orchestration Manager e generi un'eccezione Import Error. Nel frattempo, un altro sviluppatore può inconsapevolmente estrarre il codice difettoso dal repository, il che può portare a inefficienze a lungo termine.
  • Miglioramento della qualità del codice: Se si ignorano controlli fondamentali come la verifica della sintassi, le importazioni necessarie e i controlli per altre migliori pratiche di codifica, aumenta la probabilità di distribuire codice di qualità inferiore.

Modelli di distribuzione in Workflow Orchestration Manager

È consigliabile usare due modelli di distribuzione.

Modello 1: Sviluppare pipeline di dati direttamente in Workflow Orchestration Manager

È possibile sviluppare pipeline di dati direttamente in Workflow Orchestration Manager quando si usa il modello 1.

Prerequisiti

  • Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare. Creare o selezionare un'istanza di Data Factory esistente nell'area in cui è supportata l'anteprima di Workflow Orchestration Manager.
  • È necessario accedere a un repository GitHub.

Vantaggi

  • Non è necessario alcun ambiente di sviluppo locale: Workflow Orchestration Manager gestisce l'infrastruttura, gli aggiornamenti e la manutenzione sottostanti, riducendo il sovraccarico operativo della gestione dei cluster Airflow. Il servizio consente di concentrarsi sulla creazione e la gestione dei flussi di lavoro anziché sulla gestione dell'infrastruttura.
  • Scalabilità: Workflow Orchestration Manager offre funzionalità di scalabilità automatica per ridimensionare le risorse in base alle esigenze, assicurandosi che le pipeline di dati possano gestire carichi di lavoro crescenti o picchi di attività senza intervento manuale.
  • Monitoraggio e registrazione: Workflow Orchestration Manager include log di diagnostica e monitoraggio che consentono di tenere traccia dell'esecuzione dei flussi di lavoro, diagnosticare i problemi, configurare gli avvisi e ottimizzare le prestazioni.

Flusso di lavoro

  1. Usare la funzionalità di sincronizzazione Git.

    In questo flusso di lavoro non è necessario stabilire un ambiente locale. È invece possibile iniziare usando la funzionalità di sincronizzazione Git offerta da Workflow Orchestration Manager. Questa funzionalità sincronizza automaticamente i file DAG con i server web di Airflow, gli scheduler e i lavoratori. È ora possibile sviluppare, testare ed eseguire le pipeline di dati direttamente tramite l'interfaccia utente di Workflow Orchestration Manager.

    Altre informazioni su come usare la funzionalità di sincronizzazione Git di Workflow Orchestration Manager.

  2. Creare singoli ambienti del ramo di funzionalità.

    È possibile scegliere il ramo dal repository per la sincronizzazione con Workflow Orchestration Manager. Questa funzionalità consente di creare un singolo ambiente Airflow per ogni ramo di funzionalità. In questo modo, gli sviluppatori possono lavorare su attività specifiche per le pipeline di dati.

  3. Creare una richiesta pull.

    Continuare a inviare una richiesta pull al runtime di integrazione dell'ambiente di sviluppo Airflow dopo aver sviluppato e testato accuratamente le funzionalità all'interno dell'ambiente Airflow dedicato.

Modello 2: Sviluppare DAG in locale e distribuire nel Manager di Orchestrazione dei Flussi di Lavoro

È possibile sviluppare DAGs localmente e distribuirli in Workflow Orchestration Manager quando si utilizza il pattern 2.

Prerequisiti

  • È necessario accedere a un repository GitHub.
  • Assicurarsi che almeno un singolo ramo del repository di codice sia sincronizzato con Workflow Orchestration Manager per visualizzare le modifiche al codice nel servizio.

Vantaggi

È possibile limitare l'accesso alle risorse di Azure solo agli amministratori.

Flusso di lavoro

  1. Configurare un ambiente locale.

    Iniziare configurando un ambiente di sviluppo locale per Apache Airflow nel computer di sviluppo. In questo ambiente è possibile sviluppare e testare il codice Airflow, inclusi i dag e le attività. Questo approccio consente di sviluppare pipeline senza basarsi sull'accesso diretto alle risorse di Azure.

  2. Usare la funzionalità di sincronizzazione Git.

    Sincronizza il ramo del repository GitHub con il Gestore dell'Orchestrazione del Flusso di Lavoro.

    Altre informazioni su come usare la funzionalità di sincronizzazione Git di Workflow Orchestration Manager.

  3. Usare Workflow Orchestration Manager come ambiente di produzione.

    Dopo aver sviluppato e testato correttamente le pipeline di dati nella configurazione locale, è possibile generare una richiesta pull al ramo sincronizzato con Workflow Orchestration Manager. Dopo aver unito il ramo, usare le funzionalità di Workflow Orchestration Manager, ad esempio la scalabilità automatica e il monitoraggio e la registrazione a livello di produzione.

Pipeline CI/CD di esempio

Per ulteriori informazioni, vedere:

  1. Copiare il codice di un daG distribuito in Workflow Orchestration Manager integration runtime usando la funzionalità di sincronizzazione Git.

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    with DAG(
        dag_id="airflow-ci-cd-tutorial",
        start_date=datetime(2023, 8, 15),
        schedule="0 0 * * *",
        tags=["tutorial", "CI/CD"]
    ) as dag:
        # Tasks are represented as operators
        task1 = BashOperator(task_id="task1", bash_command="echo task1")
        task2 = BashOperator(task_id="task2", bash_command="echo task2")
        task3 = BashOperator(task_id="task3", bash_command="echo task3")
        task4 = BashOperator(task_id="task4", bash_command="echo task4")
    
        # Set dependencies between tasks
        task1 >> task2 >> task3 >> task4
    
  2. Creare una pipeline CI/CD. Sono disponibili due opzioni: Azure DevOps o GitHub actions.

    1. Opzione Azure DevOps: creare il file azure-devops-ci-cd.yaml e copiare il codice seguente. La pipeline viene attivata in corrispondenza di una richiesta pull o una richiesta push al ramo di sviluppo:

      trigger:
      - dev
      
      pr:
      - dev
      
      pool:
        vmImage: ubuntu-latest
      strategy:
        matrix:
          Python3.11:
            python.version: '3.11.5'
      
      steps:
      - task: UsePythonVersion@0
        inputs:
          versionSpec: '$(python.version)'
        displayName: 'Use Python $(python.version)'
      
      - script: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
        displayName: 'Install dependencies'
      
      - script: |
          airflow webserver &
          airflow db init
          airflow scheduler &
          pytest
        displayName: 'Pytest'
      

      Per altre informazioni, vedere Azure Pipelines.

    2. Opzione GitHub Actions: creare una directory .github/workflows nel repository GitHub.

      1. Nella directory .github/workflows, creare un file denominato github-actions-ci-cd.yml.

      2. Copiare il codice seguente. La pipeline viene attivata ogni volta che è presente una richiesta pull o una richiesta push al ramo di sviluppo:

        name: GitHub Actions CI/CD
        
        on:
          pull_request:
            branches:
              - "dev"
          push:
            branches:
              - "dev"
        
        jobs:
          flake8:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            steps:
              - name: Check out source repository
                uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: flake8 Lint
                uses: py-actions/flake8@v1
                with:
                  max-line-length: 120
          tests:
            strategy:
              matrix:
                python-version: [3.11.5]
            runs-on: ubuntu-latest
            needs: [flake8]
            steps:
              - uses: actions/checkout@v4
              - name: Setup Python
                uses: actions/setup-python@v4
                with:
                  python-version: ${{matrix.python-version}}
              - name: Install dependencies
                run: |
                  python -m pip install --upgrade pip
                  pip install -r requirements.txt
              - name: Pytest
                run: |
                  airflow webserver &
                  airflow db init
                  airflow scheduler &
                  pytest tests/
        
  3. Nella cartella dei test, creare i test per i DAG di Airflow. Ecco alcuni esempi:

    1. Almeno, è fondamentale eseguire test iniziali usando import_errors per garantire l'integrità e la correttezza del DAG. Questo test garantisce:

      • Il DAG non contiene ciclicità: La ciclicità, in cui un'attività costituisce un ciclo o una dipendenza circolare all'interno del flusso di lavoro, può causare cicli di esecuzione imprevisti e infiniti.

      • Non sono presenti errori di importazione: Gli errori di importazione possono verificarsi a causa di problemi quali dipendenze mancanti, percorsi di modulo non corretti o errori di codifica.  

      • Le attività sono definite correttamente: Verificare che le attività all'interno del DAG siano definite correttamente.

        @pytest.fixture()
        
        def dagbag():
            return DagBag(dag_folder="dags")
        
        def test_no_import_errors(dagbag):
            """
            Test Dags to contain no import errors.
            """
            assert not dagbag.import_errors
        
    2. Testare per assicurarsi che nel ramo di funzionalità siano presenti ID DAG specifici prima di unirli al ramo di sviluppo.

      def test_expected_dags(dagbag):
          """
          Test whether expected dag Ids are present.
          """
          expected_dag_ids = ["airflow-ci-cd-tutorial"]
      
          for dag_id in expected_dag_ids:
              dag = dagbag.get_dag(dag_id)
      
              assert dag is not None
              assert dag_id == dag.dag_id
      
    3. Esegui il test per assicurarti del fatto che solo i tag approvati siano associati ai DAG. Questo test consente di applicare l'utilizzo dei tag approvato.

      def test_requires_approved_tag(dagbag):
          """
          Test if DAGS contain one or more tags from list of approved tags only.
          """
          Expected_tags = {"tutorial", "CI/CD"}
          dagIds = dagbag.dag_ids
      
          for id in dagIds:
              dag = dagbag.get_dag(id)
              assert dag.tags
              if Expected_tags:
                  assert not set(dag.tags) - Expected_tags
      
  4. Ora, quando si genera una richiesta pull al ramo di sviluppo, è possibile notare che GitHub actions attiva la pipeline CI per eseguire tutti i test.