Migrer de la bibliothèque du processeur de flux de modification vers le kit de développement logiciel (SDK) Azure Cosmos DB .NET V3
S’APPLIQUE À : NoSQL
Cet article décrit la procédure de migration d'un code d'application existante utilisant la bibliothèque du processeur de flux de modification vers la fonctionnalité de flux de modification de la dernière version du kit de développement logiciel (SDK) .NET (également appelé kit de développement logiciel (SDK) .NET V3).
Modifications de code requises
Le kit de développement logiciel (SDK) .NET V3 présente plusieurs changements cassants et vous trouverez ci-dessous les principales étapes pour migrer votre application :
- Convertissez les instances
DocumentCollectionInfo
en référencesContainer
pour les conteneurs surveillés et de baux. - Les personnalisations utilisant
WithProcessorOptions
doivent être mises à jour pour utiliserWithLeaseConfiguration
etWithPollInterval
pour les intervalles,WithStartTime
pour l'heure de début etWithMaxItems
pour définir le nombre maximal d’éléments. - Définissez
processorName
surGetChangeFeedProcessorBuilder
à des fins de correspondance avec la valeur configurée surChangeFeedProcessorOptions.LeasePrefix
ou utilisezstring.Empty
. - Les modifications ne sont plus fournies sous forme de
IReadOnlyList<Document>
, mais deIReadOnlyCollection<T>
oùT
correspond à un type que vous devez définir. Il n'existe plus de classe d'élément de base. - Pour gérer les modifications, vous n’avez plus besoin d’une implémentation de
IChangeFeedObserver
, mais vous devez définir un délégué. Le délégué peut être une fonction statique ou, s'il vous faut maintenir l'état entre les exécutions vous pouvez créer votre propre classe et transmettre une méthode d’instance en tant que délégué.
Par exemple, si le code d’origine pour créer le processeur de flux de modification se présente comme suit :
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
Le code migré se présentera comme suit :
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
Pour le délégué, vous pouvez avoir une méthode statique pour recevoir les événements. Si vous avez consommé des informations à partir du IChangeFeedObserverContext
vous pouvez migrer pour utiliser le ChangeFeedProcessorContext
:
ChangeFeedProcessorContext.LeaseToken
peut être utilisé à la place deIChangeFeedObserverContext.PartitionKeyRangeId
ChangeFeedProcessorContext.Headers
peut être utilisé à la place deIChangeFeedObserverContext.FeedResponse
ChangeFeedProcessorContext.Diagnostics
contient des informations détaillées sur la latence des demandes de résolution des problèmes
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
Événements d’intégrité et observabilité
Si vous utilisiez IHealthMonitor
précédemment ou si vous utilisiez IChangeFeedObserver.OpenAsync
et IChangeFeedObserver.CloseAsync
, utilisez l’API Notifications.
IChangeFeedObserver.OpenAsync
peut être remplacé parWithLeaseAcquireNotification
.IChangeFeedObserver.CloseAsync
peut être remplacé parWithLeaseReleaseNotification
.IHealthMonitor.InspectAsync
peut être remplacé parWithErrorNotification
.
État et conteneur de baux
À l’instar de la bibliothèque du processeur de flux de modification, la fonctionnalité de flux de modification du kit de développement logiciel (SDK) .NET V3 utilise un conteneur de baux pour stocker l’état. Toutefois, les schémas sont différents.
Le processeur de flux de modification du kit de développement logiciel (SDK) V3 détecte tout état de bibliothèque antérieur et le migre automatiquement vers le nouveau schéma lors de la première exécution du code de l’application migrée.
Vous pouvez arrêter en toute sécurité l’application à l’aide de l’ancien code, migrer le code vers la nouvelle version, démarrer l’application migrée, après quoi toutes les modifications apportées pendant l’arrêt de l’application sont récupérées et traitées par la nouvelle version.
Ressources supplémentaires
Étapes suivantes
Pour plus d’informations sur le processeur de flux de modification, consultez les articles suivants :
- Vue d’ensemble du processeur de flux de modification
- Utilisation de l’estimateur de flux de modification
- Heure de début du processeur de flux de modification
- Vous tentez d’effectuer une planification de la capacité pour une migration vers Azure Cosmos DB ?
- Si vous ne connaissez que le nombre de vCores et de serveurs présents dans votre cluster de bases de données existant, lisez Estimation des unités de requête à l’aide de vCores ou de processeurs virtuels
- Si vous connaissez les taux de requêtes typiques de votre charge de travail de base de données actuelle, lisez la section concernant l’estimation des unités de requête à l’aide du planificateur de capacité Azure Cosmos DB