Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ustawia dane wyjściowe zapytania przesyłania strumieniowego do przetworzenia przy użyciu dostarczonego składnika zapisywania. Logikę przetwarzania można określić jako funkcję, która przyjmuje wiersz jako dane wejściowe, lub jako obiekt z metodami i process(row) opcjonalnymi.open(partition_id, epoch_id)close(error)
Składnia
foreach(f)
Parametry
| Parameter | Typ | Opis |
|---|---|---|
f |
wywoływanie lub obiekt | Funkcja, która przyjmuje wiersz jako dane wejściowe, lub obiekt z metodą i metodami process(row) opcjonalnymi openclose. |
Zwroty
DataStreamWriter
Notatki
Podany obiekt musi być serializowalny. Wszelkie inicjowanie do zapisywania danych (na przykład otwarcie połączenia) powinno odbywać się wewnątrz open(), a nie w czasie budowy.
Examples
import time
df = spark.readStream.format("rate").load()
Przetwarzanie każdego wiersza przy użyciu funkcji:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Przetwarzaj każdy wiersz przy użyciu obiektu z metodami open, processi close :
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()