Bagikan melalui


Jalankan beban kerja Streaming Terstruktur pertama Anda

Artikel ini menyediakan contoh kode dan penjelasan konsep dasar yang diperlukan untuk menjalankan kueri Streaming Terstruktur pertama Anda di Azure Databricks. Anda dapat menggunakan Streaming Terstruktur untuk beban kerja pemrosesan mendekati real-time dan inkremental.

Streaming Terstruktur adalah salah satu dari beberapa teknologi yang mendukung tabel streaming di Tabel Langsung Delta. Databricks merekomendasikan penggunaan Tabel Langsung Delta untuk semua beban kerja ETL, penyerapan, dan Streaming Terstruktur baru. Lihat Apa itu Tabel Langsung Delta?.

Catatan

Meskipun Tabel Langsung Delta menyediakan sintaksis yang sedikit dimodifikasi untuk mendeklarasikan tabel streaming, sintaks umum untuk mengonfigurasi bacaan dan transformasi streaming berlaku untuk semua kasus penggunaan streaming di Azure Databricks. Tabel Langsung Delta juga menyederhanakan streaming dengan mengelola informasi status, metadata, dan banyak konfigurasi.

Membaca dari aliran data

Anda dapat menggunakan Streaming Terstruktur untuk menyerap data secara bertahap dari sumber data yang didukung. Beberapa sumber data paling umum yang digunakan dalam beban kerja Streaming Terstruktur Azure Databricks meliputi yang berikut ini:

  • File data di penyimpanan objek cloud
  • Bus dan antrean pesan
  • Delta Lake

Databricks merekomendasikan penggunaan Auto Loader untuk penyerapan streaming dari penyimpanan objek cloud. Auto Loader mendukung sebagian besar format file yang didukung oleh Streaming Terstruktur. Lihat Apa itu Pemuat Otomatis?.

Setiap sumber data menyediakan sejumlah opsi untuk menentukan cara memuat batch data. Selama konfigurasi pembaca, opsi utama yang mungkin perlu Anda atur termasuk dalam kategori berikut:

  • Opsi yang menentukan sumber data atau format (misalnya, jenis file, pemisah, dan skema).
  • Opsi yang mengonfigurasi akses ke sistem sumber (misalnya, pengaturan port dan kredensial).
  • Opsi yang menentukan tempat memulai dalam aliran (misalnya, offset Kafka atau membaca semua file yang ada).
  • Opsi yang mengontrol berapa banyak data yang diproses di setiap batch (misalnya, offset maks, file, atau byte per batch).

Menggunakan Auto Loader untuk membaca data streaming dari penyimpanan objek

Contoh berikut menunjukkan pemuatan data JSON dengan Auto Loader, yang digunakan cloudFiles untuk menunjukkan format dan opsi. Opsi ini schemaLocation memungkinkan inferensi dan evolusi skema. Tempelkan kode berikut di sel buku catatan Databricks dan jalankan sel untuk membuat DataFrame streaming bernama raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Seperti operasi baca lainnya di Azure Databricks, mengonfigurasi bacaan streaming tidak benar-benar memuat data. Anda harus memicu tindakan pada data sebelum aliran dimulai.

Catatan

Memanggil display() di DataFrame streaming memulai pekerjaan streaming. Untuk sebagian besar kasus penggunaan Streaming Terstruktur, tindakan yang memicu aliran harus menulis data ke sink. Lihat Menyiapkan kode Streaming Terstruktur Anda untuk produksi.

Melakukan transformasi streaming

Streaming Terstruktur mendukung sebagian besar transformasi yang tersedia di Azure Databricks dan Spark SQL. Anda bahkan dapat memuat model MLflow sebagai UDF dan membuat prediksi streaming sebagai transformasi.

Contoh kode berikut menyelesaikan transformasi sederhana untuk memperkaya data JSON yang diserap dengan informasi tambahan menggunakan fungsi Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

Yang dihasilkan transformed_df berisi instruksi kueri untuk memuat dan mengubah setiap rekaman saat tiba di sumber data.

Catatan

Streaming Terstruktur memperlakukan sumber data sebagai himpunan data yang tidak terbatas atau tak terbatas. Dengan demikian, beberapa transformasi tidak didukung dalam beban kerja Streaming Terstruktur karena akan memerlukan pengurutan jumlah item yang tak terbatas.

Sebagian besar agregasi dan banyak gabungan memerlukan pengelolaan informasi status dengan marka air, jendela, dan mode output. Lihat Menerapkan marka air untuk mengontrol ambang batas pemrosesan data.

Menulis ke sink data

Sink data adalah target operasi penulisan streaming. Sink umum yang digunakan dalam beban kerja streaming Azure Databricks meliputi yang berikut ini:

  • Delta Lake
  • Bus dan antrean pesan
  • Database kunci-nilai

