Megosztás a következőn keresztül:


Oktatóanyag: ETL-folyamat létrehozása a Lakeflow Spark deklaratív folyamataival

Ez az oktatóanyag bemutatja, hogyan hozhat létre és helyezhet üzembe ETL-folyamatokat (kinyerés, átalakítás és betöltés) az adatvezényléshez a Lakeflow Spark deklaratív folyamataival és az automatikus betöltővel. Az ETL-folyamatok implementálják a forrásrendszerekből származó adatok beolvasásának lépéseit, átalakítják az adatokat olyan követelmények alapján, mint az adatminőség-ellenőrzések és a duplikációk rögzítése, és az adatokat egy célrendszerbe, például egy adattárházba vagy egy adattóba írják.

Ebben az oktatóanyagban adatcsővezetékeket és az Auto Loader eszközt fog használni a következő feladatokra:

  • Nyers forrásadatok betöltése céltáblába.
  • Alakítsa át a nyers forrásadatokat, és írja az átalakított adatokat két cél materializált nézetbe.
  • Az átalakított adatok lekérdezése.
  • Automatizálja az ETL-folyamatot egy Databricks-feladattal.

További információ a folyamatokról és az automatikus betöltőről: Lakeflow Spark Deklaratív folyamatok és Mi az automatikus betöltő?

Követelmények

Az oktatóanyag elvégzéséhez meg kell felelnie a következő követelményeknek:

Tudnivalók az adatkészletről

Az ebben a példában használt adatkészlet a Million Song Dataset egy részhalmaza, amely a kortárs zeneszámok funkcióinak és metaadatainak gyűjteménye. Ez az adatkészlet az Azure Databricks-munkaterületen található mintaadatkészletekben érhető el.

1. lépés: Folyamat létrehozása

Először hozzon létre egy folyamatot a fájlokban lévő adathalmazok (más néven forráskód) folyamatszintaxissal történő definiálásával. Minden forráskódfájl csak egy nyelvet tartalmazhat, de több nyelvspecifikus fájlt is hozzáadhat a folyamathoz. További információ: Lakeflow Spark Deklaratív folyamatok

Ez az oktatóanyag kiszolgáló nélküli számítást és Unity-katalógust használ. A nem megadott összes konfigurációs beállításhoz használja az alapértelmezett beállításokat. Ha a kiszolgáló nélküli számítás nem engedélyezett vagy támogatott a munkaterületen, az oktatóanyagot az alapértelmezett számítási beállítások használatával írott módon végezheti el.

Új folyamat létrehozásához kövesse az alábbi lépéseket:

  1. A munkaterületen kattintson a Plusz ikonra.Új az oldalsávon, majd válassza az ETL-folyamat lehetőséget.
  2. Adjon egyedi nevet a pipeline-nak.
  3. A név alatt válassza ki a létrehozott adatok alapértelmezett katalógusát és sémáját. Az átalakítások során más célhelyeket is megadhat, de ez az oktatóanyag ezeket az alapértelmezett értékeket használja. A létrehozott katalógushoz és sémához engedélyekkel kell rendelkeznie. Lásd: Követelmények.
  4. Ebben az oktatóanyagban válassza az Indítás üres fájllal lehetőséget.
  5. A Mappa elérési útján adja meg a forrásfájlok helyét, vagy fogadja el az alapértelmezettet (a felhasználói mappát).
  6. Válassza a Pythont vagy az SQL-t az első forrásfájl nyelveként (a folyamatok keverhetik és egyeztethetik a nyelveket, de minden fájlnak egyetlen nyelven kell lennie).
  7. Kattintson a Kijelölés.

Megjelenik az új folyamat folyamatszerkesztője. Létrejön egy üres forrásfájl a nyelvhez, amely készen áll az első átalakításra.

2. lépés: A folyamatlogika fejlesztése

Ebben a lépésben a Lakeflow Pipelines-szerkesztőt fogja használni a folyamat forráskódjának interaktív fejlesztéséhez és ellenőrzéséhez.

A kód automatikus betöltőt használ a növekményes adatbetöltéshez. Az automatikus betöltő automatikusan észleli és feldolgozza az új fájlokat, amikor megérkeztek a felhőobjektum-tárolóba. További információ: Mi az automatikus betöltő?

A rendszer automatikusan létrehoz és konfigurál egy üres forráskódfájlt a folyamathoz. A fájl a folyamat átalakítások mappájában jön létre. Alapértelmezés szerint az átalakítások mappában lévő összes *.py és *.sql fájl a folyamat forrásának része.

  1. Másolja és illessze be a következő kódot a forrásfájlba. Ügyeljen arra, hogy az 1. lépésben a fájlhoz kiválasztott nyelvet használja.

    Python

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    Ez a forrás három lekérdezés kódját tartalmazza. Ezeket a lekérdezéseket külön fájlokba is elhelyezheti, hogy a kívánt módon rendszerezze a fájlokat és a kódot.

  2. Kattintson a Lejátszás ikonra.Futtassa a fájlt vagy a futtatási folyamatot a csatlakoztatott folyamat frissítésének elindításához. A folyamat egyetlen forrásfájljával ezek funkcionálisan egyenértékűek.

