이 페이지에서는 델타 테이블을 Spark Structured Streaming 의 원본 및 싱크로 readStream 사용하는 방법을 설명합니다 writeStream. Delta Lake는 스트리밍 시스템 및 파일에 대한 일반적인 성능 및 안정성 문제를 해결합니다. 이점은 다음과 같습니다.
- 저지연 수집으로 생성된 작은 파일을 조합하여 성능을 향상시킵니다.
- 둘 이상의 스트림(또는 동시 일괄 처리 작업)을 사용하여 "정확히 한 번" 처리를 유지합니다.
- 파일을 스트림 원본으로 사용할 때 새 파일을 효율적으로 검색합니다.
Databricks SQL에서 스트리밍 테이블을 사용하여 데이터를 로드하는 방법을 알아보려면 Databricks SQL에서 스트리밍 테이블 사용을 참조하세요.
Delta Lake를 사용한 스트림 정적 조인은 Stream-static 조인을 참조하세요.
델타 테이블을 싱크로 사용
구조적 스트리밍을 사용하여 델타 테이블에 데이터를 쓸 수 있습니다. Delta Lake 트랜잭션 로그는 테이블에 대해 동시에 실행되는 다른 스트림 또는 일괄 처리 쿼리가 있는 경우에도 정확히 한 번 처리를 보장합니다.
구조적 스트리밍 싱크를 사용하여 델타 테이블에 쓸 때 epochId = -1와 빈 커밋이 표시될 수 있습니다. 이는 예상되며 일반적으로 발생합니다.
- 스트리밍 쿼리의 각 실행의 첫 번째 일괄 처리에서(모든 일괄 처리에 대해
Trigger.AvailableNow발생) - 스키마가 변경되면(예: 열 추가)
이러한 빈 커밋은 의도적이며 오류를 나타내지 않습니다. 쿼리의 정확성이나 성능에는 영향을 주지 않습니다.
Note
Delta Lake VACUUM 함수는 Delta Lake에서 관리하지 않는 모든 파일을 제거하되, _로 시작되는 디렉터리는 건너뜁니다.
<table-name>/_checkpoints같은 디렉터리 구조를 사용하여 Delta 테이블에 대한 다른 데이터 및 메타데이터와 함께 검사점을 안전하게 저장할 수 있습니다.
메트릭을 사용하여 백로그 모니터링
다음 메트릭을 사용하여 스트리밍 쿼리 프로세스의 백로그를 모니터링합니다.
-
numBytesOutstanding: 백로그에서 아직 처리되지 않은 바이트 수입니다. -
numFilesOutstanding: 백로그에서 아직 처리할 파일 수입니다. -
numNewListedFiles: 이 일괄 처리의 백로그를 계산하기 위해 나열된 Delta Lake 파일 수입니다. -
backlogEndOffset: 백로그를 계산하는 데 사용되는 델타 테이블 버전입니다.
Notebook에서 스트리밍 쿼리 진행률 대시보드의 원시 데이터 탭 아래에서 이러한 메트릭을 봅니다.
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}
추가 모드
기본적으로 스트림은 추가 모드에서 실행되며 테이블에 새 레코드만 추가합니다.
toTable 테이블로 스트리밍할 때 다음 메서드를 사용합니다.
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
전체 모드
전체 모드로 구조적 스트리밍을 사용하여 모든 일괄 처리 후에 전체 테이블을 바꿉다. 예를 들어 고객별로 집계된 요약 이벤트 테이블을 지속적으로 업데이트할 수 있습니다.
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
엄격한 대기 시간 요구 사항이 없는 애플리케이션의 경우 다음과 같은 AvailableNow일회성 트리거를 사용하여 컴퓨팅 리소스 및 비용을 절약할 수 있습니다. 예를 들어 이 트리거를 사용하여 지정된 일정에 따라 요약 집계 테이블을 업데이트하고 마지막 업데이트 이후 도착한 새 데이터만 처리합니다. 참조 AvailableNow: 증분 일괄 처리.
원본 델타 테이블의 변경 내용 처리
구조적 스트리밍은 델타 테이블을 증분 방식으로 읽습니다. 스트리밍 쿼리가 Delta 테이블을 참조하여 읽을 때, 새 테이블 버전이 원본 테이블에 커밋되면 새 레코드는 멱등성을 유지하며 처리됩니다. 구조적 스트리밍은 추가 입력만 허용하고 원본 델타 테이블에서 수정이 발생하는 경우 예외를 throw합니다. 예를 들어 , UPDATE또는 DELETEMERGE INTO 작업이 스트리밍 쿼리에서 읽은 원본 델타 테이블을 수정하는 경우 OVERWRITE스트림이 오류와 함께 실패합니다.
사용 사례에 따라 원본 델타 테이블에 대한 업스트림 변경 내용을 처리하는 네 가지 일반적인 방법이 있습니다. 참조 테이블과 각 항목에 대한 세부 정보는 다음과 같습니다.
| 접근법 | 장점 | Cons |
|---|---|---|
skipChangeCommits |
단순하지만 복잡한 논리를 작성할 필요는 없습니다. 업스트림 변경 내용이 별도로 처리되는 추가 전용 처리 또는 잘못된 레코드를 일시적으로 처리하는 데 유용합니다. | 변경 내용을 전파하지 않고 추가만 처리합니다. |
| 전체 새로 고침 | 또한 단순하지만 복잡한 논리를 작성할 필요가 없습니다. 드문 업스트림 변경이 있는 작은 데이터 세트에 유용합니다. | 큰 데이터 세트의 경우 비용이 많이 듭니다. 모든 다운스트림 테이블을 다시 처리해야 합니다. |
| 데이터 피드 변경 | 모든 변경 형식(삽입, 업데이트 및 삭제)을 처리합니다. Databricks는 가능하면 테이블에서 직접 스트리밍하기보다는 델타 테이블의 CDC 피드를 통해 스트리밍하는 것을 권장합니다. | 각 변경 유형을 처리하기 위해 더 복잡한 논리를 작성해야 합니다. |
| 머터리얼라이즈드 뷰 | 자동 변경 전파가 있는 구조적 스트리밍에 대한 간단한 대안입니다. | 대기 시간이 더 깁니다. Lakeflow Spark 선언적 파이프라인 및 Databricks SQL에서만 사용할 수 있습니다. |
업스트림 변경 커밋 건너뛰기skipChangeCommits
기존 레코드를 삭제하거나 수정하는 트랜잭션을 무시하고 추가만 처리하도록 설정합니다 skipChangeCommits . 이는 기존 데이터의 변경 내용을 스트림을 통해 전파할 필요가 없거나 이러한 변경 내용을 처리하기 위해 별도의 논리를 선호하는 경우에 유용합니다. 일회성 변경 내용을 일시적으로 무시해야 하는 경우 켜고 끌 skipChangeCommits 수 있습니다.
Databricks는 변경 데이터 피드를 사용하지 않는 대부분의 워크로드에 사용하는 skipChangeCommits 것이 좋습니다.
Python
(spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
)
Scala
spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
Important
테이블에 대해 스트리밍 읽기가 시작된 후 델타 테이블의 스키마가 변경되면 쿼리가 실패합니다. 대부분의 스키마 변경의 경우 스트림을 다시 시작하여 스키마 불일치를 해결하고 처리를 계속할 수 있습니다.
Databricks Runtime 12.2 LTS 이하에서는 열 이름 바꾸기 또는 삭제와 같은 비가산적 스키마 진화를 거친 열 매핑이 활성화된 델타 테이블에서 스트리밍할 수 없습니다. 자세한 내용은 열 매핑 및 스트리밍을 참조하세요.
Note
Databricks Runtime 12.2 LTS 이상에서 skipChangeCommits가 ignoreChanges를 대체합니다. Databricks Runtime 11.3 LTS 이하 ignoreChanges 에서 유일하게 지원되는 옵션입니다. 자세한 내용은 레거시 옵션을 ignoreChanges 참조하세요.
레거시 옵션: ignoreDeletes
ignoreDeletes 는 파티션 경계(즉, 전체 파티션 삭제)에서 데이터를 삭제하는 트랜잭션만 처리하는 레거시 옵션입니다. 파티션이 아닌 삭제, 업데이트 또는 기타 수정 사항을 처리해야 하는 경우 대신 사용합니다 skipChangeCommits .
Python
(spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
)
Scala
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
레거시 옵션: ignoreChanges
ignoreChanges 는 Databricks Runtime 11.3 LTS 이하에서 사용할 수 있습니다. Databricks Runtime 12.2 LTS 이상에서는 skipChangeCommits로 대체됩니다.
ignoreChanges이(가) 사용 설정된 경우, UPDATE, MERGE INTO, DELETE (파티션 내) 또는 OVERWRITE와 같은 데이터 수정 작업 후에 원본 테이블의 다시 작성된 데이터 파일이 다시 내보내집니다. 변경되지 않은 행은 새 행과 함께 내보내지는 경우가 많으므로 다운스트림 소비자는 중복을 처리할 수 있어야 합니다. 삭제는 다운스트림으로 전파되지 않습니다.
ignoreChanges가 ignoreDeletes보다 우선합니다.
반면, skipChangeCommits 파일 변경 작업은 완전히 무시합니다. 데이터 수정 작업으로 인해 소스 테이블의 데이터 파일이 UPDATE, MERGE INTO, DELETE, OVERWRITE와 같이 다시 작성되었으나 이 파일들은 전적으로 무시됩니다. 스트림 원본 테이블의 변경 내용을 반영하려면 이러한 변경 내용을 전파하는 별도의 논리를 구현해야 합니다.
Databricks는 모든 새 워크로드에 사용할 skipChangeCommits 것을 권장합니다. 워크로드를 ignoreChanges에서 skipChangeCommits로 마이그레이션하려면 스트리밍 로직을 리펙토링하세요.
다운스트림 테이블의 전체 새로 고침
업스트림 변경 내용이 드물고 데이터가 다시 처리할 만큼 작은 경우 스트리밍 검사점 및 출력 테이블을 삭제한 다음 처음부터 스트림을 다시 시작할 수 있습니다. 이렇게 하면 스트림이 원본 테이블의 모든 데이터를 다시 처리합니다. 또한 이 방법을 사용하려면 이 스트림의 출력에 의존하는 모든 다운스트림 테이블을 다시 처리해야 합니다.
이 방법은 업스트림 변경이 드물고 전체 새로 고침 비용이 허용되는 소규모 데이터 세트 또는 워크로드에 가장 적합합니다.
변경 데이터 피드 사용
모든 유형의 변경 내용(삽입, 업데이트 및 삭제)을 처리하는 워크로드의 경우 Delta Lake 변경 데이터 피드를 사용합니다. 변경 데이터 피드는 델타 테이블에 대한 행 수준 변경 내용을 기록하므로 이러한 변경 내용을 스트리밍하고 다운스트림 테이블의 각 변경 형식을 처리하는 논리를 작성할 수 있습니다. 코드가 모든 유형의 변경 이벤트를 명시적으로 처리하기 때문에 가장 강력한 방법입니다. Azure Databricks에서 Delta Lake 변경 데이터 피드를 사용하는 방법을 참조하세요.
Lakeflow Spark 선언적 파이프라인을 사용하는 경우 AUTO CDC API: 파이프라인을 사용하여 변경 데이터 캡처 간소화를 참조하세요.
Important
Databricks Runtime 12.2 LTS 이하에서는 열 이름 바꾸기 또는 삭제와 같이 비가산적 스키마 진화를 거친 열 매핑이 활성화된 Delta 테이블의 변경 데이터 피드에서 스트리밍할 수 없습니다. 열 매핑 및 스트리밍을 참조하세요.
구체화된 뷰를 사용하세요
구체화된 뷰는 원본 데이터가 변경될 때 결과를 다시 계산하여 업스트림 변경 내용을 자동으로 처리합니다. 가능한 가장 낮은 대기 시간이 필요하지 않고 스트리밍 복잡성을 관리하지 않으려면 구체화된 뷰를 통해 아키텍처를 간소화할 수 있습니다. 구체화된 뷰는 Lakeflow Spark 선언적 파이프라인 파이프라인 및 Databricks SQL에서 사용할 수 있습니다. 구체화된 뷰를 참조하세요.
Example
예를 들어, user_events, date및 user_email 열이 있는 action 테이블을 가지고 있으며, date로 분할된 경우를 가정해 보겠습니다.
user_events 테이블에서 스트림하고 GDPR로 인해 데이터를 삭제해야 합니다.
skipChangeCommits 를 사용하면 여러 파티션의 데이터를 삭제할 수 있습니다(이 예제에서는 필터링 user_email). 다음 구문을 사용합니다.
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
user_email 문을 사용하여 UPDATE을(를) 업데이트하면, 문제의 user_email을(를) 포함하는 파일이 다시 작성됩니다. 변경된 데이터 파일을 무시하는 데 사용합니다 skipChangeCommits .
삭제가 항상 전체 파티션 삭제임을 확신하지 않는 한 Databricks는 skipChangeCommits 대신 ignoreDeletes를 사용할 것을 권장합니다.
를 idempotent 테이블 쓰기에 사용 foreachBatch
Note
Databricks는 foreachBatch사용하는 대신 업데이트하려는 각 싱크에 대해 별도의 스트리밍 쓰기를 구성하는 것이 좋습니다. 여러 싱크에 foreachBatch 대한 쓰기는 병렬 처리를 줄이고 여러 테이블에 대한 쓰기가 직렬화 foreachBatch되므로 전체 대기 시간을 증가시킵니다.
델타 테이블은 다음 DataFrameWriter 옵션을 지원하여 foreachBatch idempotent 내의 여러 테이블에 쓰기를 수행합니다.
-
txnAppId: 각 DataFrame 쓰기에 전달할 수 있는 고유 문자열입니다. 예를 들어 StreamingQuery ID를txnAppId로 사용할 수 있습니다.txnAppId는 사용자가 생성한 고유 문자열일 수 있으며 스트림 ID와 관련될 필요가 없습니다. -
txnVersion: 트랜잭션 버전 역할을 하는 단조 증가하는 숫자입니다.
Delta Lake는 txnAppId 및 txnVersion을 사용하여 중복 쓰기를 식별하고 무시합니다. 예를 들어 오류가 일괄 처리 쓰기를 중단한 후 동일한 txnAppIdtxnVersion 일괄 처리를 다시 실행하고 중복 항목을 올바르게 식별하고 무시할 수 있습니다.
foreachBatch를 사용하여 임의의 데이터 싱크에 쓰기를 참조하세요.
Warning
스트리밍 검사점을 삭제하고 새 검사점을 사용하여 쿼리를 다시 시작하는 경우 다른 txnAppId항목을 제공해야 합니다. 새 검사점은 일괄 처리 ID로 시작합니다 0. Delta Lake는 배치 ID와 txnAppId를 함께 고유 키로 사용하며, 이미 본 값이 있는 배치를 건너뜁니다.
다음 코드 예제에서는 이 패턴을 보여 줍니다.
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}
스트리밍 쿼리에서 foreachBatch를 사용하여 업서트
스트리밍 쿼리에서 merge 및 foreachBatch을 사용하여 델타 테이블에 복잡한 업서트를 작성할 수 있습니다.
foreachBatch를 사용하여 임의의 데이터 싱크에 쓰기를 참조하세요.
이 접근 방식에는 다음과 같은 많은 애플리케이션이 있습니다.
- 출력 모드를
update사용하여 쓰기 성능을 향상시키는 반면complete출력 모드에서는 각 마이크로배치에 대한 전체 결과 테이블을 다시 작성해야 합니다. - 델타 테이블에 변경 내용을 계속 적용하기 위해 병합 쿼리를 사용하여 변경 데이터를
foreachBatch에 작성합니다. Delta Lake를 사용하여 SCD(느린 변경 데이터) 및 CDC(변경 데이터 캡처)를 참조하세요. - 스트림 처리 중에 중복 제거를 처리합니다. 삽입 전용 병합 쿼리를
foreachBatch사용하여 자동 중복 제거를 사용하여 델타 테이블에 데이터를 지속적으로 쓸 수 있습니다. 델타 테이블에 쓸 때 데이터 중복 제거를 참조하세요.
Note
merge안의foreachBatch문이 idempotent인지 확인하세요. 그렇지 않으면 스트리밍 쿼리를 다시 시작하면 동일한 데이터 일괄 처리에 작업을 여러 번 적용할 수 있습니다. idempotent 테이블 쓰기 사용을foreachBatch참조하세요.merge입력 데이터 속도 메트릭을 사용하면 원본에서foreachBatch데이터가 생성되는 실제 속도의 배수를 반환할 수 있습니다.merge는 입력 데이터를 여러 번 읽어 메트릭을 곱합니다. 이 지표 곱셈을 방지하려면merge전에 배치 DataFrame을 캐시하고merge다음에 언캐시하십시오.입력 데이터 속도는
StreamingQueryProgress및 노트북 스트리밍 속도 그래프를 통해 확인할 수 있습니다. Azure Databricks의 구조적 스트리밍 쿼리 모니터링을 참조하세요.
예를 들어 MERGE 다음에서 foreachBatchSQL 문을 사용할 수 있습니다.
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Upsert 스트리밍에 Delta Lake API를 사용할 수도 있습니다.
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
변경 내용을 처리하도록 초기 테이블 버전 설정
기본적으로 스트림은 사용 가능한 최신 Delta 테이블 버전으로 시작합니다. 여기에는 현재 테이블의 전체 스냅샷과 이후의 모든 변경 내용이 포함됩니다. Databricks는 대부분의 워크로드에 기본 초기 테이블 버전을 사용하는 것이 좋습니다.
필요에 따라 다음 옵션을 사용하여 전체 테이블을 처리하지 않고 Delta Lake 스트리밍 원본의 시작점을 지정할 수 있습니다.
startingVersion: 읽기를 시작할 델타 테이블 버전입니다. 지정된 버전 이후에 커밋된 모든 테이블 변경 사항은 스트림에서 읽습니다. 지정된 버전을 사용할 수 없으면 스트림이 시작되지 않습니다.사용 가능한 커밋 버전을 찾으려면 .를 실행하고
DESCRIBE HISTORY확인합니다version. 최신 변경 내용만 반환하려면 .를 지정합니다latest. 델타 테이블 버전에 대한 자세한 내용은 테이블 기록 작업을 참조하세요.startingTimestamp: 읽기를 시작할 타임스탬프입니다. 지정된 타임스탬프 이후에 커밋된 모든 테이블 변경 사항은 스트림에서 읽을 수 있습니다. 제공된 타임스탬프가 모든 테이블 커밋 앞에 오는 경우 스트리밍 읽기는 사용 가능한 가장 빠른 타임스탬프로 시작됩니다. 다음 중 하나를 설정합니다.- 타임스탬프 문자열입니다. 예들 들어
"2019-01-01T00:00:00.000Z"입니다. - 날짜 문자열입니다. 예들 들어
"2019-01-01"입니다.
- 타임스탬프 문자열입니다. 예들 들어
startingVersion와 startingTimestamp를 동시에 설정할 수 없습니다. 이러한 설정은 새 스트리밍 쿼리에만 적용됩니다. 스트리밍 쿼리가 시작되고 진행률이 검사점에서 기록된 경우 이러한 설정은 무시됩니다.
Important
지정된 버전 또는 타임스탬프에서 스트리밍 원본을 시작할 수 있지만 스트리밍 원본의 스키마는 항상 델타 테이블의 최신 스키마입니다. 지정된 버전 또는 타임스탬프 이후 델타 테이블에 호환되지 않는 스키마 변경이 없는지 확인해야 합니다. 그렇지 않으면 잘못된 스키마를 사용하여 데이터를 읽을 때 스트리밍 원본이 잘못된 결과를 반환할 수 있습니다.
Example
예를 들어 user_events테이블이 있다고 가정해 보겠습니다. 버전 5 이후의 변경 내용을 읽으려면 다음을 사용합니다.
spark.readStream
.option("startingVersion", "5")
.table("user_events")
2018년 10월 18일 이후의 변경 내용을 읽으려면 다음을 사용합니다.
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
데이터를 삭제하지 않고 초기 스냅샷 처리
이 기능은 Databricks Runtime 11.3 LTS 이상에서 사용할 수 있습니다.
워터마크가 정의된 상태 저장 스트리밍 쿼리에서 수정 시간별로 파일을 처리하면 레코드를 잘못된 순서로 처리할 수 있습니다. 이로 인해 워터마크가 레코드를 지연 이벤트로 잘못 표시하고 삭제할 수 있습니다. 이 문제는 초기 델타 스냅샷이 기본 순서로 처리되는 경우에만 발생할 수 있습니다.
델타 원본 테이블이 있는 스트림의 경우 쿼리는 먼저 테이블에 있는 모든 데이터를 처리하고 초기 스냅샷이라는 버전을 만듭니다. 기본적으로 Delta 테이블의 데이터 파일은 마지막으로 수정된 파일에 따라 처리됩니다. 그러나 마지막 수정 시간이 반드시 레코드 이벤트 시간 순서를 나타내는 것은 아닙니다.
초기 스냅샷 처리 중에 데이터가 삭제하지 않도록 하려면 이 withEventTimeOrder 옵션을 사용하도록 설정합니다.
withEventTimeOrder 는 초기 스냅샷 데이터의 이벤트 시간 범위를 시간 버킷으로 나눕니다. 각 마이크로 일괄 처리는 시간 범위 내에서 데이터를 필터링하여 버킷을 처리합니다.
maxFilesPerTrigger 및 maxBytesPerTrigger 옵션은 마이크로 배치 크기를 제어하는 데 여전히 적용할 수 있지만, 처리 방법으로 인해 대략적으로만 적용됩니다.
다음 다이어그램은 이 프로세스를 보여줍니다.
Constraints
- 스트림 쿼리가 시작되고 초기 스냅샷이 적극적으로 처리되고 있는지 변경할 수 없습니다
withEventTimeOrder. 변경된withEventTimeOrder으로 다시 시작하려면 검사점을 삭제해야 합니다. - 사용하도록 설정된 경우
withEventTimeOrder초기 스냅샷 처리가 완료될 때까지 이 기능을 지원하지 않는 Databricks 런타임 버전으로 스트림을 다운그레이드할 수 없습니다. 다운그레이드하려면 초기 스냅샷이 완료되기를 기다리거나 검사점을 삭제하고 쿼리를 다시 시작합니다. - 이 기능은 다음 시나리오에서 지원되지 않습니다.
- 이벤트 시간 열은 생성된 열이며 델타 원본과 워터마크 사이에 비 프로젝션 변환이 있습니다.
- 스트림 쿼리에 둘 이상의 델타 원본이 있는 워터마크가 있습니다.
성능
사용하도록 설정하면 withEventTimeOrder 초기 스냅샷 처리 성능이 느려질 수 있습니다. 각 마이크로 일괄 처리는 초기 스냅샷을 검사하여 해당 이벤트 시간 범위 내에서 데이터를 필터링합니다. 필터링 성능을 향상시키려면 다음을 수행합니다.
- 데이터 건너뛰기를 적용할 수 있도록 델타 원본 열을 이벤트 시간으로 사용합니다. 데이터 건너뛰기 참조
- 이벤트 시간 열을 따라 테이블을 분할합니다.
Spark UI를 사용하여 특정 마이크로 일괄 처리를 위해 검사되는 델타 파일 수를 확인합니다.
Example
테이블 user_events에 event_time 열이 있다고 가정해 봅시다. 스트리밍 쿼리는 집계 쿼리입니다. 초기 스냅샷 처리 중 데이터가 삭제되지 않도록 하려면 다음을 사용할 수 있습니다.
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
클러스터에서 Spark 구성을 통해 withEventTimeOrder를 설정하여 모든 스트리밍 쿼리에 적용할 수 있습니다 spark.databricks.delta.withEventTimeOrder.enabled true.
입력 속도를 제한하여 처리 성능 향상
기본적으로 구조적 스트리밍은 각 마이크로 일괄 처리에서 가능한 한 많은 파일을 처리합니다. 일괄 처리당 처리되는 데이터의 양을 제한하고 메모리 사용량을 관리하거나 대기 시간을 안정화하거나 클라우드 스토리지 비용을 줄이려면 다음 옵션을 사용합니다.
-
maxFilesPerTrigger: 각 마이크로 배치에서 고려해야 할 새 파일 수. 기본값은 1000입니다. -
maxBytesPerTrigger: 각 마이크로 배치에서 처리되는 데이터의 양입니다. 이 옵션은 "소프트 최대"를 설정합니다. 즉, 일괄 처리는 이 정도의 데이터를 처리하며, 가장 작은 입력 단위가 이 제한보다 큰 경우 스트리밍 쿼리가 앞으로 이동하도록 하기 위해 제한보다 더 많이 처리할 수 있습니다. 이 설정은 기본적으로 설정되지 않습니다.
둘 다 maxBytesPerTrigger와 maxFilesPerTrigger을 사용하는 경우, 마이크로 배치 처리는 데이터가 maxFilesPerTrigger 또는 maxBytesPerTrigger 제한에 도달할 때까지 계속됩니다.
Note
기본적으로 logRetentionDuration 원본 테이블의 트랜잭션을 정리하고 스트리밍 쿼리가 해당 버전을 처리하려고 하면 쿼리가 데이터 손실을 방지하지 못합니다. 손실된 데이터를 무시하고 처리를 계속하도록 failOnDataLossfalse 옵션을 설정할 수 있습니다.
시간 이동 쿼리에 대한 데이터 보존 구성을 참조하세요.
클라우드 스토리지 비용 제어
스트리밍 쿼리에는 비용 및 대기 시간(예processingTimeavailableNowrealTime: 및 )의 균형을 맞출 수 있는 여러 트리거 모드를 사용할 수 있습니다.
클라우드 스토리지 비용 제어를 참조하세요.