Seperti sumber data, sebagian besar sink data menyediakan sejumlah opsi untuk mengontrol bagaimana data ditulis ke sistem target. Selama konfigurasi penulis, opsi utama yang mungkin perlu Anda atur termasuk dalam kategori berikut:

  • Mode output (tambahkan secara default).
  • Lokasi titik pemeriksaan (diperlukan untuk setiap penulis).
  • Interval pemicu; lihat Mengonfigurasi interval pemicu Streaming Terstruktur.
  • Opsi yang menentukan sink atau format data (misalnya, jenis file, pemisah, dan skema).
  • Opsi yang mengonfigurasi akses ke sistem target (misalnya, pengaturan port dan kredensial).

Melakukan penulisan batch bertahap ke Delta Lake

Contoh berikut menulis ke Delta Lake menggunakan jalur file dan titik pemeriksaan tertentu.

Penting

Selalu pastikan Anda menentukan lokasi titik pemeriksaan unik untuk setiap penulis streaming yang Anda konfigurasi. Titik pemeriksaan menyediakan identitas unik untuk streaming Anda, melacak semua rekaman yang diproses dan informasi status yang terkait dengan kueri streaming Anda.

Pengaturan availableNow untuk pemicu menginstruksikan Streaming Terstruktur untuk memproses semua rekaman yang sebelumnya tidak diproses dari himpunan data sumber lalu dimatikan, sehingga Anda dapat dengan aman menjalankan kode berikut tanpa khawatir meninggalkan aliran yang berjalan:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

Dalam contoh ini, tidak ada catatan baru yang tiba di sumber data kami, jadi eksekusi ulangi kode ini tidak menyerap rekaman baru.

Peringatan

Eksekusi Streaming Terstruktur dapat mencegah penghentian otomatis mematikan sumber daya komputasi. Untuk menghindari biaya tak terduga, pastikan untuk menghentikan kueri streaming.

Menyiapkan kode Streaming Terstruktur Anda untuk produksi

Databricks merekomendasikan penggunaan Tabel Langsung Delta untuk sebagian besar beban kerja Streaming Terstruktur. Rekomendasi berikut memberikan titik awal untuk menyiapkan beban kerja Streaming Terstruktur untuk produksi:

  • Hapus kode yang tidak perlu dari notebook yang akan mengembalikan hasil, seperti display dan count.
  • Jangan menjalankan beban kerja Streaming Terstruktur pada kluster interaktif; selalu jadwalkan streaming sebagai pekerjaan.
  • Untuk membantu pekerjaan streaming pulih secara otomatis, konfigurasikan pekerjaan dengan percobaan ulang tak terbatas.
  • Jangan gunakan penskalaan otomatis untuk beban kerja dengan Streaming Terstruktur.

Untuk rekomendasi selengkapnya, lihat Pertimbangan produksi untuk Streaming Terstruktur.

Membaca data dari Delta Lake, mengubah, dan menulis ke Delta Lake

Delta Lake memiliki dukungan luas untuk bekerja dengan Streaming Terstruktur sebagai sumber dan sink. Lihat Pembacaan dan penulisan streaming tabel Delta.

Contoh berikut menunjukkan sintaks contoh untuk memuat semua rekaman baru secara bertahap dari tabel Delta, menggabungkannya dengan rekam jepret tabel Delta lain, dan menulisnya ke tabel Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Anda harus memiliki izin yang tepat yang dikonfigurasi untuk membaca tabel sumber dan menulis ke tabel target dan lokasi titik pemeriksaan yang ditentukan. Isi semua parameter yang ditandai dengan tanda kurung sudut (<>) menggunakan nilai yang relevan untuk sumber data dan sink Anda.

Catatan

Tabel Langsung Delta menyediakan sintaksis deklaratif penuh untuk membuat alur Delta Lake dan mengelola properti seperti pemicu dan titik pemeriksaan secara otomatis. Lihat Apa itu Tabel Langsung Delta?.

Membaca data dari Kafka, mengubah, dan menulis ke Kafka

Apache Kafka dan bus olahpesan lainnya menyediakan beberapa latensi terendah yang tersedia untuk himpunan data besar. Anda dapat menggunakan Azure Databricks untuk menerapkan transformasi ke data yang diserap dari Kafka lalu menulis data kembali ke Kafka.

Catatan

Menulis data ke penyimpanan objek cloud menambahkan overhead latensi tambahan. Jika Anda ingin menyimpan data dari bus olahpesan di Delta Lake tetapi memerlukan latensi serendah mungkin untuk beban kerja streaming, Databricks merekomendasikan untuk mengonfigurasi pekerjaan streaming terpisah untuk menyerap data ke lakehouse dan menerapkan transformasi mendekati real-time untuk sink bus olahpesan hilir.

Contoh kode berikut menunjukkan pola sederhana untuk memperkaya data dari Kafka dengan menggabungkannya dengan data dalam tabel Delta lalu menulis kembali ke Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Anda harus memiliki izin yang tepat yang dikonfigurasi untuk akses ke layanan Kafka Anda. Isi semua parameter yang ditandai dengan tanda kurung sudut (<>) menggunakan nilai yang relevan untuk sumber data dan sink Anda. Lihat Pemrosesan aliran dengan Apache Kafka dan Azure Databricks.