다음을 통해 공유


워크플로를 사용하여 구조적 스트리밍 쿼리 실패에서 복구

구조적 스트리밍은 스트리밍 쿼리에 대한 내결함성 및 데이터 일관성을 제공합니다. Azure Databricks 워크플로를 사용하여 실패 시 자동으로 다시 시작되도록 구조적 스트리밍 쿼리를 쉽게 구성할 수 있습니다. 스트리밍 쿼리에 대한 검사점 설정을 활성화하면 실패 후 쿼리를 다시 시작할 수 있습니다. 다시 시작한 쿼리는 실패한 쿼리가 중단된 곳에서 계속됩니다.

구조적 스트리밍 쿼리에 검사점 설정 사용

Databricks는 쿼리를 시작하기 전에 항상 클라우드 스토리지 경로로 checkpointLocation 옵션을 지정하는 것을 권장합니다. 예시:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

이 검사점 위치는 쿼리를 식별하는 모든 필수 정보를 보존합니다. 각 쿼리에는 다른 검사점 위치가 있어야 합니다. 여러 쿼리의 위치가 같으면 안 됩니다. 자세한 내용은 구조적 스트리밍 프로그래밍 가이드를 참조하세요.

참고 항목

checkpointLocation은 대부분의 출력 싱크 형식에서 필수 옵션이지만, 메모리 싱크와 같은 일부 싱크는 checkpointLocation이 제공되지 않은 경우 임시 검사점 위치를 자동으로 생성할 수 있습니다. 이러한 임시 검사점 위치는 내결함성 또는 데이터 일관성을 보장하지 않으며, 제대로 정리되지 않을 수 있습니다. 항상 checkpointLocation을 지정하여 잠재적인 문제를 방지합니다.

실패 시 스트리밍 쿼리를 다시 시작하도록 구조적 스트리밍 작업 구성

스트리밍 쿼리가 있는 Notebook 또는 JAR를 사용하여 Azure Databricks 작업을 만들고 다음과 같이 구성할 수 있습니다.

  • 항상 새 클러스터 사용
  • 오류 발생 시 항상 다시 시도

스키마가 진화된 스트리밍 워크로드를 구성할 때 작업 실패 시 자동으로 다시 시작하는 것이 특히 중요합니다. 스키마 진화는 스키마 변경이 감지될 때 예상되는 오류를 발생시키고 작업이 다시 시작될 때 새 스키마를 사용하여 데이터를 제대로 처리하여 Azure Databricks에서 작동합니다. Databricks는 항상 Databricks 워크플로에서 자동으로 다시 시작하도록 스키마가 진화된 쿼리를 포함하는 스트리밍 작업을 구성하는 것이 좋습니다.

작업은 구조적 스트리밍 API와 긴밀하게 통합되어 있으며, 실행에서 활성 상태인 모든 스트리밍 쿼리를 모니터링할 수 있습니다. 이 구성은 쿼리의 일부가 실패하면 작업이 자동으로 실행을 종료하고(다른 모든 쿼리와 함께) 새 클러스터에서 새 실행을 시작하도록 합니다. 이는 Notebook 또는 JAR 코드를 다시 실행하고 모든 쿼리를 다시 시작합니다. 이것은 좋은 상태로 돌아가는 가장 안전한 방법입니다.

참고 항목

  • 활성 스트리밍 쿼리가 하나라도 실패하면 활성 실행이 실패하고, 다른 모든 스트리밍 쿼리가 종료됩니다.
  • Notebook 끝에서 streamingQuery.awaitTermination() 또는 spark.streams.awaitAnyTermination()을 사용할 필요는 없습니다. 스트리밍 쿼리가 활성 상태이면 작업에서 자동으로 실행이 완료되지 않도록 합니다.
  • Databricks는 구조적 스트리밍 Notebook을 오케스트레이션하는 대신 %run 작업을 dbutils.notebook.run() 사용하는 것이 좋습니다. 다른 Notebook에서 Databricks Notebook 실행을 참조하세요.

다음은 권장되는 작업 구성의 예제입니다.

  • 클러스터: 항상 새 클러스터를 사용하고 최신 Spark 버전(또는 버전 2.1 이상)을 사용하도록 설정합니다. Spark 2.1 이상에서 시작된 쿼리는 쿼리 및 Spark 버전 업그레이드 후에 복구할 수 있습니다.
  • 알림: 오류 발생 시 이메일 알림을 받으려면 설정하세요.
  • 일정: 일정을 설정하지 마세요.
  • 시간 제한: 시간 제한을 설정하지 마세요. 스트리밍 쿼리는 무기한으로 실행됩니다.
  • 최대 동시 실행: 1로 설정합니다. 각 쿼리의 인스턴스 1개만 동시에 활성 상태여야 합니다.
  • 다시 시도: 제한 없음으로 설정합니다.

이러한 구성을 이해하려면 Azure Databricks 작업 만들기 및 실행을 참조하세요.

구조적 스트리밍 쿼리 변경 후 복구

동일한 검사점 위치에서 다시 시작할 때 허용되는 스트리밍 쿼리 변경에는 제한이 있습니다. 다음은 허용되지 않거나 변경의 효과가 잘 정의되지 않은 몇 가지 변경 내용입니다. 다음은 모든 변경에 적용됩니다.

  • 허용되는 용어는 지정된 변경을 수행할 수 있지만 해당 효과의 의미 체계가 잘 정의되었는지 여부는 쿼리 및 변경에 따라 달라집니다.
  • 허용되지 않는 용어는 다시 시작한 쿼리가 예측할 수 없는 오류로 인해 실패할 가능성이 있으므로 지정된 변경을 수행해서는 안 된다는 것을 의미합니다.
  • sdfsparkSession.readStream으로 생성된 스트리밍 DataFrame/데이터 세트를 나타냅니다.

