Bagikan melalui


Tutorial: Membangun alur ETL dengan Alur Deklaratif Lakeflow

Pelajari cara membuat dan menyebarkan alur ETL (ekstrak, transformasi, dan muat) untuk orkestrasi data menggunakan Alur Deklaratif Lakeflow dan Auto Loader. Alur ETL menerapkan langkah-langkah untuk membaca data dari sistem sumber, mengubah data tersebut berdasarkan persyaratan, seperti pemeriksaan kualitas data dan merekam de-duplikasi, dan menulis data ke sistem target, seperti gudang data atau data lake.

Dalam tutorial ini, Anda akan menggunakan Alur Deklaratif Lakeflow dan Auto Loader untuk:

  • Memasukkan data sumber mentah ke dalam tabel target.
  • Ubah data sumber mentah dan masukkan data yang telah diubah ke dalam dua tampilan materialisasi target.
  • Mengkueri data yang ditransformasi.
  • Mengotomatiskan alur ETL dengan tugas Databricks.

Untuk informasi selengkapnya tentang Alur Deklaratif Lakeflow dan Auto Loader, lihat Alur Deklaratif Lakeflow dan Apa itu Auto Loader?

Persyaratan

Untuk menyelesaikan tutorial ini, Anda harus memenuhi persyaratan berikut:

Tentang himpunan data

Himpunan data yang digunakan dalam contoh ini adalah subset dari Himpunan Data Jutaan Lagu, kumpulan fitur dan metadata untuk trek musik kontemporer. Himpunan data ini tersedia dalam himpunan data sampel yang disertakan di ruang kerja Azure Databricks Anda.

Langkah 1: Membuat alur

Pertama, Anda akan membuat pipeline ETL di Lakeflow Declarative Pipelines. Alur Deklaratif Lakeflow membuat alur dengan menyelesaikan dependensi yang ditentukan dalam buku catatan atau file (disebut kode sumber) menggunakan sintaksIs Alur Deklaratif Lakeflow. Setiap file kode sumber hanya boleh berisi satu bahasa, tetapi Anda dapat menambahkan beberapa buku catatan atau file khusus bahasa dalam alur. Untuk mempelajari selengkapnya, lihat Alur Deklaratif Lakeflow

Penting

Biarkan bidang Kode sumber kosong untuk membuat dan mengonfigurasi buku catatan untuk penulisan kode sumber secara otomatis.

Tutorial ini menggunakan komputasi tanpa server dan Katalog Unity. Untuk semua opsi konfigurasi yang tidak ditentukan, gunakan pengaturan default. Jika komputasi tanpa server tidak diaktifkan atau didukung di ruang kerja, Anda dapat menyelesaikan tutorial seperti yang ditulis menggunakan pengaturan komputasi default. Jika Anda menggunakan pengaturan komputasi default, Anda harus memilih Katalog Unity secara manual di bawah Opsi penyimpanan di bagian Tujuan dari UI Buat alur .

Untuk membuat alur ETL baru di Alur Deklaratif Lakeflow, ikuti langkah-langkah berikut:

  1. Di ruang kerja Anda, klik ikon Alur Kerja.Pekerjaan & Alur Kerja di bar samping.
  2. Di bawah Baru, klik Alur ETL.
  3. Di Nama alur, ketik nama alur yang unik.
  4. Pilih kotak centang Tanpa Server .
  5. Di Tujuan, untuk mengonfigurasi lokasi Katalog Unity tempat tabel diterbitkan, pilih Katalog yang sudah ada dan tulis nama baru di Skema untuk membuat skema baru di katalog Anda.
  6. Klik Buat.

Antarmuka pipeline muncul untuk pipeline baru.

Langkah 2: Mengembangkan alur

Penting

Notebook hanya dapat berisi satu bahasa pemrograman. Jangan mencampur kode Python dan SQL dalam notebook kode sumber alur.

Dalam langkah ini, Anda akan menggunakan Notebook Databricks untuk mengembangkan dan memvalidasi kode sumber untuk Alur Deklaratif Lakeflow secara interaktif.

Kode ini menggunakan Auto Loader untuk penyerapan data inkremental. Auto Loader secara otomatis mendeteksi dan memproses file baru saat tiba di penyimpanan objek cloud. Untuk mempelajari selengkapnya, lihat Apa itu Auto Loader?

Buku catatan kode sumber kosong secara otomatis dibuat dan dikonfigurasi untuk rangkaian proses. Buku catatan dibuat di direktori baru di direktori pengguna Anda. Nama direktori dan file baru cocok dengan nama alur Anda. Contohnya,/Users/someone@example.com/my_pipeline/my_pipeline.

