Végpontok közötti adatfolyam létrehozása a Databricksben
Ez a cikk bemutatja, hogyan hozhat létre és helyezhet üzembe egy végpontok közötti adatfeldolgozási folyamatot, beleértve a nyers adatok betöltését, az adatok átalakítását és a feldolgozott adatok elemzéseinek futtatását.
Feljegyzés
Bár ez a cikk bemutatja, hogyan hozhat létre teljes adatfolyamot a Databricks-jegyzetfüzetek és egy Azure Databricks-feladat használatával a munkafolyamatok vezényléséhez, a Databricks a Delta Live Tables deklaratív felületét javasolja megbízható, karbantartható és tesztelhető adatfeldolgozási folyamatok létrehozásához.
Mi az az adatfolyam?
Az adatfolyam végrehajtja az adatok forrásrendszerekből való áthelyezéséhez, az adatok követelmények alapján történő átalakításához és az adatok célrendszerben való tárolásához szükséges lépéseket. Az adatfolyam tartalmazza azokat a folyamatokat, amelyek ahhoz szükségesek, hogy a nyers adatokat előkészített adatokká alakítják, amelyeket a felhasználók felhasználhatnak. Egy adatfolyam például előkészítheti az adatokat, hogy az adatelemzők és adattudósok elemzéssel és jelentéskészítéssel nyerhessenek ki értéket az adatokból.
Az adatfolyamatok gyakori példája a kinyerési, átalakítási és betöltési (ETL-) munkafolyamat. Az ETL-feldolgozás során az adatok a forrásrendszerekből lesznek betöltve, és egy átmeneti területre kerülnek, a követelmények (az adatminőség biztosítása, a rekordok deduplikálása stb.) alapján alakulnak át, majd egy célrendszerbe, például adattárházba vagy data lake-be írnak.
Az adatfolyam lépései
Az Azure Databricksen futó adatfolyamok létrehozásának megkezdéséhez a cikkben szereplő példa egy adatfeldolgozási munkafolyamat létrehozását ismerteti:
- Az Azure Databricks funkcióival felfedezhet egy nyers adathalmazt.
- Hozzon létre egy Databricks-jegyzetfüzetet a nyers forrásadatok betöltéséhez és a nyers adatok céltáblába való írásához.
- Hozzon létre egy Databricks-jegyzetfüzetet a nyers forrásadatok átalakításához és az átalakított adatok céltáblába írásához.
- Hozzon létre egy Databricks-jegyzetfüzetet az átalakított adatok lekérdezéséhez.
- Automatizálja az adatfolyamot egy Azure Databricks-feladattal.
Követelmények
- Bejelentkezett az Azure Databricksbe és a Adattudomány > Mérnöki munkaterületre.
- Jogosult fürt létrehozására vagy fürthöz való hozzáférésre.
- (Nem kötelező) Ha táblákat szeretne közzétenni a Unity Catalogban, létre kell hoznia egy katalógust és sémát a Unity Catalogban.
Példa: Millió dal adatkészlet
Az ebben a példában használt adatkészlet a Million Song Dataset egy részhalmaza, amely a kortárs zeneszámok funkcióinak és metaadatainak gyűjteménye. Ez az adatkészlet az Azure Databricks-munkaterületen található mintaadatkészletekben érhető el.
1. lépés: Fürt létrehozása
A példában szereplő adatfeldolgozás és elemzés elvégzéséhez hozzon létre egy fürtöt, amely biztosítja a parancsok futtatásához szükséges számítási erőforrásokat.
Feljegyzés
Mivel ez a példa egy DBFS-ben tárolt mintaadatkészletet használ, és azt javasolja, hogy a táblák megmaradjanak a Unity Catalogban, létre kell hoznia egy fürtöt, amely egyfelhasználós hozzáférési móddal van konfigurálva. Az egyfelhasználós hozzáférési mód teljes hozzáférést biztosít a DBFS-hez, ugyanakkor lehetővé teszi a Unity Katalógushoz való hozzáférést is. Tekintse meg a DBFS és a Unity Katalógus ajánlott eljárásait.
- Kattintson a Számítás gombra az oldalsávon.
- A Számítás lapon kattintson a Fürt létrehozása elemre.
- Az Új fürt lapon adja meg a fürt egyedi nevét.
- Access módban válassza az Egy felhasználó lehetőséget.
- Az egyfelhasználós vagy szolgáltatásnév-hozzáférésben válassza ki a felhasználónevét.
- Hagyja meg a fennmaradó értékeket az alapértelmezett állapotban, és kattintson a Fürt létrehozása gombra.
A Databricks-fürtökkel kapcsolatos további információkért lásd: Compute.
2. lépés: A forrásadatok megismerése
Ha tudni szeretné, hogyan használhatja az Azure Databricks-felületet a nyers forrásadatok megismerésére, tekintse meg az adatfolyam forrásadatait. Ha közvetlenül az adatok betöltésére és előkészítésére szeretne lépni, folytassa a 3. lépésben: A nyers adatok betöltése.
3. lépés: A nyers adatok betöltése
Ebben a lépésben betölti a nyers adatokat egy táblába, hogy azokat további feldolgozás céljából elérhetővé tegye. A Databricks platform adategységeinek( például táblák) kezeléséhez a Databricks a Unity Catalog szolgáltatást javasolja. Ha azonban nincs engedélye arra, hogy létrehozza a szükséges katalógust és sémát a táblák Unity Catalogban való közzétételéhez, akkor is elvégezheti a következő lépéseket a táblák Hive-metaadattárban való közzétételével.
Az adatok betöltéséhez a Databricks az Automatikus betöltő használatát javasolja. Az automatikus betöltő automatikusan észleli és feldolgozza az új fájlokat, amikor megérkeztek a felhőobjektum-tárolóba.
Az Automatikus betöltőt úgy konfigurálhatja, hogy automatikusan észlelje a betöltött adatok sémáját, így anélkül inicializálhatja a táblákat, hogy explicit módon deklarálja az adatsémát, és új oszlopok bevezetésekor fejleszti a táblázatsémát. Ez szükségtelenné teszi a sémamódosítások manuális nyomon követését és alkalmazását az idő függvényében. A Databricks sémakövetkeztetést javasol az Automatikus betöltő használatakor. Az adatfeltárási lépésben látható módon azonban a dalok adatai nem tartalmaznak fejlécadatokat. Mivel a fejléc nem az adatokkal van tárolva, explicit módon kell definiálnia a sémát, ahogyan az a következő példában is látható.
Az oldalsávon kattintson az Új gombra, és válassza a Jegyzetfüzet lehetőséget a menüből. Megjelenik a Jegyzetfüzet létrehozása párbeszédpanel.
Adja meg például a jegyzetfüzet
Ingest songs data
nevét. Alapértelmezés szerint:- A Python a kiválasztott nyelv.
- A jegyzetfüzet az utolsó használt fürthöz van csatolva. Ebben az esetben az 1. lépésben létrehozott fürt: Fürt létrehozása.
Írja be a következőt a jegyzetfüzet első cellájába:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
A Unity Catalog használata esetén cserélje le
<table-name>
a katalógust, a sémát és a táblanevet a betöltött rekordok (példáuldata_pipelines.songs_data.raw_song_data
) nevére. Ellenkező esetben cserélje le<table-name>
egy tábla nevére, hogy az tartalmazza a betöltött rekordokat, példáulraw_song_data
: .Cserélje le
<checkpoint-path>
egy könyvtár elérési útjára a DBFS-ben az ellenőrzőpontfájlok karbantartásához, például/tmp/pipeline_get_started/_checkpoint/song_data
.Kattintson a gombra, és válassza a Cella futtatása parancsot. Ez a példa az adatsémát a benne található
file_path
összes fájlbólREADME
betölti, és a megadotttable_name
táblába írja az adatokat.
4. lépés: A nyers adatok előkészítése
A nyers adatok elemzésre való előkészítéséhez az alábbi lépések átalakítják a nyers zeneszámok adatait a szükségtelen oszlopok szűrésével és egy új, időbélyeget tartalmazó mező hozzáadásával az új rekord létrehozásához.
Az oldalsávon kattintson az Új gombra, és válassza a Jegyzetfüzet lehetőséget a menüből. Megjelenik a Jegyzetfüzet létrehozása párbeszédpanel.
Adja meg a jegyzetfüzet nevét. Például:
Prepare songs data
. Módosítsa az alapértelmezett nyelvet SQL-re.Írja be a következőt a jegyzetfüzet első cellájába:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
A Unity Catalog használata esetén cserélje le
<table-name>
a katalógus, a séma és a tábla nevét a szűrt és átalakított rekordok (példáuldata_pipelines.songs_data.prepared_song_data
) nevére. Ellenkező esetben cserélje le<table-name>
egy tábla nevére a szűrt és átalakított rekordokat (példáulprepared_song_data
).Cserélje le
<raw-songs-table-name>
az előző lépésben betöltött nyers zeneszámok rekordjait tartalmazó táblázat nevére.Kattintson a gombra, és válassza a Cella futtatása parancsot.
5. lépés: Az átalakított adatok lekérdezése
Ebben a lépésben kibővítheti a feldolgozási folyamatot lekérdezések hozzáadásával a dalok adatainak elemzéséhez. Ezek a lekérdezések az előző lépésben létrehozott előkészített rekordokat használják.
Az oldalsávon kattintson az Új gombra, és válassza a Jegyzetfüzet lehetőséget a menüből. Megjelenik a Jegyzetfüzet létrehozása párbeszédpanel.
Adja meg a jegyzetfüzet nevét. Például:
Analyze songs data
. Módosítsa az alapértelmezett nyelvet SQL-re.Írja be a következőt a jegyzetfüzet első cellájába:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Cserélje le
<prepared-songs-table-name>
az előkészített adatokat tartalmazó tábla nevére. Például:data_pipelines.songs_data.prepared_song_data
.Kattintson a cellaműveletek menüre, válassza az Alábbi cella hozzáadása lehetőséget, és írja be a következőt az új cellába:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Cserélje le
<prepared-songs-table-name>
az előző lépésben létrehozott előkészített tábla nevére. Például:data_pipelines.songs_data.prepared_song_data
.A lekérdezések futtatásához és a kimenet megtekintéséhez kattintson az Összes futtatása elemre.
6. lépés: Azure Databricks-feladat létrehozása a folyamat futtatásához
Létrehozhat egy munkafolyamatot, amely automatizálja az adatbetöltési, feldolgozási és elemzési lépések futtatását egy Azure Databricks-feladat használatával.
- A Adattudomány > Mérnöki munkaterületen tegye az alábbiak egyikét:
- Kattintson a Munkafolyamatok elemre az oldalsávon, és kattintson a gombra .
- Az oldalsávon kattintson az Új gombra, és válassza a Feladat lehetőséget.
- A Feladatok lap Feladat párbeszédpanelén cserélje le a Feladat neve hozzáadása... elemet a feladat nevére. Például a "Songs munkafolyamat".
- A Tevékenység név mezőbe írja be az első tevékenység nevét, például
Ingest_songs_data
: . - A Típus mezőben válassza ki a Jegyzetfüzet feladattípust.
- A Forrás területen válassza a Munkaterület lehetőséget.
- A fájlböngészővel keresse meg az adatbetöltési jegyzetfüzetet, kattintson a jegyzetfüzet nevére, és kattintson a Megerősítés gombra.
- A Fürt területen válassza ki a Shared_job_cluster vagy a lépésben létrehozott fürtöt
Create a cluster
. - Kattintson a Létrehozás gombra.
- Kattintson az imént létrehozott feladat alá, és válassza a Jegyzetfüzet lehetőséget.
- A Tevékenység név mezőbe írja be például
Prepare_songs_data
a tevékenység nevét. - A Típus mezőben válassza ki a Jegyzetfüzet feladattípust.
- A Forrás területen válassza a Munkaterület lehetőséget.
- A fájlböngészővel keresse meg az adatelőkészítési jegyzetfüzetet, kattintson a jegyzetfüzet nevére, és kattintson a Megerősítés gombra.
- A Fürt területen válassza ki a Shared_job_cluster vagy a lépésben létrehozott fürtöt
Create a cluster
. - Kattintson a Létrehozás gombra.
- Kattintson az imént létrehozott feladat alá, és válassza a Jegyzetfüzet lehetőséget.
- A Tevékenység név mezőbe írja be például
Analyze_songs_data
a tevékenység nevét. - A Típus mezőben válassza ki a Jegyzetfüzet feladattípust.
- A Forrás területen válassza a Munkaterület lehetőséget.
- A fájlböngészővel keresse meg az adatelemzési jegyzetfüzetet, kattintson a jegyzetfüzet nevére, majd a Megerősítés gombra.
- A Fürt területen válassza ki a Shared_job_cluster vagy a lépésben létrehozott fürtöt
Create a cluster
. - Kattintson a Létrehozás gombra.
- A munkafolyamat futtatásához kattintson a gombra . A futtatás részleteinek megtekintéséhez kattintson a feladatfuttatási nézetben a futtatás Kezdési idő oszlopában található hivatkozásra. Kattintson az egyes tevékenységekre a feladatfuttatás részleteinek megtekintéséhez.
- Ha meg szeretné tekinteni az eredményeket a munkafolyamat befejezésekor, kattintson a végső adatelemzési feladatra. Megjelenik a Kimenet lap, és megjeleníti a lekérdezés eredményeit.
7. lépés: Az adatfolyam-feladat ütemezése
Feljegyzés
Az Azure Databricks-feladat ütemezett munkafolyamatok vezénylésére való használatának bemutatásához ez az első lépéseket bemutató példa külön jegyzetfüzetekre bontja a betöltési, előkészítési és elemzési lépéseket, majd minden egyes jegyzetfüzetet használ egy feladat létrehozására a feladatban. Ha az összes feldolgozás egyetlen jegyzetfüzetben található, egyszerűen ütemezheti a jegyzetfüzetet közvetlenül az Azure Databricks notebook felhasználói felületéről. Lásd: Ütemezett jegyzetfüzet-feladatok létrehozása és kezelése.
Gyakori követelmény egy adatfolyam ütemezett futtatása. A folyamatot futtató feladat ütemezésének meghatározása:
- Kattintson a Munkafolyamatok elemre az oldalsávon.
- A Név oszlopban kattintson a feladat nevére. Az oldalpanelen megjelennek a Feladat részletei.
- Kattintson az Eseményindító hozzáadása elemre a Feladat részletei panelen, és válassza az Ütemezett eseményindító típusa lehetőséget.
- Adja meg az időszakot, a kezdési időt és az időzónát. Ha szeretné, jelölje be a Cron szintaxis megjelenítése jelölőnégyzetet az ütemezés kvarc cron szintaxisban való megjelenítéséhez és szerkesztéséhez.
- Kattintson a Mentés gombra.
További információ
- A Databricks-jegyzetfüzetekről további információt a Databricks-jegyzetfüzetek bemutatása című témakörben talál.
- Az Azure Databricks-feladatokról további információt a Mik azok a Databricks-feladatok?
- További információ a Delta Lake-ről: Mi a Delta Lake?
- Ha többet szeretne megtudni az adatfeldolgozási folyamatokról a Delta Live Tables használatával, olvassa el a Mi az a Delta Live Tables?