Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Sie können eine Pipeline als Teil eines Datenverarbeitungsworkflows mit Lakeflow-Aufträgen, Apache Airflow oder Azure Data Factory ausführen.
Jobs
Sie können mehrere Aufgaben in einem Databricks-Auftrag koordinieren, um einen Datenverarbeitungsworkflow zu implementieren. Um eine Pipeline in einen Auftrag einzuschließen, verwenden Sie die Pipelineaufgabe , wenn Sie einen Auftrag erstellen. Siehe Pipelineaufgabe für Aufträge.
Apache Airflow
Apache Airflow ist eine Open Source-Lösung zum Verwalten und Planen von Datenworkflows. Airflow stellt Workflows als gerichtete azyklische Diagramme (DAGs) von Operationen dar. Sie definieren einen Workflow in einer Python-Datei, und Airflow verwaltet die Planung und Ausführung. Informationen zum Installieren und Verwenden von Airflow mit Azure Databricks finden Sie unter Orchestrate Lakeflow Jobs with Apache Airflow.
Um eine Pipeline als Teil eines Airflow-Workflows auszuführen, verwenden Sie databricksSubmitRunOperator.
Anforderungen
Folgendes ist erforderlich, um die Airflow-Unterstützung für Lakeflow Spark Declarative Pipelines zu nutzen:
- Airflow Version 2.1.0 oder höher.
- Das Databricks-Anbieterpaket , Version 2.1.0 oder höher.
Example
Im folgenden Beispiel wird eine Airflow-DAG erstellt, die ein Update für die Pipeline mit dem Bezeichner 8279d543-063c-4d63-9926-dae38e35ce8bauslöst:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('ldp',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Ersetzen Sie CONNECTION_ID durch den Bezeichner für eine Airflow-Verbindung für Ihren Arbeitsbereich.
Speichern Sie dieses Beispiel im airflow/dags Verzeichnis, und verwenden Sie die Airflow-Benutzeroberfläche, um die DAG anzuzeigen und auszulösen . Verwenden Sie die Pipeline-Benutzeroberfläche, um die Details des Pipelineupdates anzuzeigen.
Azure Data Factory
Hinweis
Lakeflow Spark Declarative Pipelines und Azure Data Factory enthalten jeweils Optionen zum Konfigurieren der Anzahl der Wiederholungen, wenn ein Fehler auftritt. Wenn Wiederholungswerte für Ihre Pipeline und für die Azure Data Factory-Aktivität konfiguriert sind, die die Pipeline aufruft, ist die Anzahl der Wiederholungsversuche der Azure Data Factory-Wiederholungswert multipliziert mit dem Wiederholungswert der Pipeline.
Wenn beispielsweise ein Pipelineupdate fehlschlägt, wiederholt Lakeflow Spark Declarative Pipelines das Update standardmäßig bis zu fünf Mal. Wenn der Azure Data Factory-Wiederholungsversuch auf drei festgelegt ist und Ihre Pipeline den Standardwert von fünf Wiederholungsversuchen verwendet, wird die fehlerhafte Pipeline möglicherweise bis zu fünfzehn Mal wiederholt. Um übermäßige Wiederholungsversuche zu vermeiden, wenn Pipelineupdates fehlschlagen, empfiehlt Databricks, die Anzahl der Wiederholungen beim Konfigurieren der Pipeline oder der Azure Data Factory-Aktivität einzuschränken, die die Pipeline aufruft.
Wenn Sie die Wiederholungskonfiguration für Ihre Pipeline ändern möchten, verwenden Sie die pipelines.numUpdateRetryAttempts Einstellung beim Konfigurieren der Pipeline.
Azure Data Factory ist ein cloudbasierter ETL-Dienst, mit dem Sie Datenintegrations- und Transformationsworkflows koordinieren können. Azure Data Factory unterstützt die direkte Ausführung von Azure Databricks-Aufgaben in einem Workflow, einschließlich Notizbüchern, JAR-Aufgaben und Python-Skripts. Sie können auch eine Pipeline in einen Workflow einschließen, indem Sie die Pipeline-REST-API aus einer Azure Data Factory-Webaktivität aufrufen. Um beispielsweise ein Pipelineupdate aus Azure Data Factory auszulösen:
Erstellen Sie eine Datenfabrik oder öffnen Sie eine vorhandene Datenfabrik.
Wenn die Erstellung abgeschlossen ist, öffnen Sie die Seite Ihrer Data Factory und klicken Sie auf die Kachel "Azure Data Factory Studio öffnen". Die Benutzeroberfläche von Azure Data Factory wird angezeigt.
Erstellen Sie eine neue Azure Data Factory-Pipeline, indem Sie " Pipeline" im Dropdownmenü " Neu " auf der Benutzeroberfläche von Azure Data Factory Studio auswählen.
In der Aktivitäten-Toolbox erweitern Sie Allgemein und ziehen Sie die Webaktivität auf den Pipeline-Canvas. Klicken Sie auf die Registerkarte "Einstellungen ", und geben Sie die folgenden Werte ein:
Hinweis
Als bewährte Sicherheitsmaßnahme empfiehlt Databricks, dass Sie bei der Authentifizierung mit automatisierten Tools, Systemen, Skripten und Apps persönliche Zugriffstoken von Dienstprinzipalen anstelle von Arbeitsbereichsbenutzern verwenden. Informationen zum Erstellen von Token für Dienstprinzipale finden Sie unter "Verwalten von Token für einen Dienstprinzipal".
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.Ersetzen Sie
<get-workspace-instance>.Ersetzen Sie
<pipeline-id>durch den Pipelinebezeichner.Methode: Wählen Sie POST aus dem Dropdownmenü aus.
Kopfzeilen: Klicken + Neu. Geben Sie in das Textfeld
Authorizationein. Geben Sie im TextfeldBearer <personal-access-token>ein.Ersetzen Sie es
<personal-access-token>durch ein persönliches Azure Databricks-Zugriffstoken.Nachrichtentext: Um zusätzliche Anforderungsparameter zu übergeben, geben Sie ein JSON-Dokument ein, das die Parameter enthält. Um beispielsweise eine Aktualisierung zu starten und alle Daten für die Pipeline neu zu verarbeiten:
{"full_refresh": "true"}. Falls keine zusätzlichen Anforderungsparameter vorhanden sind, geben Sie leere geschweifte Klammern ein ({}).
Um die Webaktivität zu testen, klicken Sie auf der Pipelinesymbolleiste in der Data Factory-Benutzeroberfläche auf "Debuggen ". Die Ausgabe und der Status der Ausführung, einschließlich Fehlern, werden auf der Registerkarte " Ausgabe " der Azure Data Factory-Pipeline angezeigt. Verwenden Sie die Pipeline-Benutzeroberfläche, um die Details des Pipelineupdates anzuzeigen.
Tipp
Eine allgemeine Workflowanforderung besteht darin, eine Aufgabe nach Abschluss eines vorherigen Vorgangs zu starten. Da die Pipelineanforderung updates asynchron ist – die Anforderung wird nach dem Starten des Updates zurückgegeben, aber bevor das Update abgeschlossen ist – Aufgaben in Ihrer Azure Data Factory-Pipeline mit einer Abhängigkeit vom Pipelineupdate müssen warten, bis das Update abgeschlossen ist. Eine Möglichkeit, bis zum Abschluss des Updates zu warten, besteht darin, eine Bis-Aktivität nach der Webaktivität hinzuzufügen, die das Update für Lakeflow Spark Declarative Pipelines initiieren. In der Until-Aktivität:
- Fügen Sie eine Wait-Aktivität hinzu, um eine konfigurierte Anzahl von Sekunden auf den Abschluss der Aktualisierung zu warten.
- Fügen Sie eine Webaktivität nach der Warteaktivität hinzu, die die Anfrage für Pipelineaktualisierungsdetails verwendet, um den Status des Updates zu ermitteln. Das
stateFeld in der Antwort gibt den aktuellen Status der Aktualisierung zurück, einschließlich, wenn es abgeschlossen ist. - Verwenden Sie den Wert des Felds
state, um die Beendigungsbedingung für die Until-Aktivität festzulegen. Sie können auch eine Set Variable-Aktivität verwenden, um eine Pipelinevariable basierend auf demstateWert hinzuzufügen und diese Variable für die Beendigungsbedingung zu verwenden.