Menggunakan Delta Lake untuk streaming data

Selesai

Semua data yang telah kami jelajahi hingga saat ini telah menjadi data statis dalam file. Namun, banyak skenario analitik data melibatkan data streaming yang harus diproses mendekati real time. Misalnya, Anda mungkin perlu mengambil pembacaan yang dipancarkan oleh perangkat internet-of-things (IoT) dan menyimpannya dalam tabel saat terjadi.

Spark Structured Streaming

Solusi pemrosesan aliran biasa melibatkan pembacaan aliran data secara terus-menerus dari sumber, secara opsional memprosesnya untuk memilih bidang tertentu, nilai agregat dan grup, atau memanipulasi data, dan menulis hasilnya ke sink.

Spark menyertakan dukungan asli untuk streaming data melalui Streaming Terstruktur Spark, sebuah API yang didasarkan pada kerangka data tanpa batas tempat data streaming ditangkap untuk diproses. Kerangka data Streaming Terstruktur Spark dapat membaca data dari berbagai jenis sumber streaming, termasuk port jaringan, layanan perantara pesan waktu nyata seperti Azure Event Hubs atau Kafka, atau lokasi sistem file.

Tip

Untuk informasi selengkapnya tentang Streaming Terstruktur Spark, lihat panduan pemrograman Streaming Terstruktur Spark.

Streaming dengan tabel Delta Lake

Anda dapat menggunakan tabel Delta Lake sebagai sumber atau sink untuk Streaming Terstruktur Spark. Misalnya, Anda bisa menangkap aliran data waktu nyata dari perangkat IoT dan menulis aliran langsung ke tabel Delta Lake sebagai sink - memungkinkan Anda untuk membuat kueri tabel untuk melihat data streaming terbaru. Atau, Anda dapat membaca Tabel Delta sebagai sumber streaming, memungkinkan Anda untuk terus melaporkan data baru saat ditambahkan ke tabel.

Menggunakan tabel Delta Lake sebagai sumber streaming

Dalam contoh PySpark berikut, tabel Delta Lake digunakan untuk menyimpan detail pesanan penjualan Internet. Aliran dibuat yang membaca data dari folder tabel Delta Lake saat data baru ditambahkan.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.show()

Catatan

Saat menggunakan tabel Delta Lake sebagai sumber streaming, hanya operasi tambahkan yang dapat disertakan dalam aliran. Modifikasi data akan menyebabkan kesalahan kecuali Anda menentukan opsi ignoreChanges atau ignoreDeletes.

Setelah membaca data dari tabel Delta Lake ke dalam kerangka data streaming, Anda dapat menggunakan API Streaming Terstruktur Spark untuk memprosesnya. Dalam contoh di atas, kerangka data hanya ditampilkan; tetapi Anda dapat menggunakan Streaming Terstruktur Spark untuk menggabungkan data melalui jendela temporal (misalnya untuk menghitung jumlah pesanan yang dilakukan setiap menit) dan mengirimkan hasil agregat ke proses hilir untuk visualisasi hampir real-time.

Menggunakan tabel Delta Lake sebagai sink streaming

Dalam contoh PySpark berikut, aliran data dibaca dari file JSON dalam folder. Data JSON di setiap file berisi status untuk perangkat IoT dalam format Data baru {"device":"Dev1","status":"ok"} ditambahkan ke aliran setiap kali file ditambahkan ke folder. Aliran input adalah kerangka data tanpa batas, yang kemudian ditulis dalam format delta ke lokasi folder untuk tabel Delta Lake.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
streamFolder = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

Catatan

Opsi checkpointLocation ini digunakan untuk menulis file titik pemeriksaan yang melacak status pemrosesan aliran. File ini memungkinkan Anda memulihkan dari kegagalan pada titik di mana pemrosesan streaming ditinggalkan.

Setelah proses streaming dimulai, Anda dapat mengkueri tabel Delta Lake tempat output streaming ditulis untuk melihat data terbaru. Misalnya, kode berikut membuat tabel katalog untuk folder tabel Delta Lake dan mengkuerinya:

%sql

CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';

SELECT device, status
FROM DeviceTable;

Untuk menghentikan aliran data yang ditulis ke tabel Delta Lake, Anda bisa menggunakan metode stop kueri streaming:

delta_stream.stop()

Tip

Untuk informasi selengkapnya tentang menggunakan tabel Delta Lake untuk streaming data, lihat Streaming tabel membaca dan menulis dalam dokumentasi Delta Lake.