foreach (DataStreamWriter)

Ustawia dane wyjściowe zapytania przesyłania strumieniowego do przetworzenia przy użyciu dostarczonego składnika zapisywania. Logikę przetwarzania można określić jako funkcję, która przyjmuje wiersz jako dane wejściowe, lub jako obiekt z metodami i process(row) opcjonalnymi.open(partition_id, epoch_id)close(error)

Składnia

foreach(f)

Parametry

Parameter Typ Opis
f wywoływanie lub obiekt Funkcja, która przyjmuje wiersz jako dane wejściowe, lub obiekt z metodą i metodami process(row) opcjonalnymi openclose.

Zwroty

DataStreamWriter

Notatki

Podany obiekt musi być serializowalny. Wszelkie inicjowanie do zapisywania danych (na przykład otwarcie połączenia) powinno odbywać się wewnątrz open(), a nie w czasie budowy.

Examples

import time
df = spark.readStream.format("rate").load()

Przetwarzanie każdego wiersza przy użyciu funkcji:

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Przetwarzaj każdy wiersz przy użyciu obiektu z metodami open, processi close :

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()