Azure Cosmos DB의 변경 피드 살펴보기
Azure Cosmos DB 변경 피드는 컨테이너의 변경 내용을 발생하는 순서대로 보관하는 영구 레코드입니다. Azure Cosmos DB의 변경 피드 지원은 모든 변경 사항에 대해 Azure Cosmos DB 컨테이너를 수신하여 작동합니다. 그런 다음 변경된 문서가 수정된 순서로 정렬된 목록이 출력됩니다. 영구 변경 내용은 비동기적 및 증분적으로 처리할 수 있고 출력을 하나 이상의 소비자 사이에 분산하여 병렬 처리가 가능합니다.
변경 피드 및 다양한 작업
현재 변경 피드의 모든 삽입 및 업데이트가 표시됩니다. 특정 작업 유형에 대한 변경 피드를 필터링할 수 없습니다. 현재 변경 피드는 삭제 작업을 로그하지 않습니다. 이 문제를 해결하려면 삭제할 항목에 소프트 마커를 추가하면 됩니다. 예를 들어, "deleted"라는 항목에 특성을 추가하고 해당 값을 "true"로 설정한 다음 항목에 TTL(Time-to-Live) 값을 설정할 수 있습니다. TTL을 설정하면 항목이 자동으로 삭제됩니다.
Cosmos DB 변경 피드 읽기
푸시 모델 또는 풀 모델 중 하나를 사용하여 Azure Cosmos DB 변경 피드로 작업할 수 있습니다. 푸시 모델을 사용하면 변경 피드 프로세서가 이 작업을 처리하기 위한 비즈니스 로직이 있는 클라이언트에 작업을 푸시합니다. 그러나 작업 확인 및 마지막 처리 작업에 대한 상태 저장의 복잡성은 변경 피드 프로세서 내에서 처리됩니다.
풀 모델을 사용하면 클라이언트는 서버에서 작업을 풀해야 합니다. 이 경우 해당 클라이언트는 작업을 처리하기 위한 비즈니스 논리를 가지고 있으며, 마지막으로 처리된 작업의 상태도 저장합니다. 해당 클라이언트는 작업을 병렬로 처리하는 여러 클라이언트 간의 부하 분산을 다루면서, 오류도 다룹니다.
참고 항목
이후 변경 내용에 대한 변경 피드 폴링, 마지막으로 처리된 변경에 대한 상태 저장, 기타 이점에 대해 걱정할 필요가 없으므로 푸시 모델을 사용하는 것이 좋습니다.
Azure Cosmos DB 변경 피드를 사용하는 대부분의 시나리오에서는 푸시 모델 옵션 중 하나를 사용합니다. 그러나 끌어오기 모델의 추가적인 하위 수준 제어를 원할 수 있는 몇 가지 시나리오가 있습니다. 추가적인 하위 수준 제어에는 다음이 포함됩니다.
- 특정 파티션 키에서 변경 내용 읽기
- 클라이언트에서 변경 내용을 받아 처리하는 속도 제어
- 변경 피드에서 기존 데이터를 한 번 읽음(예: 데이터 마이그레이션 수행)
푸시 모델을 사용하여 변경 피드 읽기
푸시 모델을 사용하여 변경 피드에서 읽을 수 있는 방법에는 Azure Functions Azure Cosmos DB 트리거와 변경 피드 프로세서 라이브러리가 있습니다. Azure Functions는 백그라운드에서 변경 피드 프로세서를 사용하므로 이 두 가지는 모두 변경 피드를 읽는 비슷한 방법입니다. 변경 피드를 완전히 다른 방식으로 읽는 것이 아니라, Azure Functions를 변경 피드 프로세서의 호스팅 플랫폼으로 생각하세요. Azure Functions는 백그라운드에서 변경 피드 프로세서를 사용합니다. 이는 컨테이너의 여러 파티션에 걸쳐 변경 처리를 자동으로 병렬화합니다.
Azure 기능
Azure Cosmos DB 컨테이너의 변경 피드에서 새 이벤트가 발생할 때마다 자동으로 트리거되는 작은 반응형 Azure Functions를 만들 수 있습니다. Azure Cosmos DB용 Azure Functions 트리거를 사용하면 작업자 인프라를 유지 관리할 필요 없이 변경 피드 프로세서의 확장 및 안정적인 이벤트 감지 기능을 사용할 수 있습니다.
변경 피드 프로세서
변경 피드 프로세서는 Azure Cosmos DB .NET V3 및 Java V4 SDK의 일부입니다. 이를 통해 변경 피드를 읽고 이벤트 처리를 여러 소비자에게 효과적으로 배포하는 프로세스를 간소화할 수 있습니다.
변경 피드 프로세서를 구현하는 주요 구성 요소 4개는 다음과 같습니다.
모니터링된 컨테이너: 모니터링된 컨테이너에는 변경 피드가 생성되는 데이터가 있습니다. 모니터링된 컨테이너에 대한 모든 삽입 및 업데이트는 컨테이너의 변경 피드에 반영됩니다.
임대 컨테이너: 임대 컨테이너는 상태 스토리지 역할을 하며 여러 작업자의 변경 피드 처리를 조정합니다. 임대 컨테이너는 모니터링되는 컨테이너와 동일한 계정 또는 별도의 계정에 저장할 수 있습니다.
컴퓨팅 인스턴스: 컴퓨팅 인스턴스는 변경 내용을 수신 대기하도록 변경 피드 프로세서를 호스트합니다. 플랫폼에 따라 이는 VM, kubernetes Pod, Azure App Service 인스턴스 또는 실제 물리적 컴퓨터로 표현될 수 있습니다. 이 문서 전체에서 인스턴스 이름으로 참조되는 고유 식별자가 있습니다.
대리자: 대리자는 개발자가 변경 피드 프로세서에서 읽는 각 변경 내용 일괄 처리로 수행하려는 작업을 정의하는 코드입니다.
변경 피드 프로세서를 구현할 때 진입점은 항상 GetChangeFeedProcessorBuilder
를 호출하는 Container
인스턴스에서 모니터링되는 컨테이너입니다.
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
여기서 첫 번째 매개 변수는 이 프로세서의 목표를 설명하는 고유한 이름이고, 두 번째 이름은 변경 내용을 다루는 대리자 구현입니다. 다음은 대리자의 예입니다.
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
그런 다음, WithInstanceName
을 사용하여 컴퓨팅 인스턴스 이름 또는 고유 식별자를 정의합니다. 이는 배포 중인 각 컴퓨팅 인스턴스에서 고유하고 달라야 하며, 마지막으로 WithLeaseContainer
로 임대 상태를 유지 관리하는 컨테이너입니다.
Build
를 호출하면 StartAsync
를 호출하여 시작할 수 있는 프로세서 인스턴스가 제공됩니다.
호스트 인스턴스의 일반적인 수명 주기는 다음과 같습니다.
- 변경 피드를 읽습니다.
- 변경 내용이 없는 경우에는 미리 정의된 시간(
Builder
에서WithPollInterval
로 사용자 지정 가능) 동안 일시 중지하고 #1로 이동합니다. - 변경 내용이 있으면 대리자에게 보냅니다.
- 성공적으로 변경 내용 처리를 완료하면 최신 처리 시점으로 임대 상점을 업데이트하고 #1로 이동합니다.