Samordna Azure Databricks-jobb med Apache Airflow
Den här artikeln beskriver Apache Airflow-stödet för att samordna datapipelines med Azure Databricks, har instruktioner för att installera och konfigurera Airflow lokalt och innehåller ett exempel på hur du distribuerar och kör ett Azure Databricks-arbetsflöde med Airflow.
Jobborkestrering i en datapipeline
Att utveckla och distribuera en databehandlingspipeline kräver ofta att du hanterar komplexa beroenden mellan aktiviteter. En pipeline kan till exempel läsa data från en källa, rensa data, transformera rensade data och skriva transformerade data till ett mål. Du behöver också stöd för test-, schemaläggnings- och felsökningsfel när du operationaliserar en pipeline.
Arbetsflödessystem hanterar dessa utmaningar genom att du kan definiera beroenden mellan aktiviteter, schemalägga när pipelines körs och övervaka arbetsflöden. Apache Airflow är en öppen källkod lösning för att hantera och schemalägga datapipelines. Airflow representerar datapipelines som riktade acykliska grafer (DAG) för åtgärder. Du definierar ett arbetsflöde i en Python-fil och Airflow hanterar schemaläggning och körning. Med Airflow Azure Databricks-anslutningen kan du dra nytta av den optimerade Spark-motorn som erbjuds av Azure Databricks med schemaläggningsfunktionerna i Airflow.
Krav
- Integreringen mellan Airflow och Azure Databricks kräver Airflow version 2.5.0 och senare. Exemplen i den här artikeln testas med Airflow version 2.6.1.
- Airflow kräver Python 3.8, 3.9, 3.10 eller 3.11. Exemplen i den här artikeln testas med Python 3.8.
- Anvisningarna i den här artikeln för att installera och köra Airflow kräver pipenv för att skapa en virtuell Python-miljö.
Luftflödesoperatorer för Databricks
En Airflow DAG består av uppgifter, där varje uppgift kör en Airflow Operator. Airflow-operatörer som stöder integreringen till Databricks implementeras i Databricks-providern.
Databricks-providern innehåller operatorer för att köra ett antal uppgifter mot en Azure Databricks-arbetsyta, inklusive att importera data till en tabell, köra SQL-frågor och arbeta med Databricks Git-mappar.
Databricks-providern implementerar två operatorer för att utlösa jobb:
- DatabricksRunNowOperator kräver ett befintligt Azure Databricks-jobb och använder API-begäran POST /api/2.1/jobs/run-now för att utlösa en körning. Databricks rekommenderar att du använder
DatabricksRunNowOperator
eftersom det minskar dupliceringen av jobbdefinitioner och jobbkörningar som utlöses med den här operatorn finns i användargränssnittet för jobb. - DatabricksSubmitRunOperator kräver inte att ett jobb finns i Azure Databricks och använder POST /api/2.1/jobs/runs/submit API-begäran för att skicka jobbspecifikationen och utlösa en körning.
För att skapa ett nytt Azure Databricks-jobb eller återställa ett befintligt jobb implementerar Databricks-providern DatabricksCreateJobsOperator. DatabricksCreateJobsOperator
Använder API-begäranden för POST /api/2.1/jobs/create och POST /api/2.1/jobs/reset. Du kan använda DatabricksCreateJobsOperator
med DatabricksRunNowOperator
för att skapa och köra ett jobb.
Kommentar
Att använda Databricks-operatorerna för att utlösa ett jobb kräver att du anger autentiseringsuppgifter i Databricks-anslutningskonfigurationen. Se Skapa en personlig Åtkomsttoken för Azure Databricks för Airflow.
Databricks Airflow-operatorerna skriver url:en för jobbkörningssidan till Airflow-loggarna var polling_period_seconds
(standardvärdet är 30 sekunder). Mer information finns på paketsidan apache-airflow-providers-databricks på Airflows webbplats.
Installera Airflow Azure Databricks-integreringen lokalt
Använd följande steg för att installera Airflow och Databricks-providern lokalt för testning och utveckling. Andra installationsalternativ för Airflow, inklusive att skapa en produktionsinstallation, finns i installationen i Airflow-dokumentationen.
Öppna en terminal och kör följande kommandon:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
Ersätt <firstname>
, <lastname>
och <email>
med ditt användarnamn och din e-postadress. Du uppmanas att ange ett lösenord för administratörsanvändaren. Se till att spara det här lösenordet eftersom det krävs för att logga in på Airflow-användargränssnittet.
Det här skriptet utför följande steg:
- Skapar en katalog med namnet
airflow
och ändringar i katalogen. - Använder
pipenv
för att skapa och skapa en virtuell Python-miljö. Databricks rekommenderar att du använder en virtuell Python-miljö för att isolera paketversioner och kodberoenden till den miljön. Den här isoleringen hjälper till att minska oväntade paketversionsfel och kodberoendekollisioner. - Initierar en miljövariabel med namnet
AIRFLOW_HOME
inställd på sökvägenairflow
till katalogen. - Installerar Airflow- och Airflow Databricks-providerpaketen.
- Skapar en
airflow/dags
katalog. Airflow använderdags
katalogen för att lagra DAG-definitioner. - Initierar en SQLite-databas som Airflow använder för att spåra metadata. I en airflow-distribution i produktion konfigurerar du Airflow med en standarddatabas. SQLite-databasen och standardkonfigurationen för din Airflow-distribution initieras i
airflow
katalogen. - Skapar en administratörsanvändare för Airflow.
Dricks
Bekräfta installationen av Databricks-providern genom att köra följande kommando i installationskatalogen för Airflow:
airflow providers list
Starta Airflow-webbservern och schemaläggaren
Airflow-webbservern krävs för att visa användargränssnittet för Airflow. Starta webbservern genom att öppna en terminal i installationskatalogen för Airflow och köra följande kommandon:
Kommentar
Om Airflow-webbservern inte startar på grund av en portkonflikt kan du ändra standardporten i Airflow-konfigurationen.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
Schemaläggaren är airflow-komponenten som schemalägger DAG:er. Starta schemaläggaren genom att öppna en ny terminal i installationskatalogen för Airflow och köra följande kommandon:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Testa Airflow-installationen
För att verifiera Airflow-installationen kan du köra något av de exempel på DAG:er som ingår i Airflow:
- Öppna i ett webbläsarfönster
http://localhost:8080/home
. Logga in på Airflow-användargränssnittet med det användarnamn och lösenord som du skapade när du installerade Airflow. Sidan Airflow DAG visas. - Klicka på paus-/paus-DAG-växlingsknappen för att ta bort en av exempel-DAG:erna
example_python_operator
, till exempel . - Utlös exemplet DAG genom att klicka på knappen Trigger DAG (Utlösa dag).
- Klicka på DAG-namnet för att visa information, inklusive körningsstatus för DAG.
Skapa en personlig Åtkomsttoken för Azure Databricks för Airflow
Airflow ansluter till Databricks med en personlig åtkomsttoken för Azure Databricks (PAT). Så här skapar du en PAT:
- I din Azure Databricks-arbetsyta klickar du på ditt Användarnamn för Azure Databricks i det övre fältet och väljer sedan Inställningar i listrutan.
- Klicka på Utvecklare.
- Bredvid Åtkomsttoken klickar du på Hantera.
- Klicka på Generera ny token.
- (Valfritt) Ange en kommentar som hjälper dig att identifiera den här token i framtiden och ändra tokens standardlivslängd på 90 dagar. Om du vill skapa en token utan livslängd (rekommenderas inte) lämnar du rutan Livslängd (dagar) tom (tom).
- Klicka på Generera.
- Kopiera den visade token till en säker plats och klicka sedan på Klar.
Kommentar
Se till att spara den kopierade token på en säker plats. Dela inte din kopierade token med andra. Om du förlorar den kopierade token kan du inte återskapa exakt samma token. I stället måste du upprepa den här proceduren för att skapa en ny token. Om du förlorar den kopierade token eller om du tror att token har komprometterats rekommenderar Databricks starkt att du omedelbart tar bort den token från arbetsytan genom att klicka på papperskorgsikonen (Återkalla) bredvid token på sidan Åtkomsttoken .
Om du inte kan skapa eller använda token på din arbetsyta kan det bero på att arbetsyteadministratören har inaktiverat token eller inte har gett dig behörighet att skapa eller använda token. Se administratören för arbetsytan eller följande avsnitt:
Kommentar
När du autentiserar med automatiserade verktyg, system, skript och appar rekommenderar Databricks att du använder personliga åtkomsttoken som tillhör tjänstens huvudnamn i stället för arbetsyteanvändare. Information om hur du skapar token för tjänstens huvudnamn finns i Hantera token för tjänstens huvudnamn.
Du kan också autentisera till Azure Databricks med hjälp av en Microsoft Entra ID-token. Se Databricks-anslutning i Airflow-dokumentationen.
Konfigurera en Azure Databricks-anslutning
Airflow-installationen innehåller en standardanslutning för Azure Databricks. Så här uppdaterar du anslutningen för att ansluta till din arbetsyta med hjälp av den personliga åtkomsttoken som du skapade ovan:
- Öppna i ett webbläsarfönster
http://localhost:8080/connection/list/
. Om du uppmanas att logga in anger du administratörens användarnamn och lösenord. - Under Conn ID letar du upp databricks_default och klickar på knappen Redigera post .
- Ersätt värdet i fältet Värd med arbetsytans instansnamn för din Azure Databricks-distribution, till exempel
https://adb-123456789.cloud.databricks.com
. - I fältet Lösenord anger du din personliga åtkomsttoken för Azure Databricks.
- Klicka på Spara.
Om du använder en Microsoft Entra-ID-token kan du läsa Databricks-anslutning i Airflow-dokumentationen för information om hur du konfigurerar autentisering.
Exempel: Skapa en Airflow DAG för att köra ett Azure Databricks-jobb
I följande exempel visas hur du skapar en enkel Airflow-distribution som körs på din lokala dator och distribuerar ett exempel på DAG för att utlösa körningar i Azure Databricks. I det här exemplet kommer du att:
- Skapa en ny notebook-fil och lägg till kod för att skriva ut en hälsning baserat på en konfigurerad parameter.
- Skapa ett Azure Databricks-jobb med en enda uppgift som kör notebook-filen.
- Konfigurera en Airflow-anslutning till din Azure Databricks-arbetsyta.
- Skapa en Airflow DAG för att utlösa notebook-jobbet. Du definierar DAG i ett Python-skript med .
DatabricksRunNowOperator
- Använd airflow-användargränssnittet för att utlösa DAG och visa körningsstatusen.
Skapa en notebook-fil
I det här exemplet används en notebook-fil som innehåller två celler:
- Den första cellen innehåller en textwidget för Databricks Utilities som definierar en variabel med namnet
greeting
inställt på standardvärdetworld
. - Den andra cellen skriver ut värdet för variabeln
greeting
som prefixet avhello
.
Så här skapar du notebook-filen:
Gå till din Azure Databricks-arbetsyta, klicka på Nytt i sidofältet och välj Notebook.
Ge anteckningsboken ett namn, till exempel Hello Airflow, och kontrollera att standardspråket är inställt på Python.
Kopiera följande Python-kod och klistra in den i den första cellen i notebook-filen.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
Lägg till en ny cell under den första cellen och kopiera och klistra in följande Python-kod i den nya cellen:
print("hello {}".format(greeting))
Skapa ett jobb
Klicka på Arbetsflöden i sidofältet.
Klicka på .
Fliken Uppgifter visas i dialogrutan Skapa aktivitet.
Ersätt Lägg till ett namn för jobbet... med jobbets namn.
I fältet Aktivitetsnamn anger du ett namn för aktiviteten, till exempel hälsningsaktivitet.
I listrutan Typ väljer du Anteckningsbok.
I listrutan Källa väljer du Arbetsyta.
Klicka på textrutan Sökväg och använd filwebbläsaren för att hitta anteckningsboken du skapade, klicka på anteckningsbokens namn och klicka på Bekräfta.
Klicka på Lägg till under Parametrar. I fältet Nyckel anger du
greeting
. I fältet Värde anger duAirflow user
.Klicka på Skapa uppgift.
I panelen Jobbinformation kopierar du jobb-ID-värdet. Det här värdet krävs för att utlösa jobbet från Airflow.
Kör jobbet
Om du vill testa ditt nya jobb i användargränssnittet för Azure Databricks-jobb klickar du i det övre högra hörnet. När körningen är klar kan du verifiera utdata genom att visa jobbkörningsinformationen.
Skapa en ny Airflow DAG
Du definierar en Airflow DAG i en Python-fil. Så här skapar du en DAG för att utlösa notebook-exempeljobbet:
I en textredigerare eller IDE skapar du en ny fil med namnet
databricks_dag.py
med följande innehåll:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
Ersätt
JOB_ID
med värdet för det jobb-ID som sparades tidigare.Spara filen i
airflow/dags
katalogen. Airflow läser och installerar AUTOMATISKT DAG-filer som lagras iairflow/dags/
.
Installera och verifiera DAG i Airflow
Så här utlöser och verifierar du DAG i användargränssnittet för Airflow:
- Öppna i ett webbläsarfönster
http://localhost:8080/home
. Skärmen Airflow DAG visas. - Leta upp
databricks_dag
och klicka på paus-/paus-DAG-växlingsknappen för att avpausera DAG. - Utlös DAG genom att klicka på knappen Trigger DAG (Utlösa dag).
- Klicka på en körning i kolumnen Körningar för att visa status och information om körningen.