Lire en anglais

Partager via


Migrer de la bibliothèque d’exécuteur en bloc vers la prise en charge de l’exécution en bloc dans le SDK Azure Cosmos DB .NET v3

S’APPLIQUE À : NoSQL

Cet article décrit les étapes nécessaires pour migrer le code d’une application existante qui utilise la bibliothèque d’exécuteur en bloc .NET vers la fonctionnalité de prise en charge de l’exécution en bloc de la dernière version du SDK .NET.

Activer la prise en charge de l’exécution en bloc

Activez la prise en charge de l’exécution en bloc sur l’instance de CosmosClient par le biais de la configuration AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Créer des tâches pour chaque opération

La prise en charge de l’exécution en bloc dans le SDK .NET fonctionne en tirant parti de la bibliothèque parallèle de tâches et des opérations de regroupement qui se produisent simultanément.

Il n’existe pas de méthode unique dans le Kit de développement logiciel (SDK) qui prend votre liste de documents ou d’opérations comme paramètre d’entrée. Au lieu de cela, vous devez créer une tâche pour chaque opération que vous souhaitez exécuter en bloc, puis attendre qu’elles se terminent.

Par exemple, si votre entrée initiale est une liste d’éléments où chaque élément a le schéma suivant :

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Si vous souhaitez effectuer une importation en bloc (comme lors de l’utilisation de BulkExecutor.BulkImportAsync), vous devez effectuer des appels simultanés à CreateItemAsync. Par exemple :

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Si vous souhaitez effectuer une mise à jour en bloc (comme lors de l’utilisation de BulkExecutor.BulkUpdateAsync), vous devez effectuer des appels simultanés à la méthode ReplaceItemAsync après la mise à jour de la valeur de l’élément. Par exemple :

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Et si vous souhaitez effectuer une suppression en bloc (comme lors de l’utilisation de BulkExecutor.BulkDeleteAsync), vous devez effectuer des appels simultanés à DeleteItemAsync, avec l’id et la clé de partition de chaque élément. Par exemple :

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Capturer l’état du résultat de la tâche

Dans les exemples de code précédents, nous avons créé une liste de tâches simultanées et appelé la méthode CaptureOperationResponse sur chacune de ces tâches. Cette méthode est une extension qui nous permet de conserver un schéma de réponse similaire à BulkExecutor, en capturant les erreurs et en effectuant le suivi de l’utilisation des unités de requête.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

OperationResponse est déclaré comme suit :

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Exécuter des opérations simultanément

Pour suivre l’étendue de la liste complète des tâches, nous utilisons cette classe d’assistance :

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

La méthode ExecuteAsync attendra que toutes les opérations soient terminées et vous pourrez l’utiliser comme suit :

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Capturer les statistiques

Le code précédent attend que toutes les opérations soient terminées et calcule les statistiques requises. Ces statistiques sont similaires à celles de la classe BulkImportResponse de la bibliothèque d’exécuteur en bloc.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse contient les éléments suivants :

  1. Temps total nécessaire pour traiter la liste des opérations par le biais de la prise en charge de l’exécution en bloc
  2. Nombre d’opérations réussies
  3. Nombre total d’unités de requête consommées
  4. En cas d’échec, il affiche une liste de tuples qui contiennent l’exception et l’élément associé à des fins de journalisation et d’identification

Configuration de nouvelle tentative

La bibliothèque d’exécuteur en bloc contenait des recommandations indiquant qu’il fallait affecter la valeur 0 aux paramètres MaxRetryWaitTimeInSeconds et MaxRetryAttemptsOnThrottledRequests de RetryOptions pour déléguer le contrôle à la bibliothèque.

Pour la prise en charge de l’exécution en bloc dans le SDK .NET, il n’existe aucun comportement masqué. Vous pouvez configurer les options de nouvelle tentative directement par le biais de CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests et CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Notes

Dans les cas où les unités de requête provisionnées sont beaucoup plus faibles que celles attendues d’après la quantité de données, vous pouvez les définir sur des valeurs élevées. L’opération en bloc prendra plus de temps, mais elle aura plus de chances de se terminer correctement en raison du nombre plus élevé de nouvelles tentatives.

Optimisation des performances

Comme pour d’autres opérations effectuées avec le SDK .NET, l’utilisation des API de flux améliore les performances et évite toute sérialisation inutile.

L’utilisation des API de flux n’est possible que si la nature des données que vous utilisez correspond à celle d’un flux d’octets (par exemple des flux de fichiers). Dans ce cas, l’utilisation des méthodes CreateItemStreamAsync, ReplaceItemStreamAsync ou DeleteItemStreamAsync et l’utilisation de ResponseMessage (au lieu de ItemResponse) augmente le débit qui peut être atteint.

Étapes suivantes