foreach (DataStreamWriter)

Mengatur output kueri streaming yang akan diproses menggunakan penulis yang disediakan. Logika pemrosesan dapat ditentukan sebagai fungsi yang mengambil baris sebagai input, atau sebagai objek dengan process(row) dan opsional open(partition_id, epoch_id) dan close(error) metode.

Sintaksis

foreach(f)

Parameter-parameternya

Parameter Tipe Deskripsi
f dapat dipanggil atau objek Fungsi yang mengambil Baris sebagai input, atau objek dengan process(row) metode dan opsional open dan close metode.

Pengembalian Barang

DataStreamWriter

Catatan

Objek yang disediakan harus dapat diserialisasikan. Inisialisasi apa pun untuk menulis data (misalnya, membuka koneksi) harus dilakukan di dalam open(), bukan pada waktu konstruksi.

Examples

import time
df = spark.readStream.format("rate").load()

Memproses setiap baris menggunakan fungsi:

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Proses setiap baris menggunakan objek dengan openmetode , process, dan close :

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()