Migrer de la bibliothèque d’exécuteur en bloc vers la prise en charge de l’exécution en bloc dans le SDK Azure Cosmos DB .NET v3
S’APPLIQUE À : NoSQL
Cet article décrit les étapes nécessaires pour migrer le code d’une application existante qui utilise la bibliothèque d’exécuteur en bloc .NET vers la fonctionnalité de prise en charge de l’exécution en bloc de la dernière version du SDK .NET.
Activer la prise en charge de l’exécution en bloc
Activez la prise en charge de l’exécution en bloc sur l’instance de CosmosClient
par le biais de la configuration AllowBulkExecution :
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Créer des tâches pour chaque opération
La prise en charge de l’exécution en bloc dans le SDK .NET fonctionne en tirant parti de la bibliothèque parallèle de tâches et des opérations de regroupement qui se produisent simultanément.
Il n’existe pas de méthode unique dans le Kit de développement logiciel (SDK) qui prend votre liste de documents ou d’opérations comme paramètre d’entrée. Au lieu de cela, vous devez créer une tâche pour chaque opération que vous souhaitez exécuter en bloc, puis attendre qu’elles se terminent.
Par exemple, si votre entrée initiale est une liste d’éléments où chaque élément a le schéma suivant :
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Si vous souhaitez effectuer une importation en bloc (comme lors de l’utilisation de BulkExecutor.BulkImportAsync), vous devez effectuer des appels simultanés à CreateItemAsync
. Par exemple :
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Si vous souhaitez effectuer une mise à jour en bloc (comme lors de l’utilisation de BulkExecutor.BulkUpdateAsync), vous devez effectuer des appels simultanés à la méthode ReplaceItemAsync
après la mise à jour de la valeur de l’élément. Par exemple :
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
Et si vous souhaitez effectuer une suppression en bloc (comme lors de l’utilisation de BulkExecutor.BulkDeleteAsync), vous devez effectuer des appels simultanés à DeleteItemAsync
, avec l’id
et la clé de partition de chaque élément. Par exemple :
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Capturer l’état du résultat de la tâche
Dans les exemples de code précédents, nous avons créé une liste de tâches simultanées et appelé la méthode CaptureOperationResponse
sur chacune de ces tâches. Cette méthode est une extension qui nous permet de conserver un schéma de réponse similaire à BulkExecutor, en capturant les erreurs et en effectuant le suivi de l’utilisation des unités de requête.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
Où OperationResponse
est déclaré comme suit :
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Exécuter des opérations simultanément
Pour suivre l’étendue de la liste complète des tâches, nous utilisons cette classe d’assistance :
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
La méthode ExecuteAsync
attendra que toutes les opérations soient terminées et vous pourrez l’utiliser comme suit :
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Capturer les statistiques
Le code précédent attend que toutes les opérations soient terminées et calcule les statistiques requises. Ces statistiques sont similaires à celles de la classe BulkImportResponse de la bibliothèque d’exécuteur en bloc.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
BulkOperationResponse
contient les éléments suivants :
- Temps total nécessaire pour traiter la liste des opérations par le biais de la prise en charge de l’exécution en bloc
- Nombre d’opérations réussies
- Nombre total d’unités de requête consommées
- En cas d’échec, il affiche une liste de tuples qui contiennent l’exception et l’élément associé à des fins de journalisation et d’identification
Configuration de nouvelle tentative
La bibliothèque d’exécuteur en bloc contenait des recommandations indiquant qu’il fallait affecter la valeur 0
aux paramètres MaxRetryWaitTimeInSeconds
et MaxRetryAttemptsOnThrottledRequests
de RetryOptions pour déléguer le contrôle à la bibliothèque.
Pour la prise en charge de l’exécution en bloc dans le SDK .NET, il n’existe aucun comportement masqué. Vous pouvez configurer les options de nouvelle tentative directement par le biais de CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests et CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Notes
Dans les cas où les unités de requête provisionnées sont beaucoup plus faibles que celles attendues d’après la quantité de données, vous pouvez les définir sur des valeurs élevées. L’opération en bloc prendra plus de temps, mais elle aura plus de chances de se terminer correctement en raison du nombre plus élevé de nouvelles tentatives.
Optimisation des performances
Comme pour d’autres opérations effectuées avec le SDK .NET, l’utilisation des API de flux améliore les performances et évite toute sérialisation inutile.
L’utilisation des API de flux n’est possible que si la nature des données que vous utilisez correspond à celle d’un flux d’octets (par exemple des flux de fichiers). Dans ce cas, l’utilisation des méthodes CreateItemStreamAsync
, ReplaceItemStreamAsync
ou DeleteItemStreamAsync
et l’utilisation de ResponseMessage
(au lieu de ItemResponse
) augmente le débit qui peut être atteint.
Étapes suivantes
- Pour en savoir plus sur les versions du SDK .NET, consultez l’article Kit SDK Azure Cosmos DB.
- Procurez-vous le code source de migration complet sur le site web de GitHub.
- Exemples supplémentaires d’exécution en bloc sur GitHub
- Vous tentez d’effectuer une planification de la capacité pour une migration vers Azure Cosmos DB ?
- Si vous ne connaissez que le nombre de vCores et de serveurs présents dans votre cluster de bases de données existant, lisez Estimation des unités de requête à l’aide de vCores ou de processeurs virtuels
- Si vous connaissez les taux de requêtes typiques de votre charge de travail de base de données actuelle, lisez la section concernant l’estimation des unités de requête à l’aide du planificateur de capacité Azure Cosmos DB