Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Ez az oktatóanyag bemutatja, hogyan hozhat létre és helyezhet üzembe ETL-folyamatokat (kinyerés, átalakítás és betöltés) az adatvezényléshez a Lakeflow Spark deklaratív folyamataival és az automatikus betöltővel. Az ETL-folyamatok implementálják a forrásrendszerekből származó adatok beolvasásának lépéseit, átalakítják az adatokat olyan követelmények alapján, mint az adatminőség-ellenőrzések és a duplikációk rögzítése, és az adatokat egy célrendszerbe, például egy adattárházba vagy egy adattóba írják.
Ebben az oktatóanyagban adatcsővezetékeket és az Auto Loader eszközt fog használni a következő feladatokra:
- Nyers forrásadatok betöltése céltáblába.
- Alakítsa át a nyers forrásadatokat, és írja az átalakított adatokat két cél materializált nézetbe.
- Az átalakított adatok lekérdezése.
- Automatizálja az ETL-folyamatot egy Databricks-feladattal.
További információ a folyamatokról és az automatikus betöltőről: Lakeflow Spark Deklaratív folyamatok és Mi az automatikus betöltő?
Követelmények
Az oktatóanyag elvégzéséhez meg kell felelnie a következő követelményeknek:
- Be kell jelentkeznie egy Azure Databricks-munkaterületre.
- Engedélyezze a Unity-katalógust a munkaterületen.
- Engedélyezze a kiszolgáló nélküli számítást a fiókjához. A kiszolgáló nélküli Lakeflow Spark deklaratív adatfolyamok nem érhetők el minden munkaterületi régióban. Tekintse meg az elérhető régiók korlátozott regionális rendelkezésre állásával rendelkező funkciókat .
- Rendelkezik engedéllyel számítási erőforrás létrehozására vagy egy számítási erőforráshoz való hozzáférésre.
- Új séma katalógusban való létrehozására vonatkozó engedélyekkel rendelkezik. A szükséges engedélyek a következők:
ALL PRIVILEGESvagyUSE CATALOG.CREATE SCHEMA - Rendelkezik engedéllyel egy új kötet meglévő sémában való létrehozásához. A szükséges engedélyek a következők:
ALL PRIVILEGESvagyUSE SCHEMA.CREATE VOLUME
Tudnivalók az adatkészletről
Az ebben a példában használt adatkészlet a Million Song Dataset egy részhalmaza, amely a kortárs zeneszámok funkcióinak és metaadatainak gyűjteménye. Ez az adatkészlet az Azure Databricks-munkaterületen található mintaadatkészletekben érhető el.
1. lépés: Folyamat létrehozása
Először hozzon létre egy folyamatot a fájlokban lévő adathalmazok (más néven forráskód) folyamatszintaxissal történő definiálásával. Minden forráskódfájl csak egy nyelvet tartalmazhat, de több nyelvspecifikus fájlt is hozzáadhat a folyamathoz. További információ: Lakeflow Spark Deklaratív folyamatok
Ez az oktatóanyag kiszolgáló nélküli számítást és Unity-katalógust használ. A nem megadott összes konfigurációs beállításhoz használja az alapértelmezett beállításokat. Ha a kiszolgáló nélküli számítás nem engedélyezett vagy támogatott a munkaterületen, az oktatóanyagot az alapértelmezett számítási beállítások használatával írott módon végezheti el.
Új folyamat létrehozásához kövesse az alábbi lépéseket:
- A munkaterületen kattintson a
Új az oldalsávon, majd válassza az ETL-folyamat lehetőséget.
- Adjon egyedi nevet a pipeline-nak.
- A név alatt válassza ki a létrehozott adatok alapértelmezett katalógusát és sémáját. Az átalakítások során más célhelyeket is megadhat, de ez az oktatóanyag ezeket az alapértelmezett értékeket használja. A létrehozott katalógushoz és sémához engedélyekkel kell rendelkeznie. Lásd: Követelmények.
- Ebben az oktatóanyagban válassza az Indítás üres fájllal lehetőséget.
- A Mappa elérési útján adja meg a forrásfájlok helyét, vagy fogadja el az alapértelmezettet (a felhasználói mappát).
- Válassza a Pythont vagy az SQL-t az első forrásfájl nyelveként (a folyamatok keverhetik és egyeztethetik a nyelveket, de minden fájlnak egyetlen nyelven kell lennie).
- Kattintson a Kijelölés.
Megjelenik az új folyamat folyamatszerkesztője. Létrejön egy üres forrásfájl a nyelvhez, amely készen áll az első átalakításra.
2. lépés: A folyamatlogika fejlesztése
Ebben a lépésben a Lakeflow Pipelines-szerkesztőt fogja használni a folyamat forráskódjának interaktív fejlesztéséhez és ellenőrzéséhez.
A kód automatikus betöltőt használ a növekményes adatbetöltéshez. Az automatikus betöltő automatikusan észleli és feldolgozza az új fájlokat, amikor megérkeztek a felhőobjektum-tárolóba. További információ: Mi az automatikus betöltő?
A rendszer automatikusan létrehoz és konfigurál egy üres forráskódfájlt a folyamathoz. A fájl a folyamat átalakítások mappájában jön létre. Alapértelmezés szerint az átalakítások mappában lévő összes *.py és *.sql fájl a folyamat forrásának része.
Másolja és illessze be a következő kódot a forrásfájlba. Ügyeljen arra, hogy az 1. lépésben a fájlhoz kiválasztott nyelvet használja.
Python
# Import modules from pyspark import pipelines as dp from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dp.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path)) # Define a materialized view that validates data and renames a column @dp.materialized_view( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dp.expect("valid_artist_name", "artist_name IS NOT NULL") @dp.expect("valid_title", "song_title IS NOT NULL") @dp.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dp.materialized_view( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/part*', format => "csv", header => "false", delimiter => "\t", schema => """ artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING """, schemaEvolutionMode => "none"); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC;Ez a forrás három lekérdezés kódját tartalmazza. Ezeket a lekérdezéseket külön fájlokba is elhelyezheti, hogy a kívánt módon rendszerezze a fájlokat és a kódot.
Kattintson a
Futtassa a fájlt vagy a futtatási folyamatot a csatlakoztatott folyamat frissítésének elindításához. A folyamat egyetlen forrásfájljával ezek funkcionálisan egyenértékűek.
Amikor a frissítés befejeződik, a szerkesztő frissítve lesz a munkafolyamatra vonatkozó információkkal.
- A folyamatdiagram (DAG) a kódtól jobbra található oldalsávon három táblát jelenít meg,
songs_rawsongs_preparedéstop_artists_by_year. - A frissítés összegzése a folyamateszközök böngésző tetején látható.
- A létrehozott táblák részletei az alsó panelen jelennek meg, és a táblák adatai között böngészhet az egyik kiválasztásával.
Ez magában foglalja a nyers és megtisztított adatokat, valamint néhány egyszerű elemzést, hogy megtalálja a legjobb művészeket évről évre. A következő lépésben ad hoc lekérdezéseket hoz létre további elemzés céljából egy külön fájlban a folyamat során.
3. lépés: A folyamat által létrehozott adathalmazok megismerése
Ebben a lépésben alkalmi lekérdezéseket hajt végre az ETL-folyamatban feldolgozott adatokról a Databricks SQL-szerkesztőben a daladatok elemzéséhez. Ezek a lekérdezések az előző lépésben létrehozott előkészített rekordokat használják.
Először futtasson egy lekérdezést, amely megkeresi azokat a művészeket, akik 1990 óta minden évben a legtöbb dalt adják ki.
A pipeline eszközök böngésző oldalsávjáról kattintson a
Adja hozzá majd Feltárás.
Adjon meg egy nevet , és válassza ki az SQL-t a feltárási fájlhoz. Egy SQL-jegyzetfüzet jön létre egy új
explorationsmappában. A mappában lévőexplorationsfájlok alapértelmezés szerint nem egy folyamatfrissítés részeként futnak. Az SQL-jegyzetfüzetben vannak olyan cellák, amelyeket együtt vagy külön futtathat.Ha olyan előadói táblázatot szeretne létrehozni, amely 1990 után minden évben kiadja a legtöbb dalt, írja be a következő kódot az új SQL-fájlba (ha van mintakód a fájlban, cserélje le). Mivel ez a jegyzetfüzet nem része a folyamatnak, nem használja az alapértelmezett katalógust és sémát. Cserélje le a
<catalog>.<schema>-t annak a katalógusnak és sémának a használatára, amelyet alapértelmezettként használ a folyamat során.-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC;Kattintson a
vagy nyomja le
Shift + Entera lekérdezés futtatásához.
Most futtasson egy másik lekérdezést, amely 4/4 ütemes és táncolható tempóval rendelkező dalokat talál.
Adja hozzá a következő kódot ugyanabban a fájlban lévő következő cellához. Ismét cserélje le a
<catalog>.<schema>elemet azokra a katalógusra és sémára, amelyeket a folyamat alapértelmezettjeként használt.-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;Kattintson a
vagy nyomja le
Shift + Entera lekérdezés futtatásához.
4. lépés: Feladat létrehozása a folyamat futtatásához
Ezután hozzon létre egy munkafolyamatot az adatbetöltési, feldolgozási és elemzési lépések automatizálásához egy ütemezés szerint futó Databricks-feladattal.
- A szerkesztő felületének tetején válassza ki az Ütemezés gombot.
- Ha megjelenik az Ütemezések párbeszédpanel, válassza az Ütemezés hozzáadása lehetőséget.
- Ekkor megnyílik az Új ütemezés párbeszédpanel, ahol létrehozhat egy feladatot a folyamat ütemezés szerinti futtatásához.
- Ha szeretné, adjon nevet a feladatnak.
- Alapértelmezés szerint az ütemezés naponta egyszer fut. Elfogadhatja ezt a hibaeltéréseket, vagy beállíthatja a saját ütemezését. A Speciális lehetőséget választva megadhatja, hogy a feladat mikor fusson. A További beállítások lehetőséget választva értesítéseket hozhat létre a feladat futtatásakor.
- A módosítások alkalmazásához és a feladat létrehozásához válassza a Létrehozás lehetőséget .
Most a feladat naponta fut, hogy naprakészen tartsa a pipeline-t. Az ütemezések listájának megtekintéséhez válassza ismét az Ütemezés lehetőséget. A folyamat ütemezéseit ezen a párbeszédpanelen kezelheti, beleértve az ütemezések hozzáadását, szerkesztését vagy eltávolítását.
Az ütemezés (vagy feladat) nevére kattintva a feladat lapjára léphet a Feladatok > folyamatok listájában. Innen megtekintheti a feladatfuttatások részleteit, beleértve a futtatások előzményeit, vagy futtathatja a feladatot azonnal a Futtatás most gombbal.
A feladatfuttatásokkal kapcsolatos további információkért tekintse meg a Lakeflow-feladatok monitorozását és megfigyelhetőségét .
További információ
- További információ az adatfeldolgozási folyamatokról: Lakeflow Spark Deklaratív folyamatok
- A Databricks-jegyzetfüzetekről további információt a Databricks-jegyzetfüzetek című témakörben talál.
- További információ a Lakeflow-feladatokról: Mik azok a feladatok?
- További információ a Delta Lake-ről: Mi az a Delta Lake az Azure Databricksben?