Samordna Azure Databricks-jobb med Apache Airflow

Den här artikeln beskriver Apache Airflow-stödet för att samordna datapipelines med Azure Databricks, har instruktioner för att installera och konfigurera Airflow lokalt och innehåller ett exempel på hur du distribuerar och kör ett Azure Databricks-arbetsflöde med Airflow.

Jobborkestrering i en datapipeline

Att utveckla och distribuera en databehandlingspipeline kräver ofta att du hanterar komplexa beroenden mellan aktiviteter. En pipeline kan till exempel läsa data från en källa, rensa data, transformera rensade data och skriva transformerade data till ett mål. Du behöver också stöd för test-, schemaläggnings- och felsökningsfel när du operationaliserar en pipeline.

Arbetsflödessystem hanterar dessa utmaningar genom att du kan definiera beroenden mellan aktiviteter, schemalägga när pipelines körs och övervaka arbetsflöden. Apache Airflow är en öppen källkod lösning för att hantera och schemalägga datapipelines. Airflow representerar datapipelines som riktade acykliska grafer (DAG) för åtgärder. Du definierar ett arbetsflöde i en Python-fil och Airflow hanterar schemaläggning och körning. Med Airflow Azure Databricks-anslutningen kan du dra nytta av den optimerade Spark-motorn som erbjuds av Azure Databricks med schemaläggningsfunktionerna i Airflow.

Behov

  • Integreringen mellan Airflow och Azure Databricks kräver Airflow version 2.5.0 och senare. Exemplen i den här artikeln testas med Airflow version 2.6.1.
  • Airflow kräver Python 3.8, 3.9, 3.10 eller 3.11. Exemplen i den här artikeln testas med Python 3.8.
  • Anvisningarna i den här artikeln för att installera och köra Airflow kräver pipenv för att skapa en virtuell Python-miljö.

Luftflödesoperatorer för Databricks

En Airflow DAG består av uppgifter, där varje uppgift kör en Airflow Operator. Airflow-operatörer som stöder integreringen till Databricks implementeras i Databricks-providern.

Databricks-providern innehåller operatorer för att köra ett antal uppgifter mot en Azure Databricks-arbetsyta, inklusive att importera data till en tabell, köra SQL-frågor och arbeta med Databricks Repos.

Databricks-providern implementerar två operatorer för att utlösa jobb:

För att skapa ett nytt Azure Databricks-jobb eller återställa ett befintligt jobb implementerar Databricks-providern DatabricksCreateJobsOperator. DatabricksCreateJobsOperator Använder API-begäranden för POST /api/2.1/jobs/create och POST /api/2.1/jobs/reset. Du kan använda DatabricksCreateJobsOperator med DatabricksRunNowOperator för att skapa och köra ett jobb.

Kommentar

Att använda Databricks-operatorerna för att utlösa ett jobb kräver att du anger autentiseringsuppgifter i Databricks-anslutningskonfigurationen. Se Skapa en personlig Åtkomsttoken för Azure Databricks för Airflow.

Databricks Airflow-operatorerna skriver url:en för jobbkörningssidan till Airflow-loggarna var polling_period_seconds (standardvärdet är 30 sekunder). Mer information finns på paketsidan apache-airflow-providers-databricks på Airflows webbplats.

Installera Airflow Azure Databricks-integreringen lokalt

Använd följande steg för att installera Airflow och Databricks-providern lokalt för testning och utveckling. Andra installationsalternativ för Airflow, inklusive att skapa en produktionsinstallation, finns i installationen i Airflow-dokumentationen.

Öppna en terminal och kör följande kommandon:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Ersätt <firstname>, <lastname>och <email> med ditt användarnamn och din e-postadress. Du uppmanas att ange ett lösenord för administratörsanvändaren. Se till att spara det här lösenordet eftersom det krävs för att logga in på Airflow-användargränssnittet.

Det här skriptet utför följande steg:

  1. Skapar en katalog med namnet airflow och ändringar i katalogen.
  2. Använder pipenv för att skapa och skapa en virtuell Python-miljö. Databricks rekommenderar att du använder en virtuell Python-miljö för att isolera paketversioner och kodberoenden till den miljön. Den här isoleringen hjälper till att minska oväntade paketversionsfel och kodberoendekollisioner.
  3. Initierar en miljövariabel med namnet AIRFLOW_HOME inställd på sökvägen airflow till katalogen.
  4. Installerar Airflow- och Airflow Databricks-providerpaketen.
  5. Skapar en airflow/dags katalog. Airflow använder dags katalogen för att lagra DAG-definitioner.
  6. Initierar en SQLite-databas som Airflow använder för att spåra metadata. I en airflow-distribution i produktion konfigurerar du Airflow med en standarddatabas. SQLite-databasen och standardkonfigurationen för din Airflow-distribution initieras i airflow katalogen.
  7. Skapar en administratörsanvändare för Airflow.

