Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Mengatur output kueri streaming yang akan diproses menggunakan penulis yang disediakan. Logika pemrosesan dapat ditentukan sebagai fungsi yang mengambil baris sebagai input, atau sebagai objek dengan process(row) dan opsional open(partition_id, epoch_id) dan close(error) metode.
Sintaksis
foreach(f)
Parameter-parameternya
| Parameter | Tipe | Deskripsi |
|---|---|---|
f |
dapat dipanggil atau objek | Fungsi yang mengambil Baris sebagai input, atau objek dengan process(row) metode dan opsional open dan close metode. |
Pengembalian Barang
DataStreamWriter
Catatan
Objek yang disediakan harus dapat diserialisasikan. Inisialisasi apa pun untuk menulis data (misalnya, membuka koneksi) harus dilakukan di dalam open(), bukan pada waktu konstruksi.
Examples
import time
df = spark.readStream.format("rate").load()
Memproses setiap baris menggunakan fungsi:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Proses setiap baris menggunakan objek dengan openmetode , process, dan close :
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()