foreach(DataStreamWriter)

제공된 기록기를 사용하여 처리할 스트리밍 쿼리의 출력을 설정합니다. 처리 논리는 행을 입력으로 사용하는 함수 또는 선택 사항 process(row)open(partition_id, epoch_id) 메서드가 있는 close(error) 개체로 지정할 수 있습니다.

문법

foreach(f)

매개 변수

매개 변수 유형 설명
f callable 또는 object Row를 입력으로 사용하는 함수이거나 메서드와 선택 사항 process(row)open 및 메서드가 있는 close 개체입니다.

Returns

DataStreamWriter

Notes

제공된 개체는 직렬화할 수 있어야 합니다. 데이터 쓰기(예: 연결 열기)에 대한 초기화는 생성 시가 아니라 내부에서 open()수행해야 합니다.

예제

import time
df = spark.readStream.format("rate").load()

함수를 사용하여 각 행 처리:

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

및 메서드를 사용하여 개체openprocess를 사용하여 각 행을 close 처리합니다.

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()