Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Pelajari cara membuat dan menyebarkan alur ETL (ekstrak, transformasi, dan muat) untuk orkestrasi data menggunakan Alur Deklaratif Lakeflow dan Auto Loader. Alur ETL menerapkan langkah-langkah untuk membaca data dari sistem sumber, mengubah data tersebut berdasarkan persyaratan, seperti pemeriksaan kualitas data dan merekam de-duplikasi, dan menulis data ke sistem target, seperti gudang data atau data lake.
Dalam tutorial ini, Anda akan menggunakan Alur Deklaratif Lakeflow dan Auto Loader untuk:
- Memasukkan data sumber mentah ke dalam tabel target.
- Ubah data sumber mentah dan masukkan data yang telah diubah ke dalam dua tampilan materialisasi target.
- Mengkueri data yang ditransformasi.
- Mengotomatiskan alur ETL dengan tugas Databricks.
Untuk informasi selengkapnya tentang Alur Deklaratif Lakeflow dan Auto Loader, lihat Alur Deklaratif Lakeflow dan Apa itu Auto Loader?
Persyaratan
Untuk menyelesaikan tutorial ini, Anda harus memenuhi persyaratan berikut:
- Masuk ke ruang kerja Azure Databricks.
- Mengaktifkan Unity Catalog untuk ruang kerja Anda.
- Mengaktifkan komputasi tanpa server untuk akun Anda. Alur Deklaratif Lakeflow Tanpa Server tidak tersedia di semua wilayah ruang kerja. Lihat Fitur dengan ketersediaan regional terbatas untuk wilayah yang tersedia.
- Memiliki izin untuk membuat sumber daya komputasi atau akses ke sumber daya komputasi.
- Memiliki izin untuk membuat skema baru dalam katalog. Izin yang diperlukan adalah
ALL PRIVILEGES
atauUSE CATALOG
danCREATE SCHEMA
. - Memiliki izin untuk membuat volume baru dalam skema yang ada. Izin yang diperlukan adalah
ALL PRIVILEGES
atauUSE SCHEMA
danCREATE VOLUME
.
Tentang himpunan data
Himpunan data yang digunakan dalam contoh ini adalah subset dari Himpunan Data Jutaan Lagu, kumpulan fitur dan metadata untuk trek musik kontemporer. Himpunan data ini tersedia dalam himpunan data sampel yang disertakan di ruang kerja Azure Databricks Anda.
Langkah 1: Membuat alur
Pertama, Anda akan membuat pipeline ETL di Lakeflow Declarative Pipelines. Alur Deklaratif Lakeflow membuat alur dengan menyelesaikan dependensi yang ditentukan dalam buku catatan atau file (disebut kode sumber) menggunakan sintaksIs Alur Deklaratif Lakeflow. Setiap file kode sumber hanya boleh berisi satu bahasa, tetapi Anda dapat menambahkan beberapa buku catatan atau file khusus bahasa dalam alur. Untuk mempelajari selengkapnya, lihat Alur Deklaratif Lakeflow
Penting
Biarkan bidang Kode sumber kosong untuk membuat dan mengonfigurasi buku catatan untuk penulisan kode sumber secara otomatis.
Tutorial ini menggunakan komputasi tanpa server dan Katalog Unity. Untuk semua opsi konfigurasi yang tidak ditentukan, gunakan pengaturan default. Jika komputasi tanpa server tidak diaktifkan atau didukung di ruang kerja, Anda dapat menyelesaikan tutorial seperti yang ditulis menggunakan pengaturan komputasi default. Jika Anda menggunakan pengaturan komputasi default, Anda harus memilih Katalog Unity secara manual di bawah Opsi penyimpanan di bagian Tujuan dari UI Buat alur .
Untuk membuat alur ETL baru di Alur Deklaratif Lakeflow, ikuti langkah-langkah berikut:
- Di ruang kerja Anda, klik
ikon Alur Kerja. Pekerjaan & Alur Kerja di bar samping. - Di bawah Baru, klik Alur ETL.
- Di Nama alur, ketik nama alur yang unik.
- Pilih kotak centang Tanpa Server .
- Di Tujuan, untuk mengonfigurasi lokasi Katalog Unity tempat tabel diterbitkan, pilih Katalog yang sudah ada dan tulis nama baru di Skema untuk membuat skema baru di katalog Anda.
- Klik Buat.
Antarmuka pipeline muncul untuk pipeline baru.
Langkah 2: Mengembangkan alur
Penting
Notebook hanya dapat berisi satu bahasa pemrograman. Jangan mencampur kode Python dan SQL dalam notebook kode sumber alur.
Dalam langkah ini, Anda akan menggunakan Notebook Databricks untuk mengembangkan dan memvalidasi kode sumber untuk Alur Deklaratif Lakeflow secara interaktif.
Kode ini menggunakan Auto Loader untuk penyerapan data inkremental. Auto Loader secara otomatis mendeteksi dan memproses file baru saat tiba di penyimpanan objek cloud. Untuk mempelajari selengkapnya, lihat Apa itu Auto Loader?
Buku catatan kode sumber kosong secara otomatis dibuat dan dikonfigurasi untuk rangkaian proses. Buku catatan dibuat di direktori baru di direktori pengguna Anda. Nama direktori dan file baru cocok dengan nama alur Anda. Contohnya,/Users/someone@example.com/my_pipeline/my_pipeline
.
Saat mengembangkan alur, Anda dapat memilih Python atau SQL. Contoh disertakan untuk kedua bahasa. Berdasarkan pilihan bahasa Anda, periksa apakah Anda memilih bahasa buku catatan default. Untuk mempelajari selengkapnya tentang dukungan buku catatan untuk pengembangan kode Alur Deklaratif Lakeflow, lihat Mengembangkan dan men-debug alur ETL dengan buku catatan di Alur Deklaratif Lakeflow.
Tautan untuk mengakses buku catatan ini berada di bawah bidang Kode sumber di panel Detail alur . Klik tautan untuk membuka buku catatan sebelum melanjutkan ke langkah berikutnya.
Klik Sambungkan di kanan atas untuk membuka menu konfigurasi komputasi.
Arahkan mouse ke atas nama alur yang Anda buat di Langkah 1.
Klik Sambungkan.
Di samping judul buku catatan Anda di bagian atas, pilih bahasa default buku catatan (Python atau SQL).
Salin dan tempel kode berikut ke dalam sel di buku catatan.
Phyton
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
Klik Mulai untuk memulai pembaruan untuk alur yang tersambung.
Langkah 3: Mengkueri data yang diubah
Dalam langkah ini, Anda akan mengkueri data yang diproses di alur ETL untuk menganalisis data lagu. Kueri ini menggunakan rekaman yang disiapkan yang dibuat pada langkah sebelumnya.
Pertama, jalankan kueri yang menemukan artis yang telah merilis lagu terbanyak setiap tahun sejak 1990.
Di bilah sisi, klik ikon
SQL Editor.
Klik ikon
tab baru dan pilih Buat kueri baru dari menu.
Masukkan yang berikut ini:
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
Ganti
<catalog>
dan<schema>
dengan nama katalog dan skema tabel tersebut berada. Contohnya,data_pipelines.songs_data.top_artists_by_year
.Klik Jalankan yang dipilih.
Sekarang, jalankan kueri lain yang menemukan lagu dengan ritme 4/4 dan tempo yang dapat didansa.
Klik ikon
ikon ketuk baru dan pilih Buat kueri baru dari menu.
Masukkan kode berikut:
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
Ganti
<catalog>
dan<schema>
dengan nama katalog dan skema tabel tersebut berada. Contohnya,data_pipelines.songs_data.songs_prepared
.Klik Jalankan yang dipilih.
Langkah 4: Buat tugas untuk menjalankan alur kerja
Selanjutnya, buat alur kerja untuk mengotomatiskan langkah-langkah penyerapan, pemrosesan, dan analisis data menggunakan pekerjaan Databricks.
- Di ruang kerja Anda, klik
ikon Alur Kerja. Pekerjaan & Alur Kerja di bar samping. - Di bawah Baru, klik Pekerjaan.
- Dalam kotak judul tugas, ganti tanggal dan waktu< Pekerjaan >Baru dengan nama pekerjaan Anda. Contohnya,
Songs workflow
. - Di Nama tugas, masukkan nama untuk tugas pertama, misalnya,
ETL_songs_data
. - Di Jenis, pilih Alur.
- Di Alur, pilih alur yang Anda buat di langkah 1.
- Klik Buat.
- Untuk menjalankan alur kerja, klik Jalankan Sekarang. Untuk melihat detail eksekusi, klik tab Jalankan . Klik tugas untuk menampilkan detail untuk eksekusi tugas.
- Untuk melihat hasil saat alur kerja selesai, klik Buka eksekusi terbaru yang berhasil atau Waktu mulai untuk eksekusi pekerjaan. Halaman Output muncul dan menampilkan hasil kueri.
Lihat Pemantauan dan observabilitas untuk Pekerjaan Lakeflow untuk informasi selengkapnya tentang pelaksanaan pekerjaan.
Langkah 5: Menjadwalkan pekerjaan alur
Untuk menjalankan alur ETL sesuai jadwal, ikuti langkah-langkah berikut:
- Navigasikan ke UI Pekerjaan & Alur di ruang kerja Azure Databricks yang sama dengan pekerjaan.
- Secara opsional, pilih filter Pekerjaan dan Dimiliki oleh saya .
- Di kolom Nama , klik nama pekerjaan. Panel samping menampilkan detail Pekerjaan.
- Klik Tambahkan pemicu di panel Jadwal & Pemicu dan pilih Terjadwal dalam Jenis pemicu.
- Tentukan periode, waktu mulai, dan zona waktu.
- Klik Simpan.
Pelajari lebih lanjut
- Untuk mempelajari selengkapnya tentang alur pemrosesan data dengan Alur Deklaratif Lakeflow, lihat Alur Deklaratif Lakeflow
- Untuk mempelajari selengkapnya tentang Buku Catatan Databricks, lihat Pengantar buku catatan Databricks.
- Untuk mempelajari selengkapnya tentang Pekerjaan Lakeflow, lihat Apa itu pekerjaan?
- Untuk mempelajari selengkapnya tentang Delta Lake, lihat Apa itu Delta Lake di Azure Databricks?