Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Anda dapat menggunakan Azure Databricks untuk mengkueri sumber data streaming menggunakan Streaming Terstruktur. Azure Databricks menyediakan dukungan ekstensif untuk beban kerja streaming di Python dan Scala, dan mendukung sebagian besar fungsionalitas Streaming Terstruktur dengan SQL.
Contoh berikut menunjukkan penggunaan sink memori untuk inspeksi manual data streaming selama pengembangan interaktif di notebook. Karena batas jumlah baris di antarmuka pengguna notebook, Anda mungkin tidak mengamati semua data yang dibaca oleh kueri streaming. Dalam beban kerja produksi, Anda hanya boleh memicu kueri streaming dengan menulisnya ke tabel target atau sistem eksternal.
Catatan
Dukungan SQL untuk kueri interaktif pada data streaming terbatas pada notebook yang berjalan pada komputasi serba guna. Anda juga dapat menggunakan SQL saat mendeklarasikan tabel streaming di Databricks SQL atau Lakeflow Spark Declarative Pipelines. Lihat Tabel streaming dan Alur Deklaratif Lakeflow Spark.
Mengkueri data dari sistem streaming
Azure Databricks menyediakan pembaca data streaming untuk sistem streaming berikut:
- Kafka
- Kinetika
- PubSub
- Pulsar
Anda harus memberikan detail konfigurasi saat menginisialisasi kueri pada sistem ini, yang bervariasi tergantung pada lingkungan yang telah Anda konfigurasi dan sistem yang Anda pilih untuk dibaca. Lihat Konektor standar di Lakeflow Connect.
Beban kerja umum yang melibatkan sistem streaming mencakup pengambilan data ke lakehouse dan pemrosesan aliran untuk menyalurkan data ke sistem eksternal. Untuk informasi selengkapnya tentang beban kerja streaming, lihat Konsep Streaming Terstruktur.
Contoh berikut menunjukkan bacaan streaming interaktif dari Kafka:
Phyton
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
Menjalankan kueri pada tabel sebagai pembacaan streaming
Azure Databricks membuat semua tabel menggunakan Delta Lake secara default. Saat Anda melakukan kueri streaming terhadap tabel Delta, kueri secara otomatis mengambil rekaman baru ketika versi tabel dikomit. Secara default, kueri streaming mengharapkan tabel sumber hanya berisi rekaman yang ditambahkan. Jika Anda perlu bekerja dengan data streaming yang berisi pembaruan dan penghapusan, Databricks merekomendasikan penggunaan Alur Deklaratif Lakeflow Spark dan AUTO CDC ... INTO. Lihat API CDC OTOMATIS: Menyederhanakan perubahan pengambilan data dengan alur.
Contoh berikut menunjukkan melakukan pembacaan streaming interaktif dari tabel:
Phyton
display(spark.readStream.table("table_name"))
SQL
SELECT * FROM STREAM table_name
Mengkueri data dalam penyimpanan objek cloud dengan Auto Loader
Anda dapat melakukan streaming data dari penyimpanan objek cloud menggunakan Auto Loader, konektor data cloud Azure Databricks. Anda dapat menggunakan konektor dengan file yang disimpan dalam volume Katalog Unity atau lokasi penyimpanan objek cloud lainnya. Databricks merekomendasikan penggunaan volume untuk mengelola akses ke data di penyimpanan objek cloud. Lihat Menyambungkan ke sumber data dan layanan eksternal.
Azure Databricks mengoptimalkan konektor ini untuk penyerapan streaming data dalam penyimpanan objek cloud yang disimpan dalam format terstruktur, semi terstruktur, dan tidak terstruktur yang populer. Databricks merekomendasikan penyimpanan data yang diserap dalam format yang hampir mentah untuk memaksimalkan throughput dan meminimalkan potensi kehilangan data karena rekaman atau perubahan skema yang rusak.
Untuk rekomendasi selengkapnya tentang menyerap data dari penyimpanan objek cloud, lihat Konektor standar di Lakeflow Connect.
Contoh berikut menunjukkan streaming interaktif yang dibaca dari direktori file JSON dalam volume:
Phyton
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SQL
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')