Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Hinweis
Workflow Orchestration Manager wird von Apache Airflow unterstützt.
Workflow Orchestration Manager bietet eine einfache und effiziente Möglichkeit zum Erstellen und Verwalten von Apache Airflow-Umgebungen. Mit dem Dienst können Sie Datenpipelines mühelos ausführen. Es gibt zwei primäre Methoden zum Ausführen von DAGs im Workflow Orchestration Manager. Sie können die DAG-Dateien in Ihren BLOB-Speicher hochladen und mit der Airflow-Umgebung verknüpfen. Alternativ können Sie das Git-Synchronisierungsfeature verwenden, um Ihr Git-Repository automatisch mit der Airflow-Umgebung zu synchronisieren.
Wenn Sie Datenpipelines in Airflow verwenden, müssen Sie basierend auf Ihren Workflowanforderungen häufig Ihre DAGs, Plug-Ins und Anforderungsdateien erstellen oder aktualisieren. Entwickler können DAG-Dateien zwar manuell in BLOB-Speicher hochladen oder sie dort bearbeiten, aber viele Organisationen bevorzugen die Verwendung eines Ansatzes von Continuous Integration und Continuous Delivery (CI/CD) für die Codebereitstellung. Dieser Artikel führt Sie durch die empfohlenen Bereitstellungsmuster, um Ihre Apache Airflow DAGs nahtlos in Workflow Orchestration Manager zu integrieren und bereitzustellen.
Grundlegendes zu CI/CD
Continuous Integration
Continuous Integration ist eine Softwareentwicklungsmethode, bei der die häufige und automatisierte Integration von Codeänderungen in ein gemeinsam genutztes Repository im Vordergrund steht. Bei dieser Methode committen Entwickler*innen regelmäßig ihren Code, und dieser wird dann jeweils von einer automatisierten CI-Pipeline erstellt, getestet und überprüft. Das Hauptziel besteht darin, Integrationsprobleme frühzeitig im Entwicklungsprozess zu erkennen und zu behandeln und Entwicklern schnell Feedback bereitzustellen.
CI sorgt dafür, dass sich die Codebasis stets in einem testbaren und bereitstellungsfähigen Zustand befindet. Diese Methode führt zu verbesserter Codequalität, Zusammenarbeit und der Fähigkeit, Fehler zu erkennen und zu beheben, bevor sie zu erheblichen Problemen werden.
Kontinuierliche Zustellung
Continuous Delivery ist eine Erweiterung von CI und geht bei der Automatisierung noch einen Schritt weiter. Während bei CI die Automatisierung der Integrations- und Testphasen im Mittelpunkt steht, automatisiert CD die Bereitstellung von Codeänderungen in der Produktionsumgebung oder in anderen Zielumgebungen. Mit dieser Methode können Organisationen Softwareupdates schnell und zuverlässig freigeben. Sie reduziert Fehler bei der manuellen Bereitstellung und stellt sicher, dass genehmigte Codeänderungen schnell an Benutzer übermittelt werden.
CI/CD-Workflow im Workflow Orchestration Manager
Git-Synchronisierung mit Dev/QA-Integration Runtime
Ordnen Sie Ihre Workflow Orchestration Manager-Umgebung mit der Entwicklungs-/QA-Verzweigung Ihres Git-Repositorys zu.
CI-Pipeline mit Entwicklungs-/QA-Integration Runtime
Wenn ein Pull Request (PR) aus einem Featurebranch an den Entwicklungsbranch gerichtet wird, wird dadurch eine PR-Pipeline ausgelöst. Diese Pipeline dient zur effizienten Durchführung von Qualitätsprüfungen für Ihre Featurebranches, um die Integrität und Zuverlässigkeit des Codes sicherzustellen. Sie können die folgenden Arten von Prüfungen in die Pipeline einschließen:
- Testen von Python-Abhängigkeiten: Diese Tests installieren Python-Abhängigkeiten und überprüfen, ob sie korrekt sind, um sicherzustellen, dass die Abhängigkeiten des Projekts ordnungsgemäß konfiguriert sind.
- Codeanalyse und Linten: Tools für die statische Codeanalyse und für Linten werden angewendet, um die Codequalität und die Einhaltung von Programmierstandards zu bewerten.
- Tests für Airflow-DAGs: Diese Tests führen Validierungstests aus. Hierzu zählen auch Tests für die DAG-Definition sowie für Airflow-DAGs konzipierte Komponententests.
- Komponententests für benutzerdefinierte Airflow-Operatoren, Hooks, Sensoren und Trigger
Wenn eine dieser Prüfungen fehlschlägt, wird die Pipeline beendet. Anschließend müssen Sie die identifizierten Probleme beheben.
Git-Synchronisierung mit Integration Runtime für die Produktion
Ordnen Sie Ihre Workflow Orchestration Manager-Umgebung mit dem Produktionsbranch Ihres Git-Repositorys zu.
PR-Pipeline mit Integration Runtime für die Produktion
Eine bewährte Methode ist eine separate Produktionsumgebung zu verwenden, um zu verhindern, dass jedes Entwicklungsfeature öffentlich zugänglich wird.
Nach erfolgreicher Zusammenführung von Featurebranch und Entwicklungsbranch können Sie einen Pull Request für den Produktionsbranch erstellen, um Ihr neu zusammengeführtes Feature öffentlich zugänglich zu machen. Diese Pullanforderung löst die PR-Pipeline aus, die schnelle Qualitätsprüfungen für den Entwicklungsbranch durchführt. Qualitätsprüfungen stellen sicher, dass alle Features korrekt integriert wurden und es keine Fehler in der Produktionsumgebung gibt.
Vorteile der Verwendung des CI/CD-Workflows im Workflow Orchestration Manager
- Fail-Fast-Ansatz: Ohne Integration des CI/CD-Prozesses werden Sie wahrscheinlich erstmals auf DAG-Fehler aufmerksam, wenn der gerichtete azyklische Graph an GitHub gepusht und mit Workflow Orchestration Manager synchronisiert wird und dabei ein
Import Error
auftritt. Inzwischen pullen unter Umständen andere Entwickler den fehlerhaften Code aus dem Repository, was ggf. zu späteren Ineffizienzen führt. - Verbesserung der Codequalität: Wenn Sie grundlegende Überprüfungen wie die Syntaxüberprüfung, notwendige Importe und Überprüfungen auf andere bewährte Programmiermethoden vernachlässigen, erhöht sich die Wahrscheinlichkeit, dass suboptimaler Code bereitgestellt wird.
Bereitstellungsmuster im Workflow Orchestration Manager
Wir empfehlen zwei Bereitstellungsmuster.
Muster 1: Entwickeln von Datenpipelines direkt im Workflow Orchestration Manager
Sie können Datenpipelines direkt im Workflow Orchestration Manager entwickeln, wenn Sie Muster 1 verwenden.
Voraussetzungen
- Wenn Sie kein Azure-Abonnement besitzen, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen. Erstellen oder wählen Sie eine vorhandene Data Factory-Instanz in der Region aus, in der die Vorschau des Workflow Orchestration Managers unterstützt wird.
- Sie benötigen Zugriff auf ein GitHub-Repository.
Vorteile
- Keine lokale Entwicklungsumgebung erforderlich: Workflow Orchestration Manager kümmert sich um die zugrunde liegende Infrastruktur sowie um Updates und Wartung, was den betrieblichen Aufwand für die Verwaltung von Airflow-Cluster reduziert. Der Dienst ermöglicht es Ihnen, sich auf die Erstellung und Verwaltung von Workflows zu konzentrieren anstatt auf die Verwaltung der Infrastruktur.
- Skalierbarkeit: Workflow Orchestration Manager bietet eine automatische Skalierungsfunktion, um Ressourcen nach Bedarf zu skalieren. Dadurch wird sichergestellt, dass Ihre Datenpipelines zunehmende Workloads oder Aktivitätsspitzen ohne manuelle Eingriffe bewältigen können.
- Überwachung und Protokollierung: Der Workflow Orchestration Manager umfasst Diagnoseprotokolle und Überwachung, um Ihnen zu helfen die Ausführung Ihrer Workflows nachzuverfolgen, Probleme zu diagnostizieren, Warnungen einzurichten und die Leistung zu optimieren.
Workflow
Verwenden Sie die Git-Synchronisierungsfunktion.
In diesem Workflow muss keine eigene lokale Umgebung eingerichtet werden. Stattdessen können Sie mit der Git-Synchronisierungsfunktion beginnen, die von Workflow Orchestration Manager angeboten wird. Mit diesem Feature werden Ihre DAG-Dateien automatisch mit Airflow-Webservern, -Planern und -Mitarbeitern synchronisiert. Jetzt können Sie Ihre Datenpipelines direkt über die Workflow Orchestration Manager-Benutzeroberfläche entwickeln, testen und ausführen.
Erfahren Sie mehr über die Verwendung des Git-Synchronisierungsfeatures des Workflow Orchestration Managers.
Erstellen Sie einzelne Featurebranch-Umgebungen.
Sie können die Verzweigung aus Ihrem Repository auswählen, um mit dem Workflow Orchestration Manager zu synchronisieren. Mit dieser Funktion können Sie eine einzelne Airflow-Umgebung für jeden Featurebranch erstellen. Auf diese Weise können Entwickler an bestimmten Aufgaben für Datenpipelines arbeiten.
Erstellen Sie einen Pull Request.
Fahren Sie mit der Übermittlung einer Pullanforderung an die Integrationslaufzeit der Airflow-Entwicklungsumgebung fort, nachdem Sie Ihre Features in Ihrer dedizierten Airflow-Umgebung gründlich entwickelt und getestet haben.
Muster 2: Lokales Entwickeln von DAGs und Bereitstellen im Workflow Orchestration Manager
Sie können DAGs lokal entwickeln und in Workflow Orchestration Manager bereitstellen, wenn Sie Muster 2 verwenden.
Voraussetzungen
- Sie benötigen Zugriff auf ein GitHub-Repository.
- Stellen Sie sicher, dass mindestens ein einzelner Branch Ihres Code-Repositorys mit der Workflow Orchestration Manager-Instanz synchronisiert wird, um die Codeänderungen im Dienst zu sehen.
Vorteile
Sie können den Zugriff auf Azure-Ressourcen auf Administratoren beschränken.
Workflow
Richten Sie eine lokale Umgebung ein.
Richten Sie zunächst auf Ihrem Entwicklungscomputer eine lokale Entwicklungsumgebung für Apache Airflow ein. In dieser Umgebung können Sie Ihren Airflow-Code einschließlich DAGs und Aufgaben entwickeln und testen. Dieser Ansatz ermöglicht die Entwicklung von Pipelines, ohne direkten Zugriff auf Azure-Ressourcen zu benötigen.
Verwenden Sie die Git-Synchronisierungsfunktion.
Synchronisieren Sie den Branch Ihres GitHub-Repositorys mit dem Workflow Orchestration Manager.
Erfahren Sie mehr über die Verwendung des Git-Synchronisierungsfeatures des Workflow Orchestration Managers.
Verwenden Sie Workflow Orchestration Manager als Produktionsumgebung.
Nachdem Sie Datenpipelines erfolgreich in Ihrem lokalen Setup entwickelt und getestet haben, können Sie eine Pullanforderung an die Branches auslösen, die mit dem Workflow Orchestration Manager synchronisiert wird. Nachdem die Verzweigung zusammengeführt wurde, verwenden Sie Workflow Orchestration Manager-Features wie automatische Skalierung und Überwachung und Protokollierung auf Produktionsebene.
CI/CD-Beispielpipeline
Weitere Informationen finden Sie unter:
Kopieren Sie den Code eines in Workflow Orchestration Manager-Integration Runtime bereitgestellten DAG mithilfe der Git-Synchronisierungsfunktion.
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator with DAG( dag_id="airflow-ci-cd-tutorial", start_date=datetime(2023, 8, 15), schedule="0 0 * * *", tags=["tutorial", "CI/CD"] ) as dag: # Tasks are represented as operators task1 = BashOperator(task_id="task1", bash_command="echo task1") task2 = BashOperator(task_id="task2", bash_command="echo task2") task3 = BashOperator(task_id="task3", bash_command="echo task3") task4 = BashOperator(task_id="task4", bash_command="echo task4") # Set dependencies between tasks task1 >> task2 >> task3 >> task4
Erstellen einer CI/CD-Pipeline. Sie haben zwei Optionen: Azure DevOps- oder GitHub-Aktionen.
Azure DevOps-Option: Erstellen Sie die Datei
azure-devops-ci-cd.yaml
und kopieren Sie den folgenden Code. Die Pipeline löst eine Pullanforderung oder Pushanforderung an den Entwicklungsbranch aus:trigger: - dev pr: - dev pool: vmImage: ubuntu-latest strategy: matrix: Python3.11: python.version: '3.11.5' steps: - task: UsePythonVersion@0 inputs: versionSpec: '$(python.version)' displayName: 'Use Python $(python.version)' - script: | python -m pip install --upgrade pip pip install -r requirements.txt displayName: 'Install dependencies' - script: | airflow webserver & airflow db init airflow scheduler & pytest displayName: 'Pytest'
Weitere Informationen finden Sie unter Azure Pipelines.
Option „GitHub-Aktionen“: Erstellen eines
.github/workflows
-Verzeichnisses in Ihrem GitHub-Repository.Erstelle im Verzeichnis
.github/workflows
eine Datei namensgithub-actions-ci-cd.yml
.Kopieren Sie den folgenden Code. Die Pipeline wird ausgelöst, wenn eine Pullanforderung oder Pushanforderung an den Entwicklungsbranch vorhanden ist:
name: GitHub Actions CI/CD on: pull_request: branches: - "dev" push: branches: - "dev" jobs: flake8: strategy: matrix: python-version: [3.11.5] runs-on: ubuntu-latest steps: - name: Check out source repository uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 with: python-version: ${{matrix.python-version}} - name: flake8 Lint uses: py-actions/flake8@v1 with: max-line-length: 120 tests: strategy: matrix: python-version: [3.11.5] runs-on: ubuntu-latest needs: [flake8] steps: - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 with: python-version: ${{matrix.python-version}} - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt - name: Pytest run: | airflow webserver & airflow db init airflow scheduler & pytest tests/
Erstellen Sie im Testordner die Tests für Airflow-DAGs. Hier sind einige Beispiele:
Es müssen mindestens anfängliche Tests mit
import_errors
durchgeführt werden, um die Integrität und Richtigkeit der DAG sicherzustellen. Dieser Test stellt Folgendes sicher:Ihre DAG enthält keine Zyklizität: Zyklizität bedeutet, dass eine Aufgabe innerhalb des Workflows eine Schleife oder Zirkelabhängigkeit bildet, was zu unerwarteten und unendlichen Ausführungsschleifen führen kann.
Es gibt keine Importfehler: Importfehler können aufgrund von Problemen wie fehlenden Abhängigkeiten, falschen Modulpfaden oder Programmierfehlern auftreten.
Aufgaben sind korrekt definiert: Es wird überprüft, ob die Aufgaben innerhalb der DAG ordnungsgemäß definiert sind.
@pytest.fixture() def dagbag(): return DagBag(dag_folder="dags") def test_no_import_errors(dagbag): """ Test Dags to contain no import errors. """ assert not dagbag.import_errors
Testen Sie, ob spezifische DAG-IDs in Ihrem Featurebranch vorhanden sind, bevor Sie sie im Entwicklungsbranch zusammenführen.
def test_expected_dags(dagbag): """ Test whether expected dag Ids are present. """ expected_dag_ids = ["airflow-ci-cd-tutorial"] for dag_id in expected_dag_ids: dag = dagbag.get_dag(dag_id) assert dag is not None assert dag_id == dag.dag_id
Testen Sie, ob Ihren DAGs nur genehmigte Tags zugeordnet sind. Dieser Test hilft dabei, die Verwendung genehmigter Tags zu erzwingen.
def test_requires_approved_tag(dagbag): """ Test if DAGS contain one or more tags from list of approved tags only. """ Expected_tags = {"tutorial", "CI/CD"} dagIds = dagbag.dag_ids for id in dagIds: dag = dagbag.get_dag(id) assert dag.tags if Expected_tags: assert not set(dag.tags) - Expected_tags
Wenn Sie nun eine Pullanforderung an den Entwicklungsbranch auslösen, können Sie sehen, dass GitHub-Aktionen die CI-Pipeline auslösen, um alle Tests auszuführen.