Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Tutorial ini menjelaskan cara membuat dan menyebarkan alur ETL (ekstrak, transformasi, dan muat) untuk orkestrasi data menggunakan Alur Deklaratif Lakeflow Spark dan Auto Loader. Alur ETL menerapkan langkah-langkah untuk membaca data dari sistem sumber, mengubah data tersebut berdasarkan persyaratan, seperti pemeriksaan kualitas data dan merekam de-duplikasi, dan menulis data ke sistem target, seperti gudang data atau data lake.
Dalam tutorial ini, Anda akan menggunakan alur dan Auto Loader untuk:
- Memasukkan data sumber mentah ke dalam tabel target.
- Ubah data sumber mentah dan masukkan data yang telah diubah ke dalam dua tampilan materialisasi target.
- Mengkueri data yang ditransformasi.
- Mengotomatiskan alur ETL dengan tugas Databricks.
Untuk informasi selengkapnya tentang alur dan Auto Loader, lihat Alur Deklaratif Lakeflow Spark dan Apa itu Auto Loader?
Persyaratan
Untuk menyelesaikan tutorial ini, Anda harus memenuhi persyaratan berikut:
- Masuk ke ruang kerja Azure Databricks.
- Mengaktifkan Unity Catalog untuk ruang kerja Anda.
- Mengaktifkan komputasi tanpa server untuk akun Anda. Alur Deklaratif Lakeflow Spark tanpa server tidak tersedia di semua wilayah ruang kerja. Lihat Fitur dengan ketersediaan regional terbatas untuk wilayah yang tersedia.
- Memiliki izin untuk membuat sumber daya komputasi atau akses ke sumber daya komputasi.
- Memiliki izin untuk membuat skema baru dalam katalog. Izin yang diperlukan adalah
USE CATALOGdanCREATE SCHEMA. - Memiliki izin untuk membuat volume baru dalam skema yang ada. Izin yang diperlukan adalah
USE SCHEMAdanCREATE VOLUME.
Tentang himpunan data
Himpunan data yang digunakan dalam contoh ini adalah subset dari Himpunan Data Jutaan Lagu, kumpulan fitur dan metadata untuk trek musik kontemporer. Himpunan data ini tersedia dalam himpunan data sampel yang disertakan di ruang kerja Azure Databricks Anda.
Langkah 1: Membuat alur
Pertama, buat alur dengan menentukan himpunan data dalam file (disebut kode sumber) menggunakan sintaks alur. Setiap file kode sumber hanya dapat berisi satu bahasa, tetapi Anda dapat menambahkan beberapa file khusus bahasa dalam alur. Untuk mempelajari selengkapnya, lihat Alur Deklaratif Lakeflow Spark
Tutorial ini menggunakan komputasi tanpa server dan Katalog Unity. Untuk semua opsi konfigurasi yang tidak ditentukan, gunakan pengaturan default. Jika komputasi tanpa server tidak diaktifkan atau didukung di ruang kerja, Anda dapat menyelesaikan tutorial seperti yang ditulis menggunakan pengaturan komputasi default.
Untuk membuat alur baru, ikuti langkah-langkah berikut:
- Di ruang kerja Anda, klik
Baru di bar samping, lalu pilih Alur ETL.
- Beri nama yang unik pada alur Anda.
- Tepat di bawah nama, pilih katalog dan skema default untuk data yang Anda hasilkan. Anda dapat menentukan tujuan lain dalam transformasi Anda, tetapi tutorial ini menggunakan default ini. Anda harus memiliki izin ke katalog dan skema yang Anda buat. Lihat Persyaratan.
- Untuk tutorial ini, pilih Mulai dengan file kosong.
- Di Jalur folder, tentukan lokasi untuk file sumber Anda, atau terima default (folder pengguna Anda).
- Pilih Python atau SQL sebagai bahasa untuk file sumber pertama Anda (alur dapat mencampur dan mencocokkan bahasa, tetapi setiap file harus dalam satu bahasa).
- Klik Pilih.
Editor pipeline muncul untuk pipeline baru. File sumber kosong untuk bahasa Anda dibuat, siap untuk transformasi pertama Anda.
Langkah 2: Kembangkan logika alur Anda
Dalam langkah ini, Anda akan menggunakan Editor Alur Lakeflow untuk mengembangkan dan memvalidasi kode sumber untuk alur secara interaktif.
Kode ini menggunakan Auto Loader untuk penyerapan data inkremental. Auto Loader secara otomatis mendeteksi dan memproses file baru saat tiba di penyimpanan objek cloud. Untuk mempelajari selengkapnya, lihat Apa itu Auto Loader?
File kode sumber kosong secara otomatis dibuat dan dikonfigurasi untuk pipeline. File dibuat di dalam folder transformasi alur Anda. Secara default, semua file *.py dan *.sql di folder transformasi adalah bagian dari sumber untuk alur Anda.
Salin dan tempel kode berikut ke dalam file sumber Anda. Pastikan untuk menggunakan bahasa yang Anda pilih untuk file di Langkah 1.
Phyton
# Import modules from pyspark import pipelines as dp from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dp.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path)) # Define a materialized view that validates data and renames a column @dp.materialized_view( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dp.expect("valid_artist_name", "artist_name IS NOT NULL") @dp.expect("valid_title", "song_title IS NOT NULL") @dp.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dp.materialized_view( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/part*', format => "csv", header => "false", delimiter => "\t", schema => """ artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING """, schemaEvolutionMode => "none"); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC;Sumber ini mencakup kode untuk tiga kueri. Anda juga dapat menempatkan kueri tersebut dalam file terpisah, untuk mengatur file dan kode seperti yang Anda inginkan.
Klik
Jalankan file atau Jalankan alur untuk memulai pembaruan untuk alur yang tersambung. Dengan hanya satu file sumber dalam alur Anda, ini setara secara fungsional.
Setelah pembaruan selesai, editor diperbarui dengan informasi tentang pipeline Anda.
- Grafik alur (DAG), di bar samping di sebelah kanan kode Anda, menunjukkan tiga tabel,
songs_raw,songs_prepared, dantop_artists_by_year. - Ringkasan pembaruan ditampilkan di bagian atas browser aset alur.
- Detail tabel yang dihasilkan diperlihatkan di panel bawah, dan Anda bisa menelusuri data dari tabel dengan memilihnya.
Ini termasuk data mentah dan dibersihkan, serta beberapa analisis sederhana untuk menemukan artis teratas menurut tahun. Pada langkah berikutnya, Anda membuat kueri ad-hoc untuk analisis lebih lanjut dalam file terpisah di alur Anda.
Langkah 3: Jelajahi himpunan data yang dibuat oleh alur Anda
Dalam langkah ini, Anda melakukan kueri ad-hoc pada data yang diproses di alur ETL untuk menganalisis data lagu di Editor SQL Databricks. Kueri ini menggunakan rekaman yang disiapkan yang dibuat pada langkah sebelumnya.
Pertama, jalankan kueri yang menemukan artis yang telah merilis lagu terbanyak setiap tahun sejak 1990.
Dari bilah sisi browser aset alur, klik
Tambahkan lalu Eksplorasi.
Masukkan Nama dan pilih SQL untuk file eksplorasi. Buku catatan SQL dibuat di folder baru
explorations. File dalamexplorationsfolder tidak dijalankan sebagai bagian dari pembaruan alur secara default. Buku catatan SQL memiliki sel yang bisa Anda jalankan bersama atau terpisah.Untuk membuat tabel artis yang merilis lagu terbanyak di setiap tahun setelah 1990, masukkan kode berikut dalam file SQL baru (jika ada kode sampel dalam file, ganti). Karena buku catatan ini bukan bagian dari alur, buku catatan ini tidak menggunakan katalog dan skema default. Ganti
<catalog>.<schema>dengan katalog dan skema yang Anda gunakan sebagai default untuk proses alur:-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC;Klik
atau tekan
Shift + Enteruntuk menjalankan kueri ini.
Sekarang, jalankan kueri lain yang menemukan lagu dengan ritme 4/4 dan tempo yang dapat didansa.
Tambahkan kode berikut ke sel berikutnya dalam file yang sama. Sekali lagi, ganti
<catalog>.<schema>dengan katalog dan skema yang Anda gunakan sebagai default untuk alur:-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;Klik
atau tekan
Shift + Enteruntuk menjalankan kueri ini.
Langkah 4: Buat tugas untuk menjalankan alur kerja
Selanjutnya, buat alur kerja untuk mengotomatiskan langkah-langkah penyerapan, pemrosesan, dan analisis data menggunakan pekerjaan Databricks yang berjalan sesuai jadwal.
- Di bagian atas editor, pilih tombol Jadwalkan .
- Jika dialog Jadwal muncul, pilih Tambahkan jadwal.
- Ini membuka dialog Jadwal baru , di mana Anda dapat membuat pekerjaan untuk menjalankan alur Anda sesuai jadwal.
- Secara opsional, beri nama pekerjaan.
- Secara default, jadwal diatur untuk berjalan sekali per hari. Anda dapat menerima defaut ini, atau mengatur jadwal Anda sendiri. Memilih Tingkat Lanjut memberi Anda opsi untuk mengatur waktu tertentu yang akan dijalankan pekerjaan. Memilih Opsi lainnya memungkinkan Anda membuat pemberitahuan saat pekerjaan berjalan.
- Pilih Buat untuk menerapkan perubahan dan membuat pekerjaan.
Sekarang pekerjaan akan berjalan setiap hari untuk menjaga alur Anda tetap terbarui. Anda dapat memilih Jadwalkan lagi untuk melihat daftar jadwal. Anda dapat mengelola jadwal untuk alur Anda dari dialog tersebut, termasuk menambahkan, mengedit, atau menghapus jadwal.
Mengklik nama jadwal (atau pekerjaan) akan membawa Anda ke halaman pekerjaan di daftar Pekerjaan & alur . Dari sana Anda dapat melihat detail tentang eksekusi pekerjaan, termasuk riwayat eksekusi, atau segera menjalankan pekerjaan dengan tombol Jalankan sekarang .
Lihat Pemantauan dan observabilitas untuk Pekerjaan Lakeflow untuk informasi selengkapnya tentang pelaksanaan pekerjaan.
Pelajari lebih lanjut
- Untuk mempelajari selengkapnya tentang alur pemrosesan data, lihat Alur Deklaratif Lakeflow Spark
- Untuk mempelajari selengkapnya tentang Buku Catatan Databricks, lihat Buku catatan Databricks.
- Untuk mempelajari selengkapnya tentang Pekerjaan Lakeflow, lihat Apa itu pekerjaan?
- Untuk mempelajari selengkapnya tentang Delta Lake, lihat Apa itu Delta Lake di Azure Databricks?