Condividi tramite


Eseguire la migrazione dalla libreria 'Bulk Executor' al supporto per operazioni bulk nell'SDK .NET V3 di Azure Cosmos DB.

Questo articolo descrive i passaggi necessari per eseguire la migrazione del codice di un'applicazione esistente che usa la libreria dell'esecutore bulk .NET alla funzionalità di supporto bulk nella versione più recente della .NET SDK.

Abilitare il supporto in blocco

Abilitare il supporto bulk nell'istanza CosmosClient tramite la configurazione AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Creare attività per ogni operazione

Il supporto in blocco in .NET SDK funziona sfruttando la libreria parallela delle attività e le operazioni di raggruppamento che si verificano simultaneamente.

Non esiste un singolo metodo nell'SDK che accetta l'elenco di documenti o operazioni come parametro di input, ma è piuttosto necessario creare un'attività per ogni operazione che si vuole eseguire in blocco e quindi attendere semplicemente il completamento.

Ad esempio, se l'input iniziale è un elenco di elementi in cui ogni elemento ha lo schema seguente:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Se si vuole eseguire l'importazione in blocco (simile all'uso di BulkExecutor.BulkImportAsync), è necessario avere chiamate simultanee a CreateItemAsync. Per esempio:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Se si vuole eseguire l'aggiornamento in blocco (simile all'uso di BulkExecutor.BulkUpdateAsync), è necessario disporre di chiamate simultanee al ReplaceItemAsync metodo dopo l'aggiornamento del valore dell'elemento. Per esempio:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Se si vuole eseguire l'eliminazione in blocco (analogamente all'uso di BulkExecutor.BulkDeleteAsync), è necessario avere chiamate simultanee a DeleteItemAsync, con la id chiave di partizione e di ogni elemento. Per esempio:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Acquisire lo stato del risultato dell'attività

Negli esempi di codice precedenti è stato creato un elenco simultaneo di attività e viene chiamato il CaptureOperationResponse metodo in ognuna di queste attività. Questo metodo è un'estensione che consente di mantenere uno schema di risposta simile a BulkExecutor, acquisendo eventuali errori e monitorando l'utilizzo delle unità richiesta.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

dove il OperationResponse è dichiarato come:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Eseguire operazioni contemporaneamente

Per tenere traccia dell'ambito dell'intero elenco di attività, viene usata questa classe helper:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Il ExecuteAsync metodo attenderà il completamento di tutte le operazioni ed è possibile usarlo come segue:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Acquisire statistiche

Il codice precedente attende il completamento di tutte le operazioni e calcola le statistiche necessarie. Queste statistiche sono simili a quelle della BulkImportResponse della libreria bulk executor.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse Contiene:

  1. Tempo totale impiegato per elaborare l'elenco delle operazioni tramite il supporto per l'elaborazione in blocco.
  2. Numero di operazioni riuscite.
  3. Totale delle unità richiesta utilizzate.
  4. In caso di errori, viene visualizzato un elenco di tuple che contengono l'eccezione e l'elemento associato per la registrazione e l'identificazione.

Riprovare a configurare

La libreria dell'esecutore bulk includeva indicazioni su come impostare i parametri e le opzioni di RetryOptions a 0 per delegare il controllo alla libreria.

Per il supporto bulk in .NET SDK, non esiste alcun comportamento nascosto. È possibile configurare le opzioni di ripetizione dei tentativi direttamente tramite CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests e CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Annotazioni

Nei casi in cui le unità richiesta di cui è stato effettuato il provisioning sono molto inferiori al previsto in base alla quantità di dati, è consigliabile impostare questi valori su valori elevati. L'operazione in blocco richiederà più tempo, ma ha una maggiore probabilità di successo a causa di un numero maggiore di tentativi.

Miglioramenti delle prestazioni

Come per altre operazioni con .NET SDK, l'uso delle API di flusso comporta prestazioni migliori ed evita qualsiasi serializzazione non necessaria.

L'uso delle API di flusso è possibile solo se la natura dei dati usati corrisponde a quella di un flusso di byte (ad esempio, flussi di file). In questi casi, l'uso dei CreateItemStreamAsyncmetodi , ReplaceItemStreamAsynco DeleteItemStreamAsync e l'uso di ResponseMessage (invece di ItemResponse) aumenta la velocità effettiva che è possibile ottenere.

Passaggi successivi