구조적 스트리밍 쿼리의 변경 유형

  • 입력 원본의 개수 또는 형식(즉, 다른 원본) 변경: 허용되지 않습니다.
  • 입력 원본의 매개 변수 변경: 허용 여부 및 변경의 의미 체계가 잘 정의되었는지 여부는 원본과 쿼리에 따라 다릅니다. 다음은 몇 가지 예입니다.
    • 속도 제한 추가, 삭제, 수정은 허용됩니다. 예:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • 구독된 문서 및 파일 변경은 결과를 예측할 수 없기 때문에 일반적으로 허용되지 않습니다. 예: spark.readStream.format("kafka").option("subscribe", "article") -> spark.readStream.format("kafka").option("subscribe", "newarticle")

  • 트리거 간격 변경: 증분 일괄 처리와 시간 간격 간에 트리거를 변경할 수 있습니다. 실행 간의 트리거 간격 변경을 참조 하세요.
  • 출력 싱크 형식 변경: 몇 가지 특정 싱크 조합 간 변경은 허용됩니다. 사례별로 확인해야 합니다. 다음은 몇 가지 예입니다.
    • 파일 싱크 -> Kafka 싱크 변경은 허용됩니다. Kafka에는 새 데이터만 표시됩니다.
    • Kafka 싱크 -> 파일 싱크 변경은 허용되지 않습니다.
    • Kafka 싱크 -> foreach 변경 또는 그 반대로 변경은 허용됩니다.
  • 출력 싱크의 매개 변수 변경: 허용 여부 및 변경의 의미 체계가 잘 정의되었는지 여부는 싱크와 쿼리에 따라 다릅니다. 다음은 몇 가지 예입니다.
    • 파일 싱크의 출력 디렉터리 변경은 허용되지 않습니다. 예: sdf.writeStream.format("parquet").option("path", "/somePath") -> sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • 출력 항목에 대한 변경은 다음을 수행할 수 있습니다. sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • 사용자 정의 foreach 싱크(즉, ForeachWriter 코드) 변경은 허용되지만 변경 의미 체계는 코드에 따라 다릅니다.
  • 프로젝션/필터/맵 유사 작업 변경: 일부 사례는 허용됩니다. 예:
    • 필터 추가/삭제는 허용됩니다. 예: sdf.selectExpr("a") -> sdf.where(...).selectExpr("a").filter(...)
    • 동일한 출력 스키마를 사용하는 프로젝션 변경은 허용됩니다. 예: sdf.selectExpr("stringColumn AS json").writeStream -> sdf.select(to_json(...).as("json")).writeStream
    • 다른 출력 스키마를 사용하는 프로젝션 변경은 조건부로 허용됩니다. 예: sdf.selectExpr("a").writeStream -> sdf.selectExpr("b").writeStream 변경은 출력 싱크에서 "a" -> "b" 스키마 변경을 허용하는 경우에만 허용됩니다.
  • 상태 저장 작업 변경: 스트리밍 쿼리의 일부 작업은 결과를 지속적으로 업데이트하기 위해 상태 데이터를 유지 관리해야 합니다. 구조적 스트리밍은 상태 데이터 검사점을 내결함성 스토리지(예: DBFS, Azure Blob Storage)에 자동으로 설정하고, 다시 시작된 후 복원합니다. 그러나 상태 데이터의 스키마는 다시 시작된 후에도 동일하게 유지된다고 가정합니다. 즉, 스트리밍 쿼리의 상태 저장 작업에 대한 변경(즉, 추가, 삭제 또는 스키마 수정)은 다시 시작 사이에 허용되지 않습니다. 다음은 상태 복구를 보장하기 위해 다시 시작 사이에 스키마를 변경해서는 안 되는 상태 저장 작업 목록입니다.
    • 스트리밍 집계: 예를 들면 sdf.groupBy("a").agg(...)입니다. 그룹화 키 또는 집계의 개수 또는 형식 변경은 허용되지 않습니다.
    • 스트리밍 중복 제거: 예를 들면 sdf.dropDuplicates("a")입니다. 그룹화 키 또는 집계의 개수 또는 형식 변경은 허용되지 않습니다.
    • 스트림-스트림 조인: 예를 들면 sdf1.join(sdf2, ...)입니다(즉, 두 입력이 모두 sparkSession.readStream으로 생성됨). 스키마 또는 동등 조인 열 변경은 허용되지 않습니다. 조인 유형(외부 또는 내부) 변경은 허용되지 않습니다. 기타 조인 조건 변경은 잘 정의되어 있지 않습니다.
    • 임의 상태 저장 작업: 예를 들면 sdf.groupByKey(...).mapGroupsWithState(...) 또는 sdf.groupByKey(...).flatMapGroupsWithState(...)입니다. 사용자 정의 상태 스키마 및 시간 제한 유형 변경은 허용되지 않습니다. 사용자 정의 상태 매핑 함수 내의 변경은 허용되지만 변경 영향의 의미 체계는 사용자 정의 논리에 따라 다릅니다. 상태 스키마 변경을 지원하려는 경우 스키마 마이그레이션을 지원하는 인코딩/디코딩 체계를 사용하여 복잡한 상태 데이터 구조를 바이트로 명시적으로 인코드/디코드할 수 있습니다. 예를 들어 상태를 Avro 인코딩된 바이트로 저장하는 경우 이진 상태를 복원하므로 쿼리를 다시 시작할 때 Avro 상태 스키마를 변경할 수 있습니다.