Orchestrace úloh Azure Databricks pomocí Apache Airflow
Tento článek popisuje podporu Apache Airflow pro orchestraci datových kanálů pomocí Azure Databricks, obsahuje pokyny k instalaci a konfiguraci Airflow místně a poskytuje příklad nasazení a spuštění pracovního postupu Azure Databricks pomocí Airflow.
Orchestrace úloh v datovém kanálu
Vývoj a nasazení kanálu pro zpracování dat často vyžaduje správu složitých závislostí mezi úlohami. Kanál může například číst data ze zdroje, vyčistit data, transformovat vyčištěná data a zapsat transformovaná data do cíle. Potřebujete také podporu pro testování, plánování a řešení chyb při zprovoznění kanálu.
Systémy pracovních postupů tyto problémy řeší tím, že umožňují definovat závislosti mezi úlohami, naplánovat, kdy se kanály spouštějí a monitorují pracovní postupy. Apache Airflow je opensourcové řešení pro správu a plánování datových kanálů. Airflow představuje datové kanály jako řízené acyklické grafy (DAG) operací. Pracovní postup definujete v souboru Pythonu a Airflow spravuje plánování a provádění. Připojení Airflow Azure Databricks umožňuje využívat optimalizovaný modul Spark nabízený službou Azure Databricks s funkcemi plánování Airflow.
Požadavky
- Integrace mezi Airflow a Azure Databricks vyžaduje Airflow verze 2.5.0 a novější. Příklady v tomto článku jsou testovány s Airflow verze 2.6.1.
- Airflow vyžaduje Python 3.8, 3.9, 3.10 nebo 3.11. Příklady v tomto článku jsou testovány v Pythonu 3.8.
- Pokyny v tomto článku k instalaci a spuštění Airflow vyžadují pipenv k vytvoření virtuálního prostředí Pythonu.
Operátoři airflow pro Databricks
DaG airflow se skládá z úkolů, kde každý úkol spouští operátor airflow. Operátory airflow podporující integraci do Databricks jsou implementované ve zprostředkovateli Databricks.
Poskytovatel Databricks zahrnuje operátory pro spouštění několika úloh v pracovním prostoru Azure Databricks, včetně importu dat do tabulky, spouštění dotazů SQL a práce se složkami Git Databricks.
Zprostředkovatel Databricks implementuje dva operátory pro aktivaci úloh:
- DatabricksRunNowOperator vyžaduje existující úlohu Azure Databricks a k aktivaci spuštění používá požadavek ROZHRANÍ API POST /api/2.1/jobs/run-now. Databricks doporučuje použít,
DatabricksRunNowOperator
protože snižuje duplicitu definic úloh a spuštění úloh aktivovaná tímto operátorem najdete v uživatelském rozhraní úloh. - DatabricksSubmitRunOperator nevyžaduje, aby v Azure Databricks existovala úloha a používá post /api/2.1/jobs/run/submit API požadavek k odeslání specifikace úlohy a aktivaci spuštění.
Pokud chcete vytvořit novou úlohu Azure Databricks nebo resetovat existující úlohu, poskytovatel Databricks implementuje DatabricksCreateJobsOperator. Používá DatabricksCreateJobsOperator
požadavky POST /api/2.1/jobs/create a POST /api/2.1/jobs/reset API. Pomocí této možnosti DatabricksCreateJobsOperator
DatabricksRunNowOperator
můžete vytvořit a spustit úlohu.
Poznámka:
Použití operátorů Databricks k aktivaci úlohy vyžaduje zadání přihlašovacích údajů v konfiguraci připojení Databricks. Viz Vytvoření tokenu pat Azure Databricks pro Airflow.
Operátoři Airflow Databricks zapisují adresu URL stránky spuštění úlohy do protokolů Airflow každých polling_period_seconds
(výchozí hodnota je 30 sekund). Další informace najdete na stránce balíčku apache-airflow-providers-databricks na webu Airflow.
Místní instalace integrace Azure Databricks s Airflow
Pokud chcete nainstalovat Airflow a poskytovatele Databricks místně pro účely testování a vývoje, postupujte následovně. Další možnosti instalace Airflow, včetně vytvoření produkční instalace, najdete v dokumentaci k Airflow.
Otevřete terminál a spusťte následující příkazy:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
Nahraďte <firstname>
a <lastname>
<email>
zadejte svoje uživatelské jméno a e-mail. Zobrazí se výzva k zadání hesla pro uživatele správce. Nezapomeňte toto heslo uložit, protože je nutné se přihlásit k uživatelskému rozhraní Airflow.
Tento skript provede následující kroky:
- Vytvoří adresář pojmenovaný
airflow
a změní se do daného adresáře. - Používá
pipenv
se k vytvoření a vytvoření virtuálního prostředí Pythonu. Databricks doporučuje používat virtuální prostředí Pythonu k izolaci verzí balíčků a závislostí kódu do daného prostředí. Tato izolace pomáhá snížit neočekávané neshody verzí balíčků a kolize závislostí kódu. - Inicializuje proměnnou prostředí s názvem
AIRFLOW_HOME
nastavenou na cestu k adresářiairflow
. - Nainstaluje Airflow a balíčky poskytovatele Databricks Airflow.
airflow/dags
Vytvoří adresář. Airflow používádags
adresář k ukládání definic DAG.- Inicializuje databázi SQLite, kterou Airflow používá ke sledování metadat. V produkčním nasazení Airflow byste nakonfigurovali Airflow se standardní databází. Databáze SQLite a výchozí konfigurace pro nasazení Airflow se inicializují v
airflow
adresáři. - Vytvoří uživatele správce pro Airflow.
Tip
Pokud chcete potvrdit instalaci poskytovatele Databricks, spusťte v instalačním adresáři Airflow následující příkaz:
airflow providers list
Spuštění webového serveru a plánovače Airflow
Webový server Airflow se vyžaduje k zobrazení uživatelského rozhraní Airflow. Pokud chcete spustit webový server, otevřete terminál v instalačním adresáři Airflow a spusťte následující příkazy:
Poznámka:
Pokud se webovému serveru Airflow nepodaří spustit kvůli konfliktu portů, můžete změnit výchozí port v konfiguraci Airflow.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
Plánovač je komponenta Airflow, která plánuje DAG. Pokud chcete spustit plánovač, otevřete nový terminál v instalačním adresáři Airflow a spusťte následující příkazy:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Otestování instalace Airflow
Pokud chcete ověřit instalaci Airflow, můžete spustit jednu z ukázkových DAG, které jsou součástí Airflow:
- V okně prohlížeče otevřete
http://localhost:8080/home
. Přihlaste se k uživatelskému rozhraní Airflow pomocí uživatelského jména a hesla, které jste vytvořili při instalaci Airflow. Zobrazí se stránka DAG airflow. - Kliknutím na přepínač Pozastavit/Zrušit zapůjení daG odblokujte některou z ukázkových dag, například .
example_python_operator
- Spusťte ukázku DAG kliknutím na tlačítko DaG triggeru.
- Kliknutím na název DAG zobrazíte podrobnosti, včetně stavu spuštění dag.
Vytvoření tokenu pat pro Azure Databricks pro Airflow
Airflow se připojuje k Databricks pomocí tokenu PAT (Personal Access Token) Azure Databricks. Vytvoření patu:
- V pracovním prostoru Azure Databricks klikněte na své uživatelské jméno Azure Databricks v horním panelu a pak v rozevíracím seznamu vyberte Nastavení .
- Klikněte na Vývojář.
- Vedle přístupových tokenů klikněte na Spravovat.
- Klikněte na Vygenerovat nový token.
- (Volitelné) Zadejte komentář, který vám pomůže identifikovat tento token v budoucnu a změnit výchozí životnost tokenu na 90 dnů. Pokud chcete vytvořit token bez životnosti (nedoporučuje se), nechte pole Životnost (dny) prázdné (prázdné).
- Klikněte na Vygenerovat.
- Zkopírujte zobrazený token do zabezpečeného umístění a klikněte na tlačítko Hotovo.
Poznámka:
Nezapomeňte zkopírovaný token uložit do zabezpečeného umístění. Nesdílejte svůj zkopírovaný token s ostatními. Pokud ztratíte zkopírovaný token, nemůžete tento úplně stejný token znovu vygenerovat. Místo toho musíte tento postup zopakovat, abyste vytvořili nový token. Pokud ztratíte zkopírovaný token nebo se domníváte, že došlo k ohrožení zabezpečení tokenu, databricks důrazně doporučuje tento token okamžitě odstranit z pracovního prostoru kliknutím na ikonu koše (Odvolat) vedle tokenu na stránce Přístupové tokeny .
Pokud v pracovním prostoru nemůžete vytvářet nebo používat tokeny, může to být proto, že správce pracovního prostoru zakázal tokeny nebo vám neudělil oprávnění k vytváření nebo používání tokenů. Projděte si správce pracovního prostoru nebo následující témata:
Poznámka:
Osvědčeným postupem při ověřování pomocí automatizovaných nástrojů, systémů, skriptů a aplikací doporučuje Databricks místo uživatelů pracovního prostoru používat tokeny patního přístupu, které patří instančním objektům . Pokud chcete vytvořit tokeny pro instanční objekty, přečtěte si téma Správa tokenů instančního objektu.
Můžete se také ověřit v Azure Databricks pomocí tokenu ID Microsoft Entra. Viz dokumentace k Připojení Databricks v dokumentaci k Airflow.
Konfigurace připojení Azure Databricks
Instalace Airflow obsahuje výchozí připojení pro Azure Databricks. Pokud chcete aktualizovat připojení pro připojení k pracovnímu prostoru pomocí tokenu pat, který jste vytvořili výše:
- V okně prohlížeče otevřete
http://localhost:8080/connection/list/
. Pokud se zobrazí výzva k přihlášení, zadejte uživatelské jméno a heslo správce. - V části Conn ID vyhledejte databricks_default a klikněte na tlačítko Upravit záznam.
- Hodnotu v poli Hostitel nahraďte názvem instance pracovního prostoru vašeho nasazení Azure Databricks,
https://adb-123456789.cloud.databricks.com
například . - Do pole Heslo zadejte osobní přístupový token Azure Databricks.
- Klikněte na Uložit.
Pokud používáte token Microsoft Entra ID, informace o konfiguraci ověřování najdete v dokumentaci ke službě Airflow v tématu Připojení Databricks.
Příklad: Vytvoření DAG Airflow pro spuštění úlohy Azure Databricks
Následující příklad ukazuje, jak vytvořit jednoduché nasazení Airflow, které běží na místním počítači, a nasadí ukázkovou sadu DAG pro aktivaci spuštění v Azure Databricks. V tomto příkladu:
- Vytvořte nový poznámkový blok a přidejte kód pro tisk pozdravu na základě nakonfigurovaného parametru.
- Vytvořte úlohu Azure Databricks s jednou úlohou, která spustí poznámkový blok.
- Nakonfigurujte připojení Airflow k pracovnímu prostoru Azure Databricks.
- Vytvořte DAG Airflow, který aktivuje úlohu poznámkového bloku. DaG definujete ve skriptu Pythonu pomocí
DatabricksRunNowOperator
. - Pomocí uživatelského rozhraní Airflow aktivujte DAG a zobrazte stav spuštění.
Vytvoření poznámkového bloku
V tomto příkladu se používá poznámkový blok obsahující dvě buňky:
- První buňka obsahuje textový widget nástrojů Databricks definující proměnnou s názvem
greeting
nastavenou na výchozí hodnotuworld
. - Druhá buňka vytiskne hodnotu
greeting
proměnné s předponouhello
.
Vytvoření poznámkového bloku:
Přejděte do pracovního prostoru Azure Databricks, na bočním panelu klikněte na Nový a vyberte Poznámkový blok.
Pojmenujte poznámkový blok, například Hello Airflow, a ujistěte se, že je výchozí jazyk nastavený na Python.
Zkopírujte následující kód Pythonu a vložte ho do první buňky poznámkového bloku.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
Přidejte novou buňku pod první buňku a zkopírujte do nové buňky následující kód Pythonu:
print("hello {}".format(greeting))
Vytvoření úlohy
Na bočním panelu klikněte na Pracovní postupy.
Klikněte na .
Zobrazí se karta Úkoly s dialogovým oknem vytvořit úkol.
Nahraďte název vaší úlohy... názvem vaší úlohy.
Do pole Název úkolu zadejte název úkolu, například pozdrav-úkol.
V rozevírací nabídce Typ vyberte Poznámkový blok.
V rozevírací nabídce Zdroj vyberte Pracovní prostor.
Klikněte na textové pole Cesta a v prohlížeči souborů vyhledejte poznámkový blok, který jste vytvořili, klikněte na název poznámkového bloku a klikněte na Potvrdit.
Klikněte na Přidat v části Parametry. Do pole Klíč zadejte
greeting
. Do pole Hodnota zadejteAirflow user
.Klikněte na Vytvořit úkol.
Na panelu Podrobnosti úlohy zkopírujte hodnotu ID úlohy. Tato hodnota se vyžaduje k aktivaci úlohy z Airflow.
Spuštění úlohy
Pokud chcete novou úlohu otestovat v uživatelském rozhraní úloh Azure Databricks, klikněte v pravém horním rohu. Po dokončení spuštění můžete výstup ověřit zobrazením podrobností o spuštění úlohy.
Vytvoření nového DAG Airflow
V souboru Pythonu definujete DAG Airflow. Vytvoření DAG pro aktivaci ukázkové úlohy poznámkového bloku:
V textovém editoru nebo integrovaném vývojovém prostředí vytvořte nový soubor s názvem
databricks_dag.py
s následujícím obsahem:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
Nahraďte
JOB_ID
hodnotou ID úlohy, které jste uložili dříve.Uložte soubor do
airflow/dags
adresáře. Airflow automaticky čte a instaluje soubory DAG uložené vairflow/dags/
.
Instalace a ověření DAG v Airflow
Aktivace a ověření DAG v uživatelském rozhraní Airflow:
- V okně prohlížeče otevřete
http://localhost:8080/home
. Zobrazí se obrazovka DAG Airflow. - Vyhledejte
databricks_dag
a klikněte na přepínač Pozastavit nebo zrušit pozastavení DAG a zrušte tak pozastavení DAG . - Spusťte DAG kliknutím na tlačítko DaG triggeru.
- Kliknutím na spuštění ve sloupci Spuštění zobrazíte stav a podrobnosti spuštění.