Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo descrive i passaggi necessari per eseguire la migrazione del codice di un'applicazione esistente che usa la libreria dell'esecutore bulk .NET alla funzionalità di supporto bulk nella versione più recente della .NET SDK.
Abilitare il supporto in blocco
Abilitare il supporto bulk nell'istanza CosmosClient tramite la configurazione AllowBulkExecution :
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Creare attività per ogni operazione
Il supporto in blocco in .NET SDK funziona sfruttando la libreria parallela delle attività e le operazioni di raggruppamento che si verificano simultaneamente.
Non esiste un singolo metodo nell'SDK che accetta l'elenco di documenti o operazioni come parametro di input, ma è piuttosto necessario creare un'attività per ogni operazione che si vuole eseguire in blocco e quindi attendere semplicemente il completamento.
Ad esempio, se l'input iniziale è un elenco di elementi in cui ogni elemento ha lo schema seguente:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Se si vuole eseguire l'importazione in blocco (simile all'uso di BulkExecutor.BulkImportAsync), è necessario avere chiamate simultanee a CreateItemAsync. Per esempio:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Se si vuole eseguire l'aggiornamento in blocco (simile all'uso di BulkExecutor.BulkUpdateAsync), è necessario disporre di chiamate simultanee al ReplaceItemAsync metodo dopo l'aggiornamento del valore dell'elemento. Per esempio:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
Se si vuole eseguire l'eliminazione in blocco (analogamente all'uso di BulkExecutor.BulkDeleteAsync), è necessario avere chiamate simultanee a DeleteItemAsync, con la id chiave di partizione e di ogni elemento. Per esempio:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Acquisire lo stato del risultato dell'attività
Negli esempi di codice precedenti è stato creato un elenco simultaneo di attività e viene chiamato il CaptureOperationResponse metodo in ognuna di queste attività. Questo metodo è un'estensione che consente di mantenere uno schema di risposta simile a BulkExecutor, acquisendo eventuali errori e monitorando l'utilizzo delle unità richiesta.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
dove il OperationResponse è dichiarato come:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Eseguire operazioni contemporaneamente
Per tenere traccia dell'ambito dell'intero elenco di attività, viene usata questa classe helper:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Il ExecuteAsync metodo attenderà il completamento di tutte le operazioni ed è possibile usarlo come segue:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Acquisire statistiche
Il codice precedente attende il completamento di tutte le operazioni e calcola le statistiche necessarie. Queste statistiche sono simili a quelle della BulkImportResponse della libreria bulk executor.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
BulkOperationResponse Contiene:
- Tempo totale impiegato per elaborare l'elenco delle operazioni tramite il supporto per l'elaborazione in blocco.
- Numero di operazioni riuscite.
- Totale delle unità richiesta utilizzate.
- In caso di errori, viene visualizzato un elenco di tuple che contengono l'eccezione e l'elemento associato per la registrazione e l'identificazione.
Riprovare a configurare
La libreria dell'esecutore bulk includeva indicazioni su come impostare i parametri e le opzioni di RetryOptions a 0 per delegare il controllo alla libreria.
Per il supporto bulk in .NET SDK, non esiste alcun comportamento nascosto. È possibile configurare le opzioni di ripetizione dei tentativi direttamente tramite CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests e CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Annotazioni
Nei casi in cui le unità richiesta di cui è stato effettuato il provisioning sono molto inferiori al previsto in base alla quantità di dati, è consigliabile impostare questi valori su valori elevati. L'operazione in blocco richiederà più tempo, ma ha una maggiore probabilità di successo a causa di un numero maggiore di tentativi.
Miglioramenti delle prestazioni
Come per altre operazioni con .NET SDK, l'uso delle API di flusso comporta prestazioni migliori ed evita qualsiasi serializzazione non necessaria.
L'uso delle API di flusso è possibile solo se la natura dei dati usati corrisponde a quella di un flusso di byte (ad esempio, flussi di file). In questi casi, l'uso dei CreateItemStreamAsyncmetodi , ReplaceItemStreamAsynco DeleteItemStreamAsync e l'uso di ResponseMessage (invece di ItemResponse) aumenta la velocità effettiva che è possibile ottenere.
Passaggi successivi
- Per altre informazioni sulle versioni di .NET SDK, vedere l'articolo Azure Cosmos DB SDK .
- Ottenere il codice sorgente di migrazione completo da GitHub.
- Esempi in blocco aggiuntivi su GitHub
- Si sta tentando di pianificare la capacità per una migrazione ad Azure Cosmos DB?
- Se si conosce solo il numero di vCore e server nel cluster di database esistente, leggere le informazioni sulla Stima delle unità richieste usando vCore o vCPU
- Se si conosce la frequenza delle richieste tipiche per il carico di lavoro corrente del database, leggere le informazioni sulla stima delle unità richieste con lo strumento di pianificazione della capacità di Azure Cosmos DB