Migrera från ändringsflödesprocessorbiblioteket till Azure Cosmos DB .NET V3 SDK
GÄLLER FÖR: NoSQL
Den här artikeln beskriver de steg som krävs för att migrera ett befintligt programs kod som använder ändringsflödesprocessorbiblioteket till ändringsflödesfunktionen i den senaste versionen av .NET SDK (kallas även .NET V3 SDK).
Nödvändiga kodändringar
.NET V3 SDK har flera icke-bakåtkompatibla ändringar. Följande är de viktigaste stegen för att migrera ditt program:
DocumentCollectionInfo
Konvertera instanserna tillContainer
referenser för de övervakade containrarna och lånar containrar.- Anpassningar som används
WithProcessorOptions
bör uppdateras för användningWithLeaseConfiguration
ochWithPollInterval
för intervall,WithStartTime
för starttid ochWithMaxItems
för att definiera det maximala antalet objekt. - Ange på
processorName
GetChangeFeedProcessorBuilder
för att matcha det värde som konfigurerats förChangeFeedProcessorOptions.LeasePrefix
, eller användstring.Empty
på annat sätt. - Ändringarna levereras inte längre som en
IReadOnlyList<Document>
, i stället är det enIReadOnlyCollection<T>
varT
är en typ som du behöver definiera, det finns ingen basobjektklass längre. - För att hantera ändringarna behöver du inte längre en implementering av
IChangeFeedObserver
i stället för att definiera ett ombud. Ombudet kan vara en statisk funktion, eller så kan du skapa en egen klass och skicka en instansmetod som ombud om du behöver underhålla tillståndet mellan körningarna.
Om till exempel den ursprungliga koden för att skapa ändringsflödesprocessorn ser ut så här:
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
Den migrerade koden ser ut så här:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
För ombudet kan du ha en statisk metod för att ta emot händelserna. Om du använder information från IChangeFeedObserverContext
kan du migrera för att använda ChangeFeedProcessorContext
:
ChangeFeedProcessorContext.LeaseToken
kan användas i stället förIChangeFeedObserverContext.PartitionKeyRangeId
ChangeFeedProcessorContext.Headers
kan användas i stället förIChangeFeedObserverContext.FeedResponse
ChangeFeedProcessorContext.Diagnostics
innehåller detaljerad information om svarstid för begäranden för felsökning
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
Hälsohändelser och observerbarhet
Om du tidigare använde IHealthMonitor
eller använde IChangeFeedObserver.OpenAsync
och IChangeFeedObserver.CloseAsync
använder du API:et Meddelanden.
IChangeFeedObserver.OpenAsync
kan ersättas medWithLeaseAcquireNotification
.IChangeFeedObserver.CloseAsync
kan ersättas medWithLeaseReleaseNotification
.IHealthMonitor.InspectAsync
kan ersättas medWithErrorNotification
.
Tillstånd och lånecontainer
På samma sätt som ändringsflödesprocessorbiblioteket använder funktionen för ändringsflöde i .NET V3 SDK en lånecontainer för att lagra tillståndet. Schemana skiljer sig dock åt.
SDK V3-ändringsflödesprocessorn identifierar alla gamla bibliotekstillstånd och migrerar det automatiskt till det nya schemat vid den första körningen av den migrerade programkoden.
Du kan på ett säkert sätt stoppa programmet med den gamla koden, migrera koden till den nya versionen, starta det migrerade programmet och eventuella ändringar som inträffade när programmet stoppades hämtas och bearbetas av den nya versionen.
Ytterligare resurser
Nästa steg
Du kan nu fortsätta med att lära dig mer om ändringsflödesprocessorn i följande artiklar:
- Översikt över ändringsflödesprocessor
- Använda ändringsflödesestimatorn
- Starttid för ändringsflödesprocessor
- Försöker du planera kapacitet för en migrering till Azure Cosmos DB?
- Om allt du vet är antalet virtuella kärnor och servrar i ditt befintliga databaskluster läser du om att uppskatta enheter för begäranden med virtuella kärnor eller virtuella kärnor
- Om du känner till vanliga begärandefrekvenser för din aktuella databasarbetsbelastning kan du läsa om att uppskatta enheter för begäranden med azure Cosmos DB-kapacitetshanteraren