Dricks

Bekräfta installationen av Databricks-providern genom att köra följande kommando i installationskatalogen för Airflow:

airflow providers list

Starta Airflow-webbservern och schemaläggaren

Airflow-webbservern krävs för att visa användargränssnittet för Airflow. Starta webbservern genom att öppna en terminal i installationskatalogen för Airflow och köra följande kommandon:

Kommentar

Om Airflow-webbservern inte startar på grund av en portkonflikt kan du ändra standardporten i Airflow-konfigurationen.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

Schemaläggaren är airflow-komponenten som schemalägger DAG:er. Starta schemaläggaren genom att öppna en ny terminal i installationskatalogen för Airflow och köra följande kommandon:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Testa Airflow-installationen

För att verifiera Airflow-installationen kan du köra något av de exempel på DAG:er som ingår i Airflow:

  1. Öppna i ett webbläsarfönster http://localhost:8080/home. Logga in på Airflow-användargränssnittet med det användarnamn och lösenord som du skapade när du installerade Airflow. Sidan Airflow DAG visas.
  2. Klicka på paus-/paus-DAG-växlingsknappen för att ta bort en av exempel-DAG:ernaexample_python_operator, till exempel .
  3. Utlös exemplet DAG genom att klicka på knappen Trigger DAG (Utlösa dag).
  4. Klicka på DAG-namnet för att visa information, inklusive körningsstatus för DAG.

Skapa en personlig Åtkomsttoken för Azure Databricks för Airflow

Airflow ansluter till Databricks med en personlig åtkomsttoken för Azure Databricks (PAT). Så här skapar du en PAT:

  1. I din Azure Databricks-arbetsyta klickar du på ditt Användarnamn för Azure Databricks i det övre fältet och väljer sedan Användare Inställningar i listrutan.
  2. Klicka på Utvecklare.
  3. Bredvid Åtkomsttoken klickar du på Hantera.
  4. Klicka på Generera ny token.
  5. (Valfritt) Ange en kommentar som hjälper dig att identifiera den här token i framtiden och ändra tokens standardlivslängd på 90 dagar. Om du vill skapa en token utan livslängd (rekommenderas inte) lämnar du rutan Livslängd (dagar) tom (tom).
  6. Klicka på Generera.
  7. Kopiera den visade token till en säker plats och klicka sedan på Klar.

Kommentar

Se till att spara den kopierade token på en säker plats. Dela inte din kopierade token med andra. Om du förlorar den kopierade token kan du inte återskapa exakt samma token. I stället måste du upprepa den här proceduren för att skapa en ny token. Om du förlorar den kopierade token eller om du tror att token har komprometterats rekommenderar Databricks starkt att du omedelbart tar bort den token från arbetsytan genom att klicka på papperskorgsikonen (Återkalla) bredvid token på sidan Åtkomsttoken .

Om du inte kan skapa eller använda token på din arbetsyta kan det bero på att arbetsyteadministratören har inaktiverat token eller inte har gett dig behörighet att skapa eller använda token. Se din arbetsyteadministratör eller följande:

Kommentar

När du autentiserar med automatiserade verktyg, system, skript och appar rekommenderar Databricks att du använder personliga åtkomsttoken som tillhör tjänstens huvudnamn i stället för arbetsyteanvändare. Information om hur du skapar token för tjänstens huvudnamn finns i Hantera token för tjänstens huvudnamn.

Du kan också autentisera till Azure Databricks med hjälp av en Microsoft Entra ID-token (tidigare Azure Active Directory). Se Databricks Anslut ion i Airflow-dokumentationen.

Konfigurera en Azure Databricks-anslutning

Airflow-installationen innehåller en standardanslutning för Azure Databricks. Så här uppdaterar du anslutningen för att ansluta till din arbetsyta med hjälp av den personliga åtkomsttoken som du skapade ovan:

  1. Öppna i ett webbläsarfönster http://localhost:8080/connection/list/. Om du uppmanas att logga in anger du administratörens användarnamn och lösenord.
  2. Under Conn ID letar du upp databricks_default och klickar på knappen Redigera post .
  3. Ersätt värdet i fältet Värd med arbetsytans instansnamn för din Azure Databricks-distribution, till exempel https://adb-123456789.cloud.databricks.com.
  4. I fältet Lösenord anger du din personliga åtkomsttoken för Azure Databricks.
  5. Klicka på Spara.