Amikor a frissítés befejeződik, a szerkesztő frissítve lesz a munkafolyamatra vonatkozó információkkal.

  • A folyamatdiagram (DAG) a kódtól jobbra található oldalsávon három táblát jelenít meg, songs_rawsongs_preparedés top_artists_by_year.
  • A frissítés összegzése a folyamateszközök böngésző tetején látható.
  • A létrehozott táblák részletei az alsó panelen jelennek meg, és a táblák adatai között böngészhet az egyik kiválasztásával.

Ez magában foglalja a nyers és megtisztított adatokat, valamint néhány egyszerű elemzést, hogy megtalálja a legjobb művészeket évről évre. A következő lépésben ad hoc lekérdezéseket hoz létre további elemzés céljából egy külön fájlban a folyamat során.

3. lépés: A folyamat által létrehozott adathalmazok megismerése

Ebben a lépésben alkalmi lekérdezéseket hajt végre az ETL-folyamatban feldolgozott adatokról a Databricks SQL-szerkesztőben a daladatok elemzéséhez. Ezek a lekérdezések az előző lépésben létrehozott előkészített rekordokat használják.

Először futtasson egy lekérdezést, amely megkeresi azokat a művészeket, akik 1990 óta minden évben a legtöbb dalt adják ki.

  1. A pipeline eszközök böngésző oldalsávjáról kattintson a Plusz ikonra.Adja hozzá majd Feltárás.

  2. Adjon meg egy nevet , és válassza ki az SQL-t a feltárási fájlhoz. Egy SQL-jegyzetfüzet jön létre egy új explorations mappában. A mappában lévő explorations fájlok alapértelmezés szerint nem egy folyamatfrissítés részeként futnak. Az SQL-jegyzetfüzetben vannak olyan cellák, amelyeket együtt vagy külön futtathat.

  3. Ha olyan előadói táblázatot szeretne létrehozni, amely 1990 után minden évben kiadja a legtöbb dalt, írja be a következő kódot az új SQL-fájlba (ha van mintakód a fájlban, cserélje le). Mivel ez a jegyzetfüzet nem része a folyamatnak, nem használja az alapértelmezett katalógust és sémát. Cserélje le a <catalog>.<schema>-t annak a katalógusnak és sémának a használatára, amelyet alapértelmezettként használ a folyamat során.

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. Kattintson a Lejátszás ikonra, vagy nyomja le Shift + Enter a lekérdezés futtatásához.

Most futtasson egy másik lekérdezést, amely 4/4 ütemes és táncolható tempóval rendelkező dalokat talál.

  1. Adja hozzá a következő kódot ugyanabban a fájlban lévő következő cellához. Ismét cserélje le a <catalog>.<schema> elemet azokra a katalógusra és sémára, amelyeket a folyamat alapértelmezettjeként használt.

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. Kattintson a Lejátszás ikonra, vagy nyomja le Shift + Enter a lekérdezés futtatásához.

4. lépés: Feladat létrehozása a folyamat futtatásához

Ezután hozzon létre egy munkafolyamatot az adatbetöltési, feldolgozási és elemzési lépések automatizálásához egy ütemezés szerint futó Databricks-feladattal.

  1. A szerkesztő felületének tetején válassza ki az Ütemezés gombot.
  2. Ha megjelenik az Ütemezések párbeszédpanel, válassza az Ütemezés hozzáadása lehetőséget.
  3. Ekkor megnyílik az Új ütemezés párbeszédpanel, ahol létrehozhat egy feladatot a folyamat ütemezés szerinti futtatásához.
  4. Ha szeretné, adjon nevet a feladatnak.
  5. Alapértelmezés szerint az ütemezés naponta egyszer fut. Elfogadhatja ezt a hibaeltéréseket, vagy beállíthatja a saját ütemezését. A Speciális lehetőséget választva megadhatja, hogy a feladat mikor fusson. A További beállítások lehetőséget választva értesítéseket hozhat létre a feladat futtatásakor.
  6. A módosítások alkalmazásához és a feladat létrehozásához válassza a Létrehozás lehetőséget .

Most a feladat naponta fut, hogy naprakészen tartsa a pipeline-t. Az ütemezések listájának megtekintéséhez válassza ismét az Ütemezés lehetőséget. A folyamat ütemezéseit ezen a párbeszédpanelen kezelheti, beleértve az ütemezések hozzáadását, szerkesztését vagy eltávolítását.

Az ütemezés (vagy feladat) nevére kattintva a feladat lapjára léphet a Feladatok > folyamatok listájában. Innen megtekintheti a feladatfuttatások részleteit, beleértve a futtatások előzményeit, vagy futtathatja a feladatot azonnal a Futtatás most gombbal.

A feladatfuttatásokkal kapcsolatos további információkért tekintse meg a Lakeflow-feladatok monitorozását és megfigyelhetőségét .

További információ