Usare la libreria .NET di esecuzione bulk per eseguire operazioni in bulk in Azure Cosmos DB
SI APPLICA A: NoSQL
Nota
La libreria di esecuzione bulk descritta in questo articolo viene gestita per applicazioni che usano la versione 2.x di .NET SDK. Per le nuove applicazioni, è possibile usare il supporto in bulk disponibile direttamente con la versione 3.x di .NET SDK e non richiede alcuna libreria esterna.
Se attualmente si usa la libreria di esecuzione bulk e si prevede di eseguire la migrazione al supporto bulk nell'SDK più recente, seguire la procedura descritta nella Guida alla migrazione per eseguire la migrazione dell'applicazione.
Questa esercitazione fornisce le istruzioni per importare e aggiornare i documenti nei contenitori di Azure Cosmos DB usando la libreria .NET dell'executor in blocco. Per informazioni sulla libreria di esecuzione bulk e su come consente di sfruttare le enormi potenzialità di velocità effettiva e spazio di archiviazione, vedere l'articolo Panoramica della libreria di esecuzione bulk di Azure Cosmos DB. In questa esercitazione, viene esaminata in dettaglio un'applicazione .NET di esempio che importa in blocco i documenti generati in modo casuale in un contenitore di Azure Cosmos DB. Dopo l'importazione, si scoprirà come poter aggiornare in bulk i dati importati specificando le patch come operazioni da eseguire su campi di documenti specifici.
Attualmente, la libreria di esecuzione bulk è supportata solo dagli account Azure Cosmos DB for NoSQL e API per Gremlin. Questo articolo descrive come usare la libreria .NET di esecuzione bulk con account dell'API per NoSQL. Per informazioni su come usare la libreria .NET di esecuzione bulk con gli account dell'API per Gremlin, vedere Inserire i dati in blocco in Azure Cosmos DB per Gremlin usando una libreria di esecuzione bulk.
Prerequisiti
Visual Studio più recente con il carico di lavoro Sviluppo di Azure. È possibile iniziare a usare l'IDE gratuito di Visual Studio Community . Durante l'installazione di Visual Studio, abilitare il carico di lavoro Sviluppo di Azure.
Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
In alternativa, è possibile provare gratuitamente Azure Cosmos DB senza una sottoscrizione di Azure. È anche possibile installare e usare l'emulatore di Azure Cosmos DB per lo sviluppo locale e il test con l'endpoint
https://localhost:8081
. La chiave primaria viene fornita in Autenticazione delle richieste.Creare un account Azure Cosmos DB for NoSQL seguendo la procedura descritta nella sezione Creare un account Azure Cosmos DB di Guida introduttiva: Libreria client di Azure Cosmos DB for NoSQL per .NET.
Clonare l'applicazione di esempio
A questo punto è possibile iniziare a usare il codice scaricando un'applicazione .NET di esempio da GitHub. Questa applicazione esegue operazioni in bulk sui dati archiviati nel proprio account Azure Cosmos DB. Per clonare l'applicazione, aprire un prompt dei comandi, passare alla directory in cui si vuole copiare l'applicazione ed eseguire il comando seguente:
git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git
Il repository clonato contiene due esempi: BulkImportSample e BulkUpdateSample. È possibile aprire una delle applicazioni di esempio, aggiornare le stringhe di connessione nel file App.config con le stringhe di connessione dell'account Azure Cosmos DB, compilare la soluzione ed eseguirla.
L'applicazione BulkImportSample genera documenti casuali e li importa in bulk nell'account Azure Cosmos DB. L'applicazione BulkUpdateSample aggiorna in blocco i documenti importati, specificando le patch come operazioni da eseguire su campi di documenti specifici. Nelle sezioni successive verrà esaminato il codice in ognuna di queste app di esempio.
Importare dati in bulk in un account Azure Cosmos DB
Passare alla cartella BulkImportSample e aprire il file BulkImportSample.sln.
Le stringhe di connessione di Azure Cosmos DB vengono recuperate dal file App.config, come illustrato nel codice seguente:
private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"]; private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"]; private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"]; private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"]; private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
L'utilità di importazione in bulk crea un nuovo database e un nuovo contenitore con il nome del database, il nome del contenitore e i valori di velocità effettiva specificati nel file App.config.
L'oggetto DocumentClient viene quindi inizializzato con la modalità di connessione TCP diretto:
ConnectionPolicy connectionPolicy = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp }; DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey, connectionPolicy)
L'oggetto BulkExecutor viene inizializzato con valori di ripetizione elevati per il tempo di attesa e le richieste con limitazione. Questi vengono quindi impostati su 0 per passare il controllo della congestione a BulkExecutor per la relativa durata.
// Set retry options high during initialization (default values). client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9; IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection); await bulkExecutor.InitializeAsync(); // Set retries to 0 to pass complete control to bulk executor. client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
L'applicazione richiama l'API BulkImportAsync. La libreria .NET fornisce due overload dell'API di importazione in bulk: una che accetta un elenco di documenti JSON serializzati e una che accetta un elenco di documenti POCO deserializzati. Per altre informazioni sulle definizioni di ognuno di questi metodi di overload, vedere la documentazione dell'API.
BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync( documents: documentsToImportInBatch, enableUpsert: true, disableAutomaticIdGeneration: true, maxConcurrencyPerPartitionKeyRange: null, maxInMemorySortingBatchSize: null, cancellationToken: token);
Di seguito sono illustrati i parametri accettati dal metodo BulkImportAsync.
Parametro Descrizione enableUpsert Flag per consentire operazioni di upsert dei documenti. Se esiste già un documento con l'ID specificato, questo viene aggiornato. Per impostazione predefinita, il valore è impostato su False. disableAutomaticIdGeneration Flag per disabilitare la generazione automatica dell'ID. Per impostazione predefinita, il valore è impostato su True. maxConcurrencyPerPartitionKeyRange Grado massimo di concorrenza per ogni intervallo di chiavi di partizione. Se si imposta su null, la libreria usa un valore predefinito pari a 20. maxInMemorySortingBatchSize Il numero massimo di documenti di cui è stato eseguito il pull dall'enumeratore di documenti che viene passato alla chiamata API in ogni fase. Per la fase di ordinamento in memoria che si verifica prima dell'importazione bulk. Se si imposta questo parametro su null, la raccolta usa il valore minimo predefinito (documents.count, 1000000). cancellationToken Il token di annullamento per uscire dall'operazione di importazione in bulk in modo graduale.
Definizione dell'oggetto risposta importazione bulk
Il risultato della chiamata API di importazione bulk contiene gli attributi seguenti:
Parametro | Descrizione |
---|---|
NumberOfDocumentsImported (long) | Il numero totale di documenti importati correttamente rispetto al numero totale di documenti forniti alla chiamata API di importazione in bulk. |
TotalRequestUnitsConsumed (double) | Le unità richiesta (UR) totali usate dalla chiamata API di importazione in blocco. |
TotalTimeTaken (TimeSpan) | Il tempo totale impiegato dalla chiamata API di importazione in bulk per completare l'esecuzione. |
BadInputDocuments (List<object>) | L'elenco di documenti con formato non valido che non sono stati importati correttamente nella chiamata API di importazione in bulk. Correggere i documenti restituiti e ripetere l'importazione. I documenti con formato non valido includono i documenti con un valore ID diverso da una stringa, ad esempio con un valore null o qualsiasi altro tipo di dati considerato non valido. |
Aggiornare i dati in bulk in un account Azure Cosmos DB
È possibile aggiornare i documenti esistenti tramite l'API BulkUpdateAsync. In questo esempio si imposta il campo Name
su un nuovo valore e si rimuoverà il campo Description
dai documenti esistenti. Per il set completo di operazioni di aggiornamento supportate, vedere la documentazione dell'API.
Passare alla cartella BulkUpdateSample e aprire il file BulkUpdateSample.sln.
Definire gli elementi di aggiornamento insieme alle operazioni di aggiornamento dei campi corrispondenti. In questo esempio si userà SetUpdateOperation per aggiornare il campo
Name
e UnsetUpdateOperation per rimuovere il campoDescription
da tutti i documenti. È possibile anche eseguire altre operazioni, ad esempio incrementare un campo del documento per un valore specifico, eseguire il push di valori specifici in un campo di matrice o rimuovere un valore specifico da un campo di matrice. Per informazioni sui diversi metodi forniti dall'API di aggiornamento in blocco, vedere la documentazione dell'API.SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc"); UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description"); List<UpdateOperation> updateOperations = new List<UpdateOperation>(); updateOperations.Add(nameUpdate); updateOperations.Add(descriptionUpdate); List<UpdateItem> updateItems = new List<UpdateItem>(); for (int i = 0; i < 10; i++) { updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations)); }
L'applicazione richiama l'API BulkUpdateAsync. Per altre informazioni sulla definizione del metodo BulkUpdateAsync, vedere la documentazione dell'API.
BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync( updateItems: updateItems, maxConcurrencyPerPartitionKeyRange: null, maxInMemorySortingBatchSize: null, cancellationToken: token);
Di seguito sono illustrati i parametri accettati dal metodo BulkUpdateAsync.
Parametro Descrizione maxConcurrencyPerPartitionKeyRange Grado massimo di concorrenza per ogni intervallo di chiavi di partizione. Se si imposta questo parametro su null, la libreria usa il valore predefinito (20). maxInMemorySortingBatchSize Il numero massimo di elementi di aggiornamento di cui è stato eseguito il pull dall'enumeratore di elementi di aggiornamento che viene passato alla chiamata API in ogni fase. Per la fase di ordinamento in memoria che si verifica prima dell'aggiornamento bulk. Se si imposta questo parametro su null, la libreria usa il valore minimo predefinito(updateItems.count, 1000000). cancellationToken Il token di annullamento per uscire dall'operazione di aggiornamento in bulk in modo graduale.
Definizione dell'oggetto risposta per l'aggiornamento bulk
Il risultato della chiamata API di aggiornamento bulk contiene gli attributi seguenti:
Parametro | Descrizione |
---|---|
NumberOfDocumentsUpdated (long) | Il numero di documenti aggiornati correttamente rispetto al numero totale di documenti forniti alla chiamata API di aggiornamento in bulk. |
TotalRequestUnitsConsumed (double) | Le unità richiesta (UR) totali usate dalla chiamata API di aggiornamento in bulk. |
TotalTimeTaken (TimeSpan) | Il tempo totale impiegato dalla chiamata API di aggiornamento in bulk per completare l'esecuzione. |
Suggerimenti per le prestazioni
Per ottenere prestazioni migliori, quando si usa la libreria di esecuzione bulk, tenere presente quanto segue:
Per prestazioni ottimali, eseguire l'applicazione da una macchina virtuale Azure che si trovi nella stessa area in cui si trova l'area di scrittura dell'account Azure Cosmos DB.
È consigliabile creare l'istanza di un singolo oggetto BulkExecutor per l'intera applicazione all'interno di una singola macchina virtuale corrispondente a un contenitore di Azure Cosmos DB specifico.
L'esecuzione di una singola API di operazione in bulk usa un blocco di grandi dimensioni della CPU e dell'I/O di rete del computer client quando genera internamente più attività. Evitare di generare più attività simultanee all'interno del processo dell'applicazione, di cui ognuna esegue chiamate API di operazioni in bulk. Se una singola chiamata API di operazione in bulk in esecuzione su una singola macchina virtuale non è in grado di usare la velocità effettiva dell'intero contenitore (se la velocità effettiva del contenitore > 1 milione di UR/sec), è preferibile creare macchine virtuali separate per eseguire simultaneamente le chiamate API di operazione in bulk.
Verificare che il metodo
InitializeAsync()
venga richiamato dopo la creazione dell'istanza di un oggetto BulkExecutor per recuperare la mappa di partizioni del contenitore di Azure Cosmos DB di destinazione.In App.Config dell'applicazione verificare che gcServer sia abilitato per assicurare prestazioni migliori.
<runtime> <gcServer enabled="true" /> </runtime>
La libreria genera tracce che possono essere raccolte in un file di log o nella console. Per abilitare entrambi, aggiungere il codice seguente al file App.Config dell'applicazione:
<system.diagnostics> <trace autoflush="false" indentsize="4"> <listeners> <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" /> <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" /> </listeners> </trace> </system.diagnostics>