Om du använder en Microsoft Entra-ID-token kan du läsa Mer information om hur du konfigurerar autentisering finns i Databricks Anslut ion i Airflow-dokumentationen.

Exempel: Skapa en Airflow DAG för att köra ett Azure Databricks-jobb

I följande exempel visas hur du skapar en enkel Airflow-distribution som körs på din lokala dator och distribuerar ett exempel på DAG för att utlösa körningar i Azure Databricks. I det här exemplet kommer du att:

  1. Skapa en ny notebook-fil och lägg till kod för att skriva ut en hälsning baserat på en konfigurerad parameter.
  2. Skapa ett Azure Databricks-jobb med en enda uppgift som kör notebook-filen.
  3. Konfigurera en Airflow-anslutning till din Azure Databricks-arbetsyta.
  4. Skapa en Airflow DAG för att utlösa notebook-jobbet. Du definierar DAG i ett Python-skript med .DatabricksRunNowOperator
  5. Använd airflow-användargränssnittet för att utlösa DAG och visa körningsstatusen.

Skapa en notebook-fil

I det här exemplet används en notebook-fil som innehåller två celler:

  • Den första cellen innehåller en textwidget för Databricks Utilities som definierar en variabel med namnet greeting inställt på standardvärdet world.
  • Den andra cellen skriver ut värdet för variabeln greeting som prefixet av hello.

Så här skapar du notebook-filen:

  1. Gå till din Azure Databricks-arbetsyta, klicka på New IconNytt i sidofältet och välj Notebook.

  2. Ge anteckningsboken ett namn, till exempel Hello Airflow, och kontrollera att standardspråket är inställt på Python.

  3. Kopiera följande Python-kod och klistra in den i den första cellen i notebook-filen.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Lägg till en ny cell under den första cellen och kopiera och klistra in följande Python-kod i den nya cellen:

    print("hello {}".format(greeting))
    

Skapa ett jobb

  1. Klicka på Jobs IconArbetsflöden i sidofältet.

  2. Klicka på Create Job Button.

    Fliken Uppgifter visas i dialogrutan Skapa aktivitet.

    Create first task dialog

  3. Ersätt Lägg till ett namn för jobbet... med jobbets namn.

  4. I fältet Aktivitetsnamn anger du ett namn för aktiviteten, till exempel hälsningsaktivitet.

  5. I listrutan Typ väljer du Anteckningsbok.

  6. I listrutan Källa väljer du Arbetsyta.

  7. Klicka på textrutan Sökväg och använd filwebbläsaren för att hitta anteckningsboken du skapade, klicka på anteckningsbokens namn och klicka på Bekräfta.

  8. Klicka på Lägg till under Parametrar. I fältet Nyckel anger du greeting. I fältet Värde anger du Airflow user.

  9. Klicka på Skapa uppgift.

I panelen Jobbinformation kopierar du jobb-ID-värdet. Det här värdet krävs för att utlösa jobbet från Airflow.

Kör jobbet

Om du vill testa ditt nya jobb i användargränssnittet för Azure Databricks-arbetsflöden klickar du Run Now Button i det övre högra hörnet. När körningen är klar kan du verifiera utdata genom att visa jobbkörningsinformationen.

Skapa en ny Airflow DAG

Du definierar en Airflow DAG i en Python-fil. Så här skapar du en DAG för att utlösa notebook-exempeljobbet:

  1. I en textredigerare eller IDE skapar du en ny fil med namnet databricks_dag.py med följande innehåll:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Ersätt JOB_ID med värdet för det jobb-ID som sparades tidigare.

  2. Spara filen i airflow/dags katalogen. Airflow läser och installerar AUTOMATISKT DAG-filer som lagras i airflow/dags/.

Installera och verifiera DAG i Airflow

Så här utlöser och verifierar du DAG i användargränssnittet för Airflow:

  1. Öppna i ett webbläsarfönster http://localhost:8080/home. Skärmen Airflow DAG visas.
  2. Leta upp databricks_dag och klicka på paus-/paus-DAG-växlingsknappen för att avpausera DAG.
  3. Utlös DAG genom att klicka på knappen Trigger DAG (Utlösa dag).
  4. Klicka på en körning i kolumnen Körningar för att visa status och information om körningen.