Saat mengembangkan alur, Anda dapat memilih Python atau SQL. Contoh disertakan untuk kedua bahasa. Berdasarkan pilihan bahasa Anda, periksa apakah Anda memilih bahasa buku catatan default. Untuk mempelajari selengkapnya tentang dukungan buku catatan untuk pengembangan kode Alur Deklaratif Lakeflow, lihat Mengembangkan dan men-debug alur ETL dengan buku catatan di Alur Deklaratif Lakeflow.

  1. Tautan untuk mengakses buku catatan ini berada di bawah bidang Kode sumber di panel Detail alur . Klik tautan untuk membuka buku catatan sebelum melanjutkan ke langkah berikutnya.

  2. Klik Sambungkan di kanan atas untuk membuka menu konfigurasi komputasi.

  3. Arahkan mouse ke atas nama alur yang Anda buat di Langkah 1.

  4. Klik Sambungkan.

  5. Di samping judul buku catatan Anda di bagian atas, pilih bahasa default buku catatan (Python atau SQL).

  6. Salin dan tempel kode berikut ke dalam sel di buku catatan.

    Phyton

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    
  7. Klik Mulai untuk memulai pembaruan untuk alur yang tersambung.

Langkah 3: Mengkueri data yang diubah

Dalam langkah ini, Anda akan mengkueri data yang diproses di alur ETL untuk menganalisis data lagu. Kueri ini menggunakan rekaman yang disiapkan yang dibuat pada langkah sebelumnya.

Pertama, jalankan kueri yang menemukan artis yang telah merilis lagu terbanyak setiap tahun sejak 1990.

  1. Di bilah sisi, klik ikon SQL EditorSQL Editor.

  2. Klik ikon tambah atau ikon plus tab baru dan pilih Buat kueri baru dari menu.

  3. Masukkan yang berikut ini:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Ganti <catalog> dan <schema> dengan nama katalog dan skema tabel tersebut berada. Contohnya,data_pipelines.songs_data.top_artists_by_year.

  4. Klik Jalankan yang dipilih.

Sekarang, jalankan kueri lain yang menemukan lagu dengan ritme 4/4 dan tempo yang dapat didansa.

  1. Klik ikon Tambah atau ikon plus ikon ketuk baru dan pilih Buat kueri baru dari menu.

  2. Masukkan kode berikut:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Ganti <catalog> dan <schema> dengan nama katalog dan skema tabel tersebut berada. Contohnya,data_pipelines.songs_data.songs_prepared.

  3. Klik Jalankan yang dipilih.

Langkah 4: Buat tugas untuk menjalankan alur kerja

Selanjutnya, buat alur kerja untuk mengotomatiskan langkah-langkah penyerapan, pemrosesan, dan analisis data menggunakan pekerjaan Databricks.

  1. Di ruang kerja Anda, klik ikon Alur Kerja.Pekerjaan & Alur Kerja di bar samping.
  2. Di bawah Baru, klik Pekerjaan.
  3. Dalam kotak judul tugas, ganti tanggal dan waktu< Pekerjaan >Baru dengan nama pekerjaan Anda. Contohnya,Songs workflow.
  4. Di Nama tugas, masukkan nama untuk tugas pertama, misalnya, ETL_songs_data.
  5. Di Jenis, pilih Alur.
  6. Di Alur, pilih alur yang Anda buat di langkah 1.
  7. Klik Buat.
  8. Untuk menjalankan alur kerja, klik Jalankan Sekarang. Untuk melihat detail eksekusi, klik tab Jalankan . Klik tugas untuk menampilkan detail untuk eksekusi tugas.
  9. Untuk melihat hasil saat alur kerja selesai, klik Buka eksekusi terbaru yang berhasil atau Waktu mulai untuk eksekusi pekerjaan. Halaman Output muncul dan menampilkan hasil kueri.

Lihat Pemantauan dan observabilitas untuk Pekerjaan Lakeflow untuk informasi selengkapnya tentang pelaksanaan pekerjaan.

Langkah 5: Menjadwalkan pekerjaan alur

Untuk menjalankan alur ETL sesuai jadwal, ikuti langkah-langkah berikut:

  1. Navigasikan ke UI Pekerjaan & Alur di ruang kerja Azure Databricks yang sama dengan pekerjaan.
  2. Secara opsional, pilih filter Pekerjaan dan Dimiliki oleh saya .
  3. Di kolom Nama , klik nama pekerjaan. Panel samping menampilkan detail Pekerjaan.
  4. Klik Tambahkan pemicu di panel Jadwal & Pemicu dan pilih Terjadwal dalam Jenis pemicu.
  5. Tentukan periode, waktu mulai, dan zona waktu.
  6. Klik Simpan.

Pelajari lebih lanjut