你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
从更改源处理器库迁移到 Azure Cosmos DB .NET V3 SDK
适用范围: NoSQL
本文介绍了将使用更改源处理器库的现有应用程序代码迁移到 .NET SDK 最新版本(也称为 .NET V3 SDK)中的更改源功能所需的步骤。
所需的代码更改
.NET V3 SDK 有几项中断性变更,以下是迁移应用程序的关键步骤:
- 将
DocumentCollectionInfo
实例转换为受监视容器和租约容器的Container
引用。 - 使用
WithProcessorOptions
的自定义应更新为对时间间隔使用WithLeaseConfiguration
和WithPollInterval
、对开始时间使用WithStartTime
以及使用WithMaxItems
来定义最大项计数。 - 将
GetChangeFeedProcessorBuilder
上的processorName
设置为与ChangeFeedProcessorOptions.LeasePrefix
上配置的值匹配,否则使用string.Empty
。 - 更改将不再作为
IReadOnlyList<Document>
传递,而是作为IReadOnlyCollection<T>
,其中T
是需要定义的类型,任何基项类都不再存在。 - 若要处理更改,不再需要实现
IChangeFeedObserver
,而是需要定义委托。 委托可以是静态函数,或者,如果需要跨执行维护状态,还可以创建自己的类并传递实例方法作为委托。
例如,如果用于生成更改源处理器的原始代码如下所示:
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
迁移的代码将如下所示:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
对于委托,可以使用静态方法来接收事件。 如果使用的是 IChangeFeedObserverContext
中的信息,可以迁移以使用 ChangeFeedProcessorContext
:
- 可以使用
ChangeFeedProcessorContext.LeaseToken
,而不是使用IChangeFeedObserverContext.PartitionKeyRangeId
- 可以使用
ChangeFeedProcessorContext.Headers
,而不是使用IChangeFeedObserverContext.FeedResponse
ChangeFeedProcessorContext.Diagnostics
包含有关故障排除的请求延迟的详细信息
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
健康状况事件和可观测性
如果以前使用过 IHealthMonitor
或利用过 IChangeFeedObserver.OpenAsync
和 IChangeFeedObserver.CloseAsync
,请使用通知 API。
IChangeFeedObserver.OpenAsync
可以替换为WithLeaseAcquireNotification
。IChangeFeedObserver.CloseAsync
可以替换为WithLeaseReleaseNotification
。IHealthMonitor.InspectAsync
可以替换为WithErrorNotification
。
状态和租约容器
与更改源处理器库类似,.NET V3 SDK 中的更改源功能使用租约容器来存储状态。 但是,架构是不同的。
SDK V3 更改源处理器将在首次执行迁移的应用程序代码时自动检测任何旧的库状态并将其迁移到新的架构。
可以使用旧代码安全地停止应用程序,将代码迁移到新版本,启动已迁移的应用程序,而在应用程序停止时所发生的任何更改都将由新版本选取并处理。
其他资源
后续步骤
现在,可以通过以下文章继续详细了解更改源处理器:
- 更改源处理器概述
- 使用更改源估算器
- 更改源处理器开始时间
- 正在尝试为迁移到 Azure Cosmos DB 进行容量计划?
- 如果只知道现有数据库群集中的 vCore 和服务器数量,请阅读使用 vCore 或 vCPU 估算请求单位
- 若知道当前数据库工作负载的典型请求速率,请阅读使用 Azure Cosmos DB 容量计划工具估算请求单位