이 문서에서는 구조적 스트리밍을 사용하여 foreachBatch 기존 스트리밍 싱크가 없는 데이터 원본에 스트리밍 쿼리의 출력을 쓰는 방법을 설명합니다.
코드 패턴을 streamingDF.writeStream.foreachBatch(...) 사용하면 스트리밍 쿼리의 모든 마이크로 일괄 처리의 출력 데이터에 일괄 처리 함수를 적용할 수 있습니다.
foreachBatch 사용되는 함수는 다음 두 매개 변수를 사용합니다.
- 마이크로 일괄 처리의 출력 데이터가 있는 DataFrame입니다.
- 마이크로 일괄 처리의 고유 ID입니다.
구조적 스트리밍에서 Delta Lake 병합 작업에 사용해야 foreachBatch 합니다.
를 사용하여 foreachBatch스트리밍 쿼리에서 Upsert를 참조하세요.
추가 DataFrame 작업 적용
Spark는 이러한 경우 증분 계획 생성을 지원하지 않으므로 많은 DataFrame 및 데이터 세트 작업이 스트리밍 DataFrame에서 지원되지 않습니다.
foreachBatch()를 사용하면 각 마이크로 일괄 처리 출력에 이러한 작업 중 일부를 적용할 수 있습니다. 예를 들어 foreachBatch() 및 SQL MERGE INTO 작업을 사용하여 스트리밍 집계의 출력을 업데이트 모드에서 델타 테이블에 쓸 수 있습니다. 자세한 내용은 MERGE INTO에서 확인하세요.
중요합니다
-
foreachBatch()는 한 번 이상의 쓰기 보장만 제공합니다. 그러나 함수에 제공된batchId를 사용하여 출력을 중복 제거하고 단 한 번만 실행되도록 보장받을 수 있습니다. 두 경우 모두 엔드투엔드 의미 체계에 대해 직접 추론해야 합니다. -
foreachBatch()는 기본적으로 스트리밍 쿼리의 마이크로 일괄 처리 실행에 의존하기 때문에 연속 처리 모드에서 작동하지 않습니다. 연속 모드로 데이터를 작성하는 경우 대신foreach()를 사용합니다. - 상태 저장 연산자를 사용하는
foreachBatch경우 처리가 완료되기 전에 각 일괄 처리를 완전히 사용하는 것이 중요합니다. 각 일괄 처리 데이터 프레임을 완전히 소비하는 방법 보기
foreachBatch()를 사용하여 빈 데이터 프레임을 호출할 수 있으며 사용자 코드는 적절한 작업을 허용하기 위해 복원력이 있어야 합니다. 예는 다음과 같습니다.
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0의 동작 변경 내용 foreachBatch
표준 액세스 모드로 구성된 컴퓨팅의 Databricks Runtime 14.0 이상에서는 다음과 같은 동작 변경이 적용됩니다.
-
print()명령은 드라이버 로그에 출력을 씁니다. - 함수 내의
dbutils.widgets하위 코드에 액세스할 수 없습니다. - 함수에서 참조되는 모든 파일, 모듈 또는 개체는 직렬화 가능하며 Spark에서 사용할 수 있어야 합니다.
기존 일괄 처리 데이터 원본 다시 사용
를 사용하면 foreachBatch()구조적 스트리밍이 지원되지 않을 수 있는 데이터 싱크에 기존 일괄 처리 데이터 기록기를 사용할 수 있습니다. 다음은 몇 가지 예입니다.
다른 많은 일괄 처리 데이터 원본은 .에서 foreachBatch()사용할 수 있습니다.
데이터 원본 및 외부 서비스에 대한 연결을 참조하세요.
여러 위치에 쓰기
스트리밍 쿼리의 출력을 여러 위치에 작성해야 하는 경우 Databricks는 최상의 병렬화 및 처리량을 위해 여러 구조적 스트리밍 기록기를 사용하는 것이 좋습니다.
여러 싱크에 쓰기를 사용하면 foreachBatch 스트리밍 쓰기의 실행이 직렬화되어 각 마이크로 일괄 처리에 대한 대기 시간이 증가할 수 있습니다.
여러 델타 테이블에 쓰는 데 사용하는 foreachBatch 경우 Idempotent 테이블 쓰기를 foreachBatch참조하세요.
각 배치 데이터 프레임을 완전히 소비합니다.
상태 저장 연산자(예: dropDuplicatesWithinWatermark 사용)를 사용하는 경우 각 배치 반복에서 전체 DataFrame을 소비하거나 쿼리를 재시작해야 합니다. 전체 DataFrame을 사용하지 않으면 다음 일괄 처리로 스트리밍 쿼리가 실패합니다.
여러 상황에서 발생할 수 있습니다. 다음 예제에서는 DataFrame을 올바르게 사용하지 않는 쿼리를 수정하는 방법을 보여 줍니다.
의도적으로 일괄 처리의 하위 집합 사용
일괄 처리의 하위 집합에만 관심이 있는 경우 다음과 같은 코드를 사용할 수 있습니다.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
이 경우, batch_df.show(2)는 일괄 처리에서 처음 두 항목만을 다룹니다. 이는 예상된 일이지만, 더 많은 항목이 있는 경우, 해당 항목들도 반드시 소비되어야 합니다. 다음 코드는 전체 DataFrame을 사용합니다.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
여기서 함수는 do_nothing 나머지 DataFrame을 자동으로 무시합니다.
일괄 처리에서 오류 처리
프로세스를 실행하는 동안 오류가 발생할 수 있습니다 foreachBatch . 다음과 같은 코드를 사용할 수 있습니다. 여기서는 문제가 무엇인지 표시하기 위해 샘플이 의도적으로 오류를 발생시킵니다.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
오류를 조용히 처리하면 나머지 일괄 처리가 처리되지 않을 수 있습니다. 이 상황을 처리하기 위한 두 가지 옵션이 있습니다.
먼저, 동일한 오류를 다시 발생시켜 이를 오케스트레이션 계층으로 전달하여 일괄 처리를 재시도할 수 있습니다. 이 문제가 일시적이라면 오류를 해결할 수 있고, 그렇지 않다면 운영 팀에게 문제를 제기하여 수동으로 수정하도록 할 수 있습니다. 이렇게 하려면 코드를 다음과 같이 partial_func 변경합니다.
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
두 번째 옵션은 예외를 catch하고 나머지 일괄 처리를 무시하려는 경우 코드를 이 옵션으로 변경하는 것입니다.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
이 코드는 함수를 do_nothing 사용하여 나머지 일괄 처리를 자동으로 무시합니다.