Tutorial: Menjalankan alur analitik lakehouse end-to-end
Tutorial ini menunjukkan kepada Anda cara menyiapkan alur analitik end-to-end untuk lakehouse Azure Databricks.
Penting
Tutorial ini menggunakan notebook interaktif untuk menyelesaikan tugas ETL umum di Python pada kluster yang diaktifkan Katalog Unity. Jika Anda tidak menggunakan Unity Catalog, lihat Menjalankan beban kerja ETL pertama Anda di Azure Databricks.
Tugas dalam tutorial ini
Pada akhir artikel ini, Anda akan merasa nyaman:
- Meluncurkan kluster komputasi yang diaktifkan Katalog Unity.
- Membuat buku catatan Databricks.
- Menulis dan membaca data dari lokasi eksternal Katalog Unity.
- Mengonfigurasi penyerapan data inkremental ke tabel Unity Catalog dengan Auto Loader.
- Menjalankan sel buku catatan untuk memproses, mengkueri, dan mempratinjau data.
- Menjadwalkan buku catatan sebagai pekerjaan Databricks.
- Mengkueri tabel Unity Catalog dari Databricks SQL
Azure Databricks menyediakan serangkaian alat siap produksi yang memungkinkan profesional data mengembangkan dan menyebarkan alur ekstrak, transformasi, dan pemuatan (ETL) dengan cepat. Katalog Unity memungkinkan pengurus data untuk mengonfigurasi dan mengamankan kredensial penyimpanan, lokasi eksternal, dan objek database untuk pengguna di seluruh organisasi. Databricks SQL memungkinkan analis untuk menjalankan kueri SQL terhadap tabel yang sama yang digunakan dalam beban kerja ETL produksi, memungkinkan kecerdasan bisnis real time dalam skala besar.
Anda juga dapat menggunakan Tabel Langsung Delta untuk membangun alur ETL. Databricks membuat Tabel Langsung Delta untuk mengurangi kompleksitas membangun, menyebarkan, dan memelihara alur ETL produksi. Lihat Tutorial: Menjalankan alur Tabel Langsung Delta pertama Anda.
Persyaratan
Catatan
Jika Anda tidak memiliki hak istimewa kontrol kluster, Anda masih dapat menyelesaikan sebagian besar langkah di bawah ini selama Anda memiliki akses ke kluster.
Langkah 1: Membuat kluster
Untuk melakukan analisis data eksploratif dan rekayasa data, buat kluster untuk menyediakan sumber daya komputasi yang diperlukan untuk menjalankan perintah.
- Klik Komputasi di bilah samping.
- Klik Baru di bar samping, lalu pilih Kluster. Ini membuka halaman Kluster/Komputasi Baru.
- Tentukan nama unik untuk kluster.
- Pilih tombol radio Simpul tunggal.
- Pilih Pengguna Tunggal dari menu dropdown Mode akses.
- Pastikan alamat email Anda terlihat di bidang Pengguna Tunggal.
- Pilih versi runtime Databricks yang diinginkan, 11.1 atau lebih tinggi untuk menggunakan Unity Catalog.
- Klik Buat komputasi untuk membuat kluster.
Untuk mempelajari selengkapnya tentang kluster Databricks, lihat Komputasi.
Langkah 2: Membuat buku catatan Databricks
Untuk membuat buku catatan di ruang kerja Anda, klik Baru di bilah samping, lalu klik Buku Catatan. Buku catatan kosong terbuka di ruang kerja.
Untuk mempelajari selengkapnya tentang membuat dan mengelola buku catatan, lihat Mengelola buku catatan.
Langkah 3: Menulis dan membaca data dari lokasi eksternal yang dikelola oleh Unity Catalog
Databricks merekomendasikan penggunaan Auto Loader untuk penyerapan data bertahap. Auto Loader secara otomatis mendeteksi dan memproses file baru saat tiba di penyimpanan objek cloud.
Gunakan Unity Catalog untuk mengelola akses aman ke lokasi eksternal. Pengguna atau perwakilan layanan dengan READ FILES
izin di lokasi eksternal dapat menggunakan Auto Loader untuk menyerap data.
Biasanya, data akan tiba di lokasi eksternal karena penulisan dari sistem lain. Dalam demo ini, Anda dapat mensimulasikan kedatangan data dengan menulis file JSON ke lokasi eksternal.
Salin kode di bawah ini ke dalam sel buku catatan. Ganti nilai string untuk catalog
dengan nama katalog dengan CREATE CATALOG
izin dan USE CATALOG
. Ganti nilai string untuk external_location
dengan jalur untuk lokasi eksternal dengan READ FILES
izin , WRITE FILES
, dan CREATE EXTERNAL TABLE
.
Lokasi eksternal dapat didefinisikan sebagai seluruh kontainer penyimpanan, tetapi sering menunjuk ke direktori yang disarangkan dalam kontainer.
Format yang benar untuk jalur lokasi eksternal adalah "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
Menjalankan sel ini harus mencetak baris yang bertuliskan 12 byte, mencetak string "Halo dunia!", dan menampilkan semua database yang ada dalam katalog yang disediakan. Jika Anda tidak dapat menjalankan sel ini, konfirmasikan bahwa Anda berada di ruang kerja yang diaktifkan Katalog Unity dan meminta izin yang tepat dari administrator ruang kerja Anda untuk menyelesaikan tutorial ini.
Kode Python di bawah ini menggunakan alamat email Anda untuk membuat database unik di katalog yang disediakan dan lokasi penyimpanan unik di lokasi eksternal yang disediakan. Menjalankan sel ini akan menghapus semua data yang terkait dengan tutorial ini, memungkinkan Anda untuk menjalankan contoh ini secara idempotensi. Kelas didefinisikan dan dibuat yang akan Anda gunakan untuk mensimulasikan batch data yang tiba dari sistem yang terhubung ke lokasi eksternal sumber Anda.
Salin kode ini ke sel baru di buku catatan Anda dan jalankan untuk mengonfigurasi lingkungan Anda.
Catatan
Variabel yang ditentukan dalam kode ini akan memungkinkan Anda untuk menjalankannya dengan aman tanpa risiko bertentangan dengan aset ruang kerja yang ada atau pengguna lain. Izin jaringan atau penyimpanan terbatas akan menimbulkan kesalahan saat menjalankan kode ini; hubungi administrator ruang kerja Anda untuk memecahkan masalah pembatasan ini.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Anda sekarang dapat mendaratkan batch data dengan menyalin kode berikut ke dalam sel dan mengeksekusinya. Anda dapat menjalankan sel ini secara manual hingga 60 kali untuk memicu kedatangan data baru.
RawData.land_batch()
Langkah 4: Mengonfigurasi Auto Loader untuk menyerap data ke Katalog Unity
Databricks merekomendasikan penyimpanan data dengan Delta Lake. Delta Lake adalah lapisan penyimpanan sumber terbuka yang menyediakan transaksi ACID dan memungkinkan data lakehouse. Delta Lake adalah format default untuk tabel yang dibuat di Databricks.
Untuk mengonfigurasi Auto Loader untuk menyerap data ke tabel Katalog Unity, salin dan tempel kode berikut ke dalam sel kosong di buku catatan Anda:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Untuk mempelajari selengkapnya tentang Auto Loader, lihat Apa itu Auto Loader?.
Untuk mempelajari selengkapnya tentang Streaming Terstruktur dengan Katalog Unity, lihat Menggunakan Katalog Unity dengan Streaming Terstruktur.
Langkah 5: Memproses dan berinteraksi dengan data
Notebook menjalankan logika sel demi sel. Gunakan langkah-langkah ini untuk menjalankan logika di sel Anda:
Untuk menjalankan sel yang Anda selesaikan di langkah sebelumnya, pilih sel dan tekan SHIFT+ENTER.
Untuk mengkueri tabel yang baru saja Anda buat, salin dan tempel kode berikut ke dalam sel kosong, lalu tekan SHIFT+ENTER untuk menjalankan sel.
df = spark.read.table(table)
Untuk mempratinjau data di DataFrame Anda, salin dan tempel kode berikut ke dalam sel kosong, lalu tekan SHIFT+ENTER untuk menjalankan sel.
display(df)
Untuk mempelajari selengkapnya tentang opsi interaktif untuk memvisualisasikan data, lihat Visualisasi di buku catatan Databricks.
Langkah 6: Menjadwalkan pekerjaan
Anda dapat menjalankan buku catatan Databricks sebagai skrip produksi dengan menambahkannya sebagai tugas dalam pekerjaan Databricks. Dalam langkah ini, Anda akan membuat pekerjaan baru yang dapat Anda picu secara manual.
Untuk menjadwalkan buku catatan Anda sebagai tugas:
- Klik Jadwalkan di sisi kanan bilah header.
- Masukkan nama unik untuk Nama pekerjaan.
- Klik Manual.
- Di drop-down Kluster, pilih kluster yang Anda buat di langkah 1.
- Klik Buat.
- Di jendela yang muncul, klik Jalankan sekarang.
- Untuk melihat hasil eksekusi pekerjaan, klik ikon di samping tanda waktu eksekusi terakhir.
Untuk informasi selengkapnya tentang pekerjaan, lihat Apa itu Pekerjaan Databricks?.
Langkah 7: Tabel kueri dari Databricks SQL
Siapa pun yang memiliki USE CATALOG
izin pada katalog saat ini, USE SCHEMA
izin pada skema saat ini, dan SELECT
izin pada tabel dapat mengkueri konten tabel dari API Databricks pilihan mereka.
Anda memerlukan akses ke gudang SQL yang sedang berjalan untuk menjalankan kueri di Databricks SQL.
Tabel yang Anda buat sebelumnya dalam tutorial ini memiliki nama target_table
. Anda dapat mengkuerinya menggunakan katalog yang Anda berikan di sel pertama dan database dengan patern e2e_lakehouse_<your-username>
. Anda bisa menggunakan Catalog Explorer untuk menemukan objek data yang Anda buat.
Integrasi Tambahan
Pelajari selengkapnya tentang integrasi dan alat untuk rekayasa data dengan Azure Databricks: