Azure Databricks-feladatok vezénylése az Apache Airflow használatával
Ez a cikk azt ismerteti, hogy az Apache Airflow támogatja-e az adatfolyamok Azure Databricks-szel való vezénylését, útmutatást tartalmaz az Airflow helyi telepítéséhez és konfigurálásához, és példát mutat egy Azure Databricks-munkafolyamat üzembe helyezésére és futtatására az Airflow használatával.
Feladat vezénylése egy adatfolyamban
Az adatfeldolgozási folyamatok fejlesztéséhez és üzembe helyezéséhez gyakran összetett függőségek kezelése szükséges a tevékenységek között. Előfordulhat például, hogy egy folyamat adatokat olvas egy forrásból, megtisztítja az adatokat, átalakítja a megtisztított adatokat, és megírja az átalakított adatokat egy célhelyre. A folyamat üzembe helyezésekor a tesztelési, ütemezési és hibaelhárítási hibákhoz is támogatásra van szüksége.
A munkafolyamat-rendszerek úgy oldják meg ezeket a kihívásokat, hogy lehetővé teszik a tevékenységek közötti függőségek meghatározását, a folyamatok futásának ütemezését és a munkafolyamatok monitorozását. Az Apache Airflow egy nyílt forráskód megoldás az adatfolyamok kezelésére és ütemezésére. Az Airflow az adatfolyamokat a műveletek irányított aciklikus gráfjaiként (DAG-k) jelöli. Egy munkafolyamatot definiál egy Python-fájlban, és az Airflow kezeli az ütemezést és a végrehajtást. Az Airflow Azure Databricks-kapcsolat lehetővé teszi az Azure Databricks által kínált optimalizált Spark-motor előnyeit az Airflow ütemezési funkcióival.
Követelmények
- Az Airflow és az Azure Databricks közötti integrációhoz az Airflow 2.5.0-s és újabb verziója szükséges. A cikkben szereplő példákat az Airflow 2.6.1-es verziójával teszteljük.
- Az Airflow használatához Python 3.8, 3.9, 3.10 vagy 3.11 szükséges. A cikkben szereplő példákat a Python 3.8 teszteli.
- Az Airflow telepítéséhez és futtatásához a jelen cikkben szereplő utasítások megkövetelik , hogy a pipenv létrehozhasson egy Python virtuális környezetet.
Airflow operátorok a Databrickshez
Az Airflow DAG feladatokból áll, ahol minden tevékenység egy Airflow-operátort futtat. A Databricksbe való integrációt támogató Airflow-operátorok a Databricks-szolgáltatóban vannak implementálva.
A Databricks-szolgáltató operátorokat tartalmaz, amelyek számos feladatot futtatnak egy Azure Databricks-munkaterületen, beleértve az adatok táblázatba való importálását, AZ SQL-lekérdezések futtatását és a Databricks Git-mappák használatát.
A Databricks-szolgáltató két operátort implementál a feladatok aktiválásához:
- A DatabricksRunNowOperator egy meglévő Azure Databricks-feladatot igényel, és a POST /api/2.1/jobs/run-now API-kérést használja a futtatás elindításához. A Databricks azért javasolja a használatát, mert csökkenti a
DatabricksRunNowOperator
feladatdefiníciók duplikálását, és az operátorral aktivált feladatfuttatások megtalálhatók a Feladatok felhasználói felületén. - A DatabricksSubmitRunOperator nem igényel feladatot az Azure Databricksben, és a POST /api/2.1/jobs/run/submit API-kérést használja a feladat specifikációjának elküldéséhez és a futtatás aktiválásához.
Új Azure Databricks-feladat létrehozásához vagy meglévő feladat alaphelyzetbe állításához a Databricks-szolgáltató implementálja a DatabricksCreateJobsOperatort. A DatabricksCreateJobsOperator
post /api/2.1/jobs/create és a POST /api/2.1/jobs/reset API-kéréseket használja. A használatával DatabricksRunNowOperator
létrehozhat és futtathat DatabricksCreateJobsOperator
egy feladatot.
Feljegyzés
A Databricks operátorainak a feladat aktiválásához hitelesítő adatokat kell megadni a Databricks kapcsolatkonfigurációjában. Lásd: Azure Databricks személyes hozzáférési jogkivonat létrehozása az Airflow-hoz.
A Databricks Airflow operátorok minden egyes polling_period_seconds
adásban megírják a feladatfuttatási oldal URL-címét az Airflow-naplókba (az alapértelmezett érték 30 másodperc). További információ: apache-airflow-providers-databricks csomag oldal az Airflow webhelyén.
Az Airflow Azure Databricks-integráció helyi telepítése
Az Airflow és a Databricks-szolgáltató helyi telepítéséhez a teszteléshez és fejlesztéshez kövesse az alábbi lépéseket. Az Airflow egyéb telepítési lehetőségeiről, beleértve az éles telepítés létrehozását is, tekintse meg a telepítést az Airflow dokumentációjában.
Nyisson meg egy terminált, és futtassa a következő parancsokat:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
Cserélje le <firstname>
a felhasználónevet <lastname>
és <email>
az e-mail-címet. A rendszer kérni fogja, hogy adjon meg jelszót a rendszergazda felhasználónak. Mentse ezt a jelszót, mert be kell jelentkeznie az Airflow felhasználói felületére.
Ez a szkript a következő lépéseket hajtja végre:
- Létrehoz egy elnevezett
airflow
könyvtárat, és módosítja az adott könyvtárat. - Python-alapú virtuális környezetek létrehozására és létrehozására használható
pipenv
. A Databricks egy Python virtuális környezet használatát javasolja a csomagverziók és a környezet kódfüggőségeinek elkülönítéséhez. Ez az elkülönítés segít csökkenteni a csomagverziók váratlan eltéréseit és a kódfüggőség ütközéseit. - Inicializál egy olyan környezeti változót, amely
AIRFLOW_HOME
aairflow
könyvtár elérési útjára van állítva. - Telepíti az Airflow-t és az Airflow Databricks-szolgáltató csomagokat.
- Létrehoz egy könyvtárat
airflow/dags
. Az Airflow a címtár használatával tárolja adags
DAG-definíciókat. - Inicializál egy SQLite-adatbázist, amelyet az Airflow a metaadatok nyomon követésére használ. Éles Airflow-üzemelő példányban az Airflow-t szabványos adatbázissal konfigurálná. Az SQLite-adatbázis és az Airflow-telepítés alapértelmezett konfigurációja inicializálva van a
airflow
címtárban. - Létrehoz egy rendszergazdai felhasználót az Airflow-hoz.
Tipp.
A Databricks-szolgáltató telepítésének megerősítéséhez futtassa a következő parancsot az Airflow telepítési könyvtárában:
airflow providers list
Az Airflow webkiszolgáló és ütemező indítása
Az Airflow-webkiszolgáló szükséges az Airflow felhasználói felületének megtekintéséhez. A webkiszolgáló elindításához nyisson meg egy terminált az Airflow telepítési könyvtárában, és futtassa a következő parancsokat:
Feljegyzés
Ha az Airflow webkiszolgáló egy portütközés miatt nem indul el, módosíthatja az alapértelmezett portot az Airflow konfigurációjában.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
Az ütemező a DAG-ket ütemező Airflow-összetevő. Az ütemező elindításához nyisson meg egy új terminált az Airflow telepítési könyvtárában, és futtassa a következő parancsokat:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Az Airflow telepítésének tesztelése
Az Airflow telepítésének ellenőrzéséhez futtassa az Airflow-hoz tartozó példa DAG-k egyikét:
- Nyissa meg
http://localhost:8080/home
a böngészőablakban. Jelentkezzen be az Airflow felhasználói felületére az Airflow telepítésekor létrehozott felhasználónévvel és jelszóval. Megjelenik az Airflow DAG-lap . - Kattintson a Szüneteltetés/A DAG feloldása váltógombra az egyik példa DAG-k( például a
example_python_operator
. - A DAG példa aktiválásához kattintson a TRIGGER DAG gombra.
- A DAG nevére kattintva megtekintheti a részleteket, beleértve a DAG futtatási állapotát is.
Azure Databricks személyes hozzáférési jogkivonat létrehozása az Airflow-hoz
Az Airflow egy Azure Databricks személyes hozzáférési jogkivonat (PAT) használatával csatlakozik a Databrickshez. PAT létrehozása:
- Az Azure Databricks-munkaterületen kattintson az Azure Databricks-felhasználónevére a felső sávon, majd válassza a legördülő menü Beállítások elemét .
- Kattintson a Fejlesztőeszközök elemre.
- Az Access-jogkivonatok mellett kattintson a Kezelés gombra.
- Kattintson az Új jogkivonat létrehozása elemre.
- (Nem kötelező) Írjon be egy megjegyzést, amely segít azonosítani a jogkivonatot a jövőben, és módosíthatja a jogkivonat alapértelmezett 90 napos élettartamát. Élettartam nélküli (nem ajánlott) jogkivonat létrehozásához hagyja üresen az Élettartam (nap) mezőt (üres).
- Kattintson a Létrehozás lehetőségre.
- Másolja a megjelenített jogkivonatot egy biztonságos helyre, majd kattintson a Kész gombra.
Feljegyzés
Ügyeljen arra, hogy a másolt jogkivonatot biztonságos helyre mentse. Ne ossza meg másokkal a másolt jogkivonatot. Ha elveszíti a másolt jogkivonatot, nem tudja pontosan ugyanazt a jogkivonatot újragenerálni. Ehelyett meg kell ismételnie ezt az eljárást egy új jogkivonat létrehozásához. Ha elveszíti a másolt jogkivonatot, vagy úgy véli, hogy a jogkivonat sérült, a Databricks határozottan javasolja, hogy azonnal törölje a jogkivonatot a munkaterületről az Access-jogkivonatok lapon a jogkivonat melletti kuka (Visszavonás) ikonra kattintva.
Ha nem tud jogkivonatokat létrehozni vagy használni a munkaterületen, ennek az lehet az oka, hogy a munkaterület rendszergazdája letiltotta a jogkivonatokat, vagy nem adott engedélyt a jogkivonatok létrehozására vagy használatára. Tekintse meg a munkaterület rendszergazdáját vagy a következő témaköröket:
Feljegyzés
Ajánlott biztonsági eljárásként, ha automatizált eszközökkel, rendszerekkel, szkriptekkel és alkalmazásokkal hitelesít, a Databricks azt javasolja, hogy munkaterület-felhasználók helyett a szolgáltatásnevekhez tartozó személyes hozzáférési jogkivonatokat használja. A szolgáltatásnevek jogkivonatainak létrehozásáról a szolgáltatásnév jogkivonatainak kezelése című témakörben olvashat.
Az Azure Databricksben microsoft Entra ID-jogkivonattal is hitelesíthet. Lásd a Databricks-kapcsolatot az Airflow dokumentációjában.
Azure Databricks-kapcsolat konfigurálása
Az Airflow telepítése alapértelmezett kapcsolatot tartalmaz az Azure Databrickshez. A kapcsolat frissítése a munkaterülethez való csatlakozáshoz a fent létrehozott személyes hozzáférési jogkivonat használatával:
- Nyissa meg
http://localhost:8080/connection/list/
a böngészőablakban. Ha a rendszer kéri a bejelentkezést, adja meg a rendszergazdai felhasználónevét és jelszavát. - A Conn-azonosító alatt keresse meg a databricks_default, majd kattintson a Rekord szerkesztése gombra.
- Cserélje le a Gazdagép mező értékét az Azure Databricks-üzembe helyezés munkaterület-példányának nevére ,
https://adb-123456789.cloud.databricks.com
például. - A Jelszó mezőbe írja be az Azure Databricks személyes hozzáférési jogkivonatát.
- Kattintson a Mentés gombra.
Ha Microsoft Entra-azonosító jogkivonatot használ, a hitelesítés konfigurálásával kapcsolatos információkért tekintse meg a Databricks-kapcsolatot az Airflow dokumentációjában.
Példa: Airflow DAG létrehozása Azure Databricks-feladat futtatásához
Az alábbi példa bemutatja, hogyan hozhat létre egy egyszerű Airflow-üzembe helyezést, amely a helyi gépen fut, és üzembe helyez egy példa DAG-t az Azure Databricksben való futtatások aktiválásához. Ebben a példában a következőt fogja:
- Hozzon létre egy új jegyzetfüzetet, és adjon hozzá kódot a megszólítás nyomtatásához egy konfigurált paraméter alapján.
- Hozzon létre egy Azure Databricks-feladatot egyetlen, a jegyzetfüzetet futtató feladattal.
- Airflow-kapcsolat konfigurálása az Azure Databricks-munkaterülethez.
- Hozzon létre egy Airflow DAG-t a jegyzetfüzet-feladat aktiválásához. A DAG definiálása Python-szkriptben a következő használatával
DatabricksRunNowOperator
történik: . - Az Airflow felhasználói felületén aktiválhatja a DAG-t, és megtekintheti a futtatás állapotát.
Jegyzetfüzet létrehozása
Ez a példa egy két cellát tartalmazó jegyzetfüzetet használ:
- Az első cella tartalmaz egy Databricks Utilities szöveg widgetet, amely egy alapértelmezett értékre
world
beállított változótgreeting
határoz meg. - A második cella a változó előtagjának
greeting
értékét nyomtatjahello
ki.
A jegyzetfüzet létrehozása:
Nyissa meg az Azure Databricks-munkaterületet, kattintson az Új gombra az oldalsávon, és válassza a Jegyzetfüzet lehetőséget.
Adjon nevet a jegyzetfüzetnek, például a Hello Airflow-nak, és győződjön meg arról, hogy az alapértelmezett nyelv Pythonra van állítva.
Másolja ki a következő Python-kódot, és illessze be a jegyzetfüzet első cellájába.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
Vegyen fel egy új cellát az első cella alá, és másolja és illessze be a következő Python-kódot az új cellába:
print("hello {}".format(greeting))
Feladat létrehozása
Kattintson a Munkafolyamatok elemre az oldalsávon.
Kattintson a parancsra.
A Feladatok lap a Feladat létrehozása párbeszédpanelen jelenik meg.
Cserélje le a feladat nevét a feladat nevére.
A Tevékenységnév mezőben adja meg a tevékenység nevét, például a megszólítást.
A Típus legördülő menüben válassza a Jegyzetfüzet lehetőséget.
A Forrás legördülő menüben válassza a Munkaterület lehetőséget.
Kattintson az Elérési út szövegmezőre, és a fájlböngészővel keresse meg a létrehozott jegyzetfüzetet, kattintson a jegyzetfüzet nevére, majd a Megerősítés gombra.
Kattintson a Hozzáadás gombra a Paraméterek területen. A Kulcs mezőbe írja be a következőt
greeting
: Az Érték mezőbe írja be a következőtAirflow user
:Kattintson a Feladat létrehozása gombra.
A Feladat részletei panelen másolja ki a feladatazonosító értékét. Ez az érték szükséges a feladat Airflow-ból való aktiválásához.
A feladat futtatása
Ha az új feladatot az Azure Databricks-feladatok felhasználói felületén szeretné tesztelni, kattintson a jobb felső sarokban. A futtatás befejezése után a feladatfuttatás részleteinek megtekintésével ellenőrizheti a kimenetet.
Új Airflow DAG létrehozása
Egy Airflow DAG-t definiál egy Python-fájlban. Dag létrehozása a példajegyzetfüzet-feladat aktiválásához:
Szövegszerkesztőben vagy IDE-ben hozzon létre egy új fájlt
databricks_dag.py
a következő tartalommal:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
Cserélje le
JOB_ID
a korábban mentett feladatazonosító értékét.Mentse a fájlt a
airflow/dags
könyvtárban. Az Airflow automatikusan beolvassa és telepíti a fájlbanairflow/dags/
tárolt DAG-fájlokat.
A DAG telepítése és ellenőrzése az Airflow-ban
A DAG aktiválása és ellenőrzése az Airflow felhasználói felületén:
- Nyissa meg
http://localhost:8080/home
a böngészőablakban. Megjelenik az Airflow DAGs képernyő. - Keresse meg
databricks_dag
és kattintson a Dag szüneteltetése/feloldása váltógombra a DAG feloldásához. - A DAG aktiválásához kattintson a TRIGGER DAG gombra.
- Kattintson egy futtatásra a Futtatások oszlopban a futtatás állapotának és részleteinek megtekintéséhez.