Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Nota
Workflow Orchestration Manager è basato su Apache Airflow.
Workflow Orchestration Manager offre un modo semplice ed efficiente per creare e gestire ambienti Apache Airflow. Il servizio consente di eseguire pipeline di dati su larga scala con facilità. Esistono due metodi principali per eseguire i dag in Workflow Orchestration Manager. È possibile caricare i file DAG nell'archivio BLOB e collegarli all'ambiente Airflow. In alternativa, è possibile usare la funzionalità di sincronizzazione Git per sincronizzare automaticamente il repository Git con l'ambiente Airflow.
L'uso di pipeline di dati in Airflow richiede di creare o aggiornare frequentemente i file di disponibilità, plug-in e requisiti in base alle esigenze del flusso di lavoro. Anche se gli sviluppatori possono caricare o modificare manualmente i file DAG nell'archiviazione BLOB, molte organizzazioni preferiscono usare un approccio di integrazione continua e recapito continuo (CI/CD) per la distribuzione del codice. Questo articolo illustra i modelli di distribuzione consigliati per integrare e distribuire facilmente i dag Apache Airflow con Workflow Orchestration Manager.
Informazioni su CI/CD
Integrazione continua
L'integrazione continua è una pratica di sviluppo software che enfatizza l'integrazione frequente e automatizzata delle modifiche del codice in un repository condiviso. Implica che gli sviluppatori eseguano regolarmente il commit del codice e, a ogni commit, una pipeline ci automatizzata compila il codice, esegue test ed esegue controlli di convalida. Gli obiettivi principali sono rilevare e risolvere i problemi di integrazione nelle prime fasi del processo di sviluppo e fornire feedback rapido agli sviluppatori.
L'integrazione continua garantisce che la codebase rimanga in uno stato testabile e distribuibile costantemente. Questa pratica porta a una migliore qualità del codice, collaborazione e la possibilità di rilevare e correggere i bug prima che diventino problemi significativi.
Recapito continuo
Il recapito continuo è un'estensione di integrazione continua che consente di eseguire l'automazione un ulteriore passo avanti. Mentre l'integrazione continua è incentrata sull'automazione delle fasi di integrazione e test, CD automatizza la distribuzione delle modifiche del codice in ambienti di produzione o altri ambienti di destinazione. Questa procedura consente alle organizzazioni di rilasciare gli aggiornamenti software in modo rapido e affidabile. Riduce gli errori nella distribuzione manuale e garantisce che le modifiche al codice approvate vengano recapitate rapidamente agli utenti.
Flusso di lavoro CI/CD in Workflow Orchestration Manager
Sincronizzazione Git con il runtime di integrazione Dev/QA
Eseguire il mapping dell'ambiente Workflow Orchestration Manager con il ramo di sviluppo/controllo di qualità del repository Git.
Pipeline CI con il runtime di integrazione dev/QA
Quando viene effettuata una richiesta pull da un ramo di funzionalità al ramo di sviluppo, attiva una pipeline di richiesta pull. Questa pipeline è progettata per eseguire in modo efficiente controlli qualitativi sui rami di funzionalità, garantendo l'integrità e l'affidabilità del codice. È possibile includere i tipi di controlli seguenti nella pipeline:
- Test delle dipendenze python: questi test installano e verificano la correttezza delle dipendenze Python per assicurarsi che le dipendenze del progetto siano configurate correttamente.
- Analisi del codice e linting: gli strumenti per l'analisi e l'linting statici del codice vengono applicati per valutare la qualità del codice e la conformità agli standard di codifica.
- Test DAG airflow: questi test eseguono test di convalida, inclusi i test per la definizione del DAG e gli unit test progettati per i dag airflow.
- Unit test per operatori personalizzati, hook, sensori e trigger airflow
Se uno di questi controlli ha esito negativo, la pipeline termina. È quindi necessario risolvere i problemi identificati.
Sincronizzazione Git con il runtime di integrazione di produzione
Eseguire il mapping dell'ambiente Workflow Orchestration Manager con il ramo di produzione del repository Git.
Pipeline di richiesta pull con il runtime di integrazione di produzione
Una procedura consigliata consiste nel mantenere un ambiente di produzione separato per impedire che ogni funzionalità di sviluppo diventi accessibile pubblicamente.
Dopo che il ramo di funzionalità viene unito correttamente nel ramo di sviluppo, è possibile creare una richiesta pull al ramo di produzione per rendere pubblica la nuova funzionalità unita. Questa richiesta pull attiva la pipeline di richiesta pull che esegue rapidi controlli di qualità nel ramo di sviluppo. I controlli qualitativi assicurano che tutte le funzionalità siano state integrate correttamente e che non siano presenti errori nell'ambiente di produzione.
Vantaggi dell'uso del flusso di lavoro CI/CD in Workflow Orchestration Manager
- Approccio rapido con esito negativo: senza l'integrazione del processo CI/CD, la prima volta che si sa che il dag contiene errori è probabile quando viene eseguito il push in GitHub, sincronizzato con Workflow Orchestration Manager e genera un'eccezione
Import Error
. Nel frattempo, un altro sviluppatore può inconsapevolmente estrarre il codice difettoso dal repository, che potenzialmente porta a inefficienze nella riga. - Miglioramento della qualità del codice: se si ignorano controlli fondamentali come la verifica della sintassi, le importazioni necessarie e verifica la presenza di altre procedure consigliate per la codifica, aumenta la probabilità di distribuire codice secondario.
Modelli di distribuzione in Workflow Orchestration Manager
È consigliabile usare due modelli di distribuzione.
Modello 1: Sviluppare pipeline di dati direttamente in Workflow Orchestration Manager
È possibile sviluppare pipeline di dati direttamente in Workflow Orchestration Manager quando si usa il modello 1.
Prerequisiti
- Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare. Creare o selezionare un'istanza di Data Factory esistente nell'area in cui è supportata l'anteprima di Workflow Orchestration Manager.
- È necessario accedere a un repository GitHub.
Vantaggi
- Nessun ambiente di sviluppo locale richiesto: Workflow Orchestration Manager gestisce l'infrastruttura, gli aggiornamenti e la manutenzione sottostanti, riducendo il sovraccarico operativo della gestione dei cluster Airflow. Il servizio consente di concentrarsi sulla creazione e la gestione dei flussi di lavoro anziché sulla gestione dell'infrastruttura.
- Scalabilità: Workflow Orchestration Manager offre funzionalità di scalabilità automatica per ridimensionare le risorse in base alle esigenze, assicurandosi che le pipeline di dati possano gestire carichi di lavoro crescenti o picchi di attività senza intervento manuale.
- Monitoraggio e registrazione: Workflow Orchestration Manager include log di diagnostica e monitoraggio per tenere traccia dell'esecuzione dei flussi di lavoro, diagnosticare i problemi, configurare gli avvisi e ottimizzare le prestazioni.
Workflow
Usare la funzionalità di sincronizzazione Git.
In questo flusso di lavoro non è necessario stabilire un ambiente locale. È invece possibile iniziare usando la funzionalità di sincronizzazione Git offerta da Workflow Orchestration Manager. Questa funzionalità sincronizza automaticamente i file DAG con server Web Airflow, utilità di pianificazione e ruoli di lavoro. È ora possibile sviluppare, testare ed eseguire le pipeline di dati direttamente tramite l'interfaccia utente di Workflow Orchestration Manager.
Altre informazioni su come usare la funzionalità di sincronizzazione Git di Workflow Orchestration Manager.
Creare singoli ambienti del ramo di funzionalità.
È possibile scegliere il ramo dal repository per la sincronizzazione con Workflow Orchestration Manager. Questa funzionalità consente di creare un singolo ambiente Airflow per ogni ramo di funzionalità. In questo modo, gli sviluppatori possono lavorare su attività specifiche per le pipeline di dati.
Creare una richiesta pull.
Continuare a inviare una richiesta pull al runtime di integrazione dell'ambiente di sviluppo Airflow dopo aver sviluppato e testato accuratamente le funzionalità all'interno dell'ambiente Airflow dedicato.
Modello 2: Sviluppare dag in locale e distribuire in Gestione orchestrazione flussi di lavoro
È possibile sviluppare gruppi di disponibilità dati in locale e distribuirli in Workflow Orchestration Manager quando si usa il modello 2.
Prerequisiti
- È necessario accedere a un repository GitHub.
- Assicurarsi che almeno un singolo ramo del repository di codice sia sincronizzato con Workflow Orchestration Manager per visualizzare le modifiche al codice nel servizio.
Vantaggi
È possibile limitare l'accesso alle risorse di Azure solo agli amministratori.
Workflow
Configurare un ambiente locale.
Iniziare configurando un ambiente di sviluppo locale per Apache Airflow nel computer di sviluppo. In questo ambiente è possibile sviluppare e testare il codice Airflow, inclusi i dag e le attività. Questo approccio consente di sviluppare pipeline senza basarsi sull'accesso diretto alle risorse di Azure.
Usare la funzionalità di sincronizzazione Git.
Sincronizzare il ramo del repository GitHub con Workflow Orchestration Manager.
Altre informazioni su come usare la funzionalità di sincronizzazione Git di Workflow Orchestration Manager.
Usare Workflow Orchestration Manager come ambiente di produzione.
Dopo aver sviluppato e testato correttamente le pipeline di dati nella configurazione locale, è possibile generare una richiesta pull al ramo sincronizzato con Workflow Orchestration Manager. Dopo aver unito il ramo, usare le funzionalità di Workflow Orchestration Manager, ad esempio la scalabilità automatica e il monitoraggio e la registrazione a livello di produzione.
Pipeline CI/CD di esempio
Per altre informazioni, vedi:
Copiare il codice di un daG distribuito in Workflow Orchestration Manager integration runtime usando la funzionalità di sincronizzazione Git.
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator with DAG( dag_id="airflow-ci-cd-tutorial", start_date=datetime(2023, 8, 15), schedule="0 0 * * *", tags=["tutorial", "CI/CD"] ) as dag: # Tasks are represented as operators task1 = BashOperator(task_id="task1", bash_command="echo task1") task2 = BashOperator(task_id="task2", bash_command="echo task2") task3 = BashOperator(task_id="task3", bash_command="echo task3") task4 = BashOperator(task_id="task4", bash_command="echo task4") # Set dependencies between tasks task1 >> task2 >> task3 >> task4
Creare una pipeline CI/CD. Sono disponibili due opzioni: Azure DevOps o GitHub actions.
Opzione Azure DevOps: creare il file
azure-devops-ci-cd.yaml
e copiare il codice seguente. La pipeline viene attivata in una richiesta pull o in una richiesta push al ramo di sviluppo:trigger: - dev pr: - dev pool: vmImage: ubuntu-latest strategy: matrix: Python3.11: python.version: '3.11.5' steps: - task: UsePythonVersion@0 inputs: versionSpec: '$(python.version)' displayName: 'Use Python $(python.version)' - script: | python -m pip install --upgrade pip pip install -r requirements.txt displayName: 'Install dependencies' - script: | airflow webserver & airflow db init airflow scheduler & pytest displayName: 'Pytest'
Per altre informazioni, vedere Azure Pipelines.
Opzione GitHub actions: creare una
.github/workflows
directory nel repository GitHub.Creare un nuovo file denominato
github-actions-ci-cd.yml
nella directory.github/workflows
.Copiare il codice seguente. La pipeline viene attivata ogni volta che è presente una richiesta pull o una richiesta push al ramo di sviluppo:
name: GitHub Actions CI/CD on: pull_request: branches: - "dev" push: branches: - "dev" jobs: flake8: strategy: matrix: python-version: [3.11.5] runs-on: ubuntu-latest steps: - name: Check out source repository uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 with: python-version: ${{matrix.python-version}} - name: flake8 Lint uses: py-actions/flake8@v1 with: max-line-length: 120 tests: strategy: matrix: python-version: [3.11.5] runs-on: ubuntu-latest needs: [flake8] steps: - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 with: python-version: ${{matrix.python-version}} - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt - name: Pytest run: | airflow webserver & airflow db init airflow scheduler & pytest tests/
Nella cartella test creare i test per i dag Airflow. Ecco alcuni esempi:
Almeno, è fondamentale eseguire test iniziali usando
import_errors
per garantire l'integrità e la correttezza del DAG. Questo test garantisce:Il DAG non contiene cyclicity: Cyclicity, dove un'attività costituisce un ciclo o una dipendenza circolare all'interno del flusso di lavoro, può causare cicli di esecuzione imprevisti e infiniti.
Non sono presenti errori di importazione: gli errori di importazione possono verificarsi a causa di problemi come dipendenze mancanti, percorsi di modulo non corretti o errori di codifica.
Le attività sono definite correttamente: verificare che le attività all'interno del DAG siano definite correttamente.
@pytest.fixture() def dagbag(): return DagBag(dag_folder="dags") def test_no_import_errors(dagbag): """ Test Dags to contain no import errors. """ assert not dagbag.import_errors
Testare per assicurarsi che nel ramo di funzionalità siano presenti ID DAG specifici prima di unirli al ramo di sviluppo.
def test_expected_dags(dagbag): """ Test whether expected dag Ids are present. """ expected_dag_ids = ["airflow-ci-cd-tutorial"] for dag_id in expected_dag_ids: dag = dagbag.get_dag(dag_id) assert dag is not None assert dag_id == dag.dag_id
Eseguire il test per assicurarsi che solo i tag approvati siano associati ai dag. Questo test consente di applicare l'utilizzo dei tag approvato.
def test_requires_approved_tag(dagbag): """ Test if DAGS contain one or more tags from list of approved tags only. """ Expected_tags = {"tutorial", "CI/CD"} dagIds = dagbag.dag_ids for id in dagIds: dag = dagbag.get_dag(id) assert dag.tags if Expected_tags: assert not set(dag.tags) - Expected_tags
Ora, quando si genera una richiesta pull al ramo di sviluppo, è possibile notare che GitHub actions attiva la pipeline CI per eseguire tutti i test.