Azure Cosmos DB의 변경 피드 끌어오기 모델

적용 대상: NoSQL

변경 피드 끌어오기 모델을 사용하여 원하는 속도로 Azure Cosmos DB 변경 피드를 사용할 수 있습니다. 변경 피드 프로세서와 마찬가지로 변경 피드 풀 모델을 사용하여 여러 변경 피드 소비자에 걸쳐 변경 내용 처리를 병렬화할 수 있습니다.

변경 피드 프로세서와 비교

많은 시나리오에서 변경 피드 프로세서 또는 변경 피드 풀 모델을 사용하여 변경 피드를 처리할 수 있습니다. 끌어오기 모델의 연속 토큰 및 변경 피드 프로세서의 임대 컨테이너는 모두 변경 피드에서 마지막으로 처리된 항목(또는 항목의 모음)에 대한 "책갈피"로 작도합니다.

그러나 연속 토큰을 임대로 변환하거나 그 반대로 변환할 수는 없습니다.

참고 항목

대부분의 경우 변경 피드에서 읽어야 하는 경우 가장 간단한 옵션은 변경 피드 프로세서를 사용하는 것입니다.

다음 시나리오에서는 끌어오기 모델 사용을 고려해야 합니다.

  • 특정 파티션 키의 변경 내용을 읽습니다.
  • 클라이언트가 처리를 위해 변경 사항을 받는 속도를 제어합니다.
  • 변경 피드의 기존 데이터에 대한 일회성 읽기를 수행합니다(예: 데이터 마이그레이션 수행).

변경 피드 프로세서와 변경 피드 풀 모델 간의 몇 가지 주요 차이점은 다음과 같습니다.

기능 변경 피드 프로세서 변경 피드 풀 모델
변경 피드 처리의 현재 지점 추적 임대(Azure Cosmos DB 컨테이너에 저장됨) 연속 토큰(메모리에 저장되거나 수동으로 유지됨)
이전 변경 내용 재생 가능 여부 예, 밀어넣기 모델 사용 예, 끌어오기 모델 사용
이후 변경 내용 폴링 사용자 지정 WithPollInterval 값에 기반하여 변경 내용을 자동으로 확인 수동
새로운 변경 내용이 없는 동작 WithPollInterval의 값을 자동으로 기다린 후 다시 확인합니다. 상태를 확인하고 수동으로 다시 확인
전체 컨테이너의 변경 내용 처리 예. 동일한 컨테이너에서 소비하는 여러 스레드와 컴퓨터에서 자동으로 병렬화됩니다. 예, FeedRange를 사용하여 수동으로 병렬화했습니다.
단일 파티션 키에서만 변경 내용을 처리합니다. 지원되지 않음

참고 항목

풀 모델을 사용할 때는 변경 피드 프로세서를 사용하여 읽을 때와 달리 새로운 변경 내용이 없는 경우를 명시적으로 처리해야 합니다.

풀 모델 작업

풀 모델을 사용하여 변경 피드를 처리하려면 FeedIterator의 인스턴스를 만듭니다. 처음에 FeedIterator를 만들 때 변경 내용을 읽기 위한 시작 위치와 FeedRange에 사용할 값으로 구성된 필수 ChangeFeedStartFrom 값을 지정해야 합니다. FeedRange는 파티션 키 값의 범위이며 해당 특정 FeedIterator를 사용하여 변경 피드에서 읽을 수 있는 항목을 지정합니다. 또한 변경 내용을 처리하려는 모드(최신 버전 또는 모든 버전 및 삭제)에 대해 필수 ChangeFeedMode 값을 지정해야 합니다. 변경 피드를 읽는 데 사용할 모드를 표시하려면 ChangeFeedMode.LatestVersion 또는 ChangeFeedMode.AllVersionsAndDeletes를 사용합니다. 모든 버전 및 삭제 모드를 사용하는 경우 Now() 값 또는 특정 연속 토큰에서 변경 피드 시작을 선택해야 합니다.

필요에 따라 ChangeFeedRequestOptions를 지정하여 PageSizeHint를 설정할 수 있습니다. 설정된 후 이 속성은 페이지당 수신되는 최대 항목 수를 설정합니다. 모니터링되는 컬렉션의 작업이 저장 프로시저를 통해 수행되는 경우 변경 피드에서 항목을 읽을 때 트랜잭션 범위가 유지됩니다. 결과적으로, 수신한 항목 수가 지정된 값보다 높을 수 있어 동일한 트랜잭션에 의해 변경된 항목이 하나의 원자성 일괄 처리의 일부로 반환됩니다.

다음은 엔터티 개체(이 경우에는 User 개체)를 반환하는 최신 버전 모드에서 FeedIterator를 가져오는 방법에 대한 예입니다.

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

버전 3.34.0 이전에는 ChangeFeedMode.Incremental을 설정하여 최신 버전 모드를 사용할 수 있습니다. IncrementalLatestVersion은 모두 변경 피드의 최신 버전 모드를 참조하며 두 모드 중 하나를 사용하는 애플리케이션은 동일한 동작을 보게 됩니다.

모든 버전 및 삭제 모드는 미리 보기 상태이며 미리 보기 .NET SDK 버전 >= 3.32.0-preview와 함께 사용할 수 있습니다. 다음은 동적 개체를 반환하는 모든 버전 및 삭제 모드에서 FeedIterator를 가져오는 예입니다.

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

참고 항목

최신 버전 모드에서는 일부 추가 메타데이터와 함께 변경된 항목을 나타내는 개체를 받습니다. 모든 버전 및 삭제 모드는 다른 데이터 모델을 반환합니다. 자세한 내용은 응답 개체 구문 분석을 참조하세요.

스트림을 통해 변경 피드 사용

두 변경 피드 모드 모두에 대한 FeedIterator에는 두 가지 옵션이 있습니다. 엔터티 개체를 반환하는 예제 외에, Stream 지원을 통해 응답을 얻을 수도 있습니다. 스트림을 사용하면 먼저 역직렬화하지 않고도 데이터를 읽을 수 있으므로 클라이언트 리소스가 절약됩니다.

다음은 Stream을 반환하는 최신 버전 모드에서 FeedIterator를 가져오는 방법의 예입니다.

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

전체 컨테이너에 대한 변경 내용 사용

FeedIteratorFeedRange 매개 변수를 제공하지 않으면 원하는 속도로 전체 컨테이너의 변경 피드를 처리할 수 있습니다. 다음은 최신 버전 모드를 사용하여 현재 시간부터 모든 변경 내용을 읽기 시작하는 예입니다.

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

변경 피드는 사실상 향후 모든 쓰기 및 업데이트를 포함하는 무한한 항목 목록이므로 HasMoreResults의 값은 항상 true입니다. 변경 피드를 읽으려고 할 때 사용 가능한 새 변경 내용이 없는 경우 NotModified 상태의 응답을 받게 됩니다. 앞의 예에서는 변경 내용을 다시 확인하기 전에 5초 동안 기다리는 방식으로 처리되었습니다.

파티션 키에 대한 변경 내용 사용

경우에 따라 특정 파티션 키에 대한 변경 내용만 처리하고 싶을 수도 있습니다. 특정 파티션 키에 대한 FeedIterator를 가져와서 전체 컨테이너와 같은 방식으로 변경 내용을 처리할 수 있습니다.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

병렬화를 위해 FeedRange 사용

변경 피드 프로세서에서 작업은 여러 소비자에게 자동으로 분산됩니다. 변경 피드 끌어오기 모델에서 FeedRange를 사용하여 변경 피드에 대한 처리를 병렬화할 수 있습니다. FeedRange는 파티션 키 값의 범위를 나타냅니다.

다음은 컨테이너의 범위 목록을 가져오는 방법을 보여 주는 예입니다.

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

컨테이너에 대한 FeedRange 값 목록을 가져오면 실제 파티션당 하나의 FeedRange를 가져옵니다.

FeedRange를 사용하면 여러 머신 또는 스레드의 변경 피드 처리를 병렬화하는 FeedIterator를 만들 수 있습니다. 전체 컨테이너에 또는 단일 파티션 키에 대해 FeedIterator를 얻는 방법을 보여주는 이전 예제와 달리, FeedRanges를 사용하면 변경 피드를 병렬로 처리할 수 있는 여러 FeedIterators를 얻을 수 있습니다.

FeedRanges를 사용하려는 경우에는 FeedRanges를 가져와서 해당 머신에 배포하는 오케스트레이터 프로세스가 필요합니다. 이 배포는 다음과 같습니다.

  • FeedRange.ToJsonString을 사용하고 이 문자열 값을 배포합니다. 소비자는 이 값을 FeedRange.FromJsonString과 함께 사용할 수 있습니다.
  • 배포를 처리 중이면 FeedRange 개체 참조를 전달하는 것입니다.

다음은 병렬로 읽는 두 개의 가상의 별도 컴퓨터를 사용하여 컨테이너의 변경 피드 시작 부분부터 읽는 방법을 보여 주는 샘플입니다.

머신 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

머신 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

연속 토큰 저장

연속 토큰을 획득하여 FeedIterator의 위치를 저장할 수 있습니다. 연속 토큰은 FeedIterator가 마지막으로 처리한 변경 내용의 지점을 추적하고 나중에 이 지점에서 FeedIterator을 다시 시작할 수 있도록 하는 문자열 값입니다. 연속 토큰이 지정된 경우 시작 시간보다 우선하며 시작 값부터 시작됩니다. 다음 코드는 컨테이너 만들기 이후 변경 피드를 읽습니다. 변경 내용이 더 이상 없으면 연속 토큰을 유지하여 변경 피드 사용을 나중에 재개할 수 있도록 합니다.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

최신 버전 모드를 사용하는 경우 Azure Cosmos DB 컨테이너가 여전히 존재하는 한 FeedIterator 연속 토큰은 만료되지 않습니다. 모든 버전 및 삭제 모드를 사용하는 경우 지속적인 백업을 위한 보존 기간 내에 변경 내용이 발생하는 한 FeedIterator 연속 토큰이 유효합니다.

다음 단계