Oharra
Baimena behar duzu orria atzitzeko. Direktorioetan saioa has dezakezu edo haiek alda ditzakezu.
Baimena behar duzu orria atzitzeko. Direktorioak alda ditzakezu.
Establece la salida de la consulta de streaming que se va a procesar mediante el sistema de escritura proporcionado. La lógica de procesamiento se puede especificar como una función que toma una fila como entrada, o como un objeto con process(row) y métodos y open(partition_id, epoch_id) opcionalesclose(error).
Sintaxis
foreach(f)
Parámetros
| Parámetro | Tipo | Descripción |
|---|---|---|
f |
callable o object | Función que toma una fila como entrada o un objeto con un process(row) método y métodos opcionales y openclose . |
Devoluciones
DataStreamWriter
Notas
El objeto proporcionado debe ser serializable. Cualquier inicialización para escribir datos (por ejemplo, abrir una conexión) debe realizarse dentro open()de , no en tiempo de construcción.
Ejemplos
import time
df = spark.readStream.format("rate").load()
Procese cada fila mediante una función:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Procese cada fila mediante un objeto con openlos métodos , processy close :
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()