다음을 통해 공유


변경 피드 프로세서 라이브러리에서 Azure Cosmos DB .NET V3 SDK로 마이그레이션

적용 대상: NoSQL

이 문서에서는 변경 피드 프로세서 라이브러리를 사용하는 기존 애플리케이션 코드를 최신 버전의 .NET SDK(.NET V3 SDK라고도 함)의 변경 피드 기능으로 마이그레이션하는 데 필요한 단계를 설명합니다.

필요한 코드 변경

.NET V3 SDK에는 몇 가지 주요 변경 내용이 포함되어 있으며, 애플리케이션을 마이그레이션하는 주요 단계는 다음과 같습니다.

  1. DocumentCollectionInfo 인스턴스를 모니터링 대상 및 임대 컨테이너에 대한 Container 참조로 변환합니다.
  2. 사용하는 WithProcessorOptions 사용자 지정은 사용 WithLeaseConfigurationWithPollInterval 간격, WithStartTime 시작 시간WithMaxItems 최대 항목 수를 정의하도록 업데이트해야 합니다.
  3. ChangeFeedProcessorOptions.LeasePrefix에 구성된 값과 일치하도록 GetChangeFeedProcessorBuilder에 대해 processorName을 설정하고, 그렇지 않은 경우 string.Empty를 사용합니다.
  4. 변경 내용은 더 이상 IReadOnlyList<Document>로 전달되지 않습니다. 대신 IReadOnlyCollection<T>입니다. 여기서 T는 정의해야 하는 형식이며, 기본 항목 클래스는 더 이상 없습니다.
  5. 변경 내용을 처리하기 위해 더 이상 IChangeFeedObserver의 구현이 필요하지 않으며 대신 대리자를 정의해야 합니다. 대리자는 고정 함수이거나 실행 간에 상태를 유지해야 하는 경우 고유한 클래스를 만들고 인스턴스 메서드를 대리자로 전달할 수 있습니다.

예를 들어, 변경 피드 프로세서를 빌드하는 원래 코드는 다음과 같습니다.

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

마이그레이션된 코드는 다음과 같습니다.

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

대리자의 경우 이벤트를 수신하는 정적 메서드를 사용할 수 있습니다. IChangeFeedObserverContext에서 정보를 사용하는 경우 ChangeFeedProcessorContext를 사용하도록 마이그레이션할 수 있습니다.

  • IChangeFeedObserverContext.PartitionKeyRangeId 대신 ChangeFeedProcessorContext.LeaseToken를 사용할 수 있음
  • IChangeFeedObserverContext.FeedResponse 대신 ChangeFeedProcessorContext.Headers를 사용할 수 있음
  • ChangeFeedProcessorContext.Diagnostics에는 문제 해결을 위한 요청 대기 시간에 대한 자세한 정보가 포함되어 있음
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

상태 이벤트 및 관찰 가능성

이전에 사용 IHealthMonitor 했거나 활용 IChangeFeedObserver.OpenAsyncIChangeFeedObserver.CloseAsync경우 알림 API사용합니다.

  • IChangeFeedObserver.OpenAsync 를 으로 WithLeaseAcquireNotification바꿀 수 있습니다.
  • IChangeFeedObserver.CloseAsync 를 으로 WithLeaseReleaseNotification바꿀 수 있습니다.
  • IHealthMonitor.InspectAsync 를 으로 WithErrorNotification바꿀 수 있습니다.

상태 및 임대 컨테이너

변경 피드 프로세서 라이브러리와 마찬가지로 .NET V3 SDK의 변경 피드 기능은 임대 컨테이너를 사용하여 상태를 저장합니다. 그러나 스키마는 다릅니다.

SDK V3 변경 피드 프로세서는 마이그레이션된 애플리케이션 코드를 처음 실행할 때 모든 이전 라이브러리 상태를 검색하고 새 스키마로 자동으로 마이그레이션합니다.

이전 코드를 사용하여 애플리케이션을 안전하게 중지하고, 코드를 새 버전으로 마이그레이션하고, 마이그레이션된 애플리케이션을 시작하고, 애플리케이션이 중지되는 동안 발생한 모든 변경 내용을 새 버전에서 선택하여 처리합니다.

추가 리소스

다음 단계

이제 다음 문서에서 변경 피드 프로세서에 대해 자세히 알아볼 수 있습니다.