Share via


foreachBatch를 사용하여 임의 데이터 싱크에 쓰기

이 문서에서는 구조적 스트리밍을 사용하여 foreachBatch 기존 스트리밍 싱크가 없는 데이터 원본에 스트리밍 쿼리의 출력을 쓰는 방법을 설명합니다.

코드 패턴을 streamingDF.writeStream.foreachBatch(...) 사용하면 스트리밍 쿼리의 모든 마이크로 일괄 처리의 출력 데이터에 일괄 처리 함수를 적용할 수 있습니다. 다음 두 매개 변수를 foreachBatch 사용하는 데 사용되는 함수:

  • 마이크로 일괄 처리의 출력 데이터가 있는 DataFrame입니다.
  • 마이크로 일괄 처리의 고유 ID입니다.

구조적 스트리밍에서 Delta Lake 병합 작업에 사용해야 foreachBatch 합니다. foreachBatch를 사용하여 스트리밍 쿼리에서 Upsert를 참조하세요.

추가 DataFrame 작업 적용

Spark는 이러한 경우 증분 계획 생성을 지원하지 않으므로 많은 DataFrame 및 데이터 세트 작업이 스트리밍 DataFrame에서 지원되지 않습니다. foreachBatch()를 사용하면 각 마이크로 일괄 처리 출력에 이러한 작업 중 일부를 적용할 수 있습니다. 예를 들어 SQL MERGE INTO 작업을 사용하여 foreachBath() 업데이트 모드에서 스트리밍 집계의 출력을 델타 테이블에 쓸 수 있습니다. MERGE INTO에서 자세한 내용을 참조하세요.

Important

  • foreachBatch()는 한 번 이상의 쓰기 보장만 제공합니다. 그러나 출력을 중복 제거하고 정확히 한 번 보장을 받는 방법으로 함수에 제공된 batchId를 사용할 수 있습니다. 두 경우 모두 엔드투엔드 의미 체계에 대해 직접 추론해야 합니다.
  • foreachBatch()는 기본적으로 스트리밍 쿼리의 마이크로 일괄 처리 실행에 의존하기 때문에 연속 처리 모드에서 작동하지 않습니다. 연속 모드로 데이터를 작성하는 경우 대신 foreach()를 사용합니다.

foreachBatch()를 사용하여 빈 데이터 프레임을 호출할 수 있으며 사용자 코드는 적절한 작업을 허용하기 위해 복원력이 있어야 합니다. 예는 다음과 같습니다.

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0의 동작 변경 내용 foreachBatch

공유 액세스 모드 forEachBatch 로 구성된 컴퓨팅의 Databricks Runtime 14.0 이상에서는 REPL 환경이 아닌 Apache Spark에서 별도의 격리된 Python 프로세스에서 실행됩니다. 직렬화되고 Spark로 푸시되며 세션 기간 동안 전역 spark 개체에 액세스할 수 없습니다.

다른 모든 컴퓨팅 구성 foreachBatch 에서는 코드의 나머지 부분을 실행하는 동일한 Python REPL에서 실행됩니다. 결과적으로 함수는 serialize되지 않습니다.

공유 액세스 모드로 구성된 컴퓨팅에서 Databricks Runtime 14.0 이상을 사용하는 경우 다음 코드 예제와 같이 Python에서 사용할 foreachBatch 때 로컬 DataFrame으로 범위가 지정된 변수를 사용해야 sparkSession 합니다.

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

다음 동작 변경 내용이 적용되었습니다.

  • 함수 내에서는 전역 Python 변수에 액세스할 수 없습니다.
  • print() 명령은 드라이버 로그에 출력을 씁니다.
  • 함수에서 참조되는 모든 파일, 모듈 또는 개체는 직렬화 가능하고 Spark에서 사용할 수 있어야 합니다.

기존 일괄 처리 데이터 원본 다시 사용

를 사용하면 foreachBatch()구조적 스트리밍이 지원되지 않을 수 있는 데이터 싱크에 기존 일괄 처리 데이터 기록기를 사용할 수 있습니다. 다음은 몇 가지 예입니다.

다른 많은 일괄 처리 데이터 원본은 .에서 foreachBatch()사용할 수 있습니다. 데이터 원본에 대한 커넥트 참조하세요.

여러 위치에 쓰기

스트리밍 쿼리의 출력을 여러 위치에 작성해야 하는 경우 Databricks는 최상의 병렬화 및 처리량을 위해 여러 구조적 스트리밍 기록기를 사용하는 것이 좋습니다.

여러 싱크에 쓰기를 사용하면 foreachBatch 스트리밍 쓰기의 실행이 직렬화되어 각 마이크로 일괄 처리에 대한 대기 시간이 증가할 수 있습니다.

여러 델타 테이블에 쓰는 데 사용하는 foreachBatch 경우 foreachBatch의 Idempotent 테이블 쓰기를 참조하세요.