Share via


Usare la libreria .NET dell'executor bulk per eseguire operazioni bulk in Azure Cosmos DB

SI APPLICA A: NoSQL

Nota

La libreria dell'executor bulk descritta in questo articolo viene mantenuta per le applicazioni che usano la versione di .NET SDK 2.x. Per le nuove applicazioni, è possibile usare il supporto in blocco direttamente disponibile con .NET SDK versione 3.x e non richiede alcuna libreria esterna.

Se attualmente si usa la libreria dell'executor bulk e si prevede di eseguire la migrazione al supporto bulk nell'SDK più recente, seguire la procedura descritta nella Guida alla migrazione per eseguire la migrazione dell'applicazione.

Questa esercitazione fornisce istruzioni su come usare la libreria .NET dell'executor bulk per importare e aggiornare documenti in un contenitore Di Azure Cosmos DB. Per informazioni sulla libreria dell'executor bulk e su come consente di usare velocità effettiva e archiviazione di grandi dimensioni, vedere la panoramica della libreria dell'executor bulk di Azure Cosmos DB. In questa esercitazione viene visualizzata un'applicazione .NET di esempio in cui le importazioni bulk di documenti generati in modo casuale in un contenitore di Azure Cosmos DB. Dopo aver importato i dati, la raccolta mostra come è possibile aggiornare in blocco i dati importati specificando le patch come operazioni da eseguire su campi di documento specifici.

Attualmente, la libreria dell'executor bulk è supportata solo dagli account Azure Cosmos DB per NoSQL e API per gli account Gremlin. Questo articolo descrive come usare la libreria .NET dell'executor bulk con l'API per gli account NoSQL. Per informazioni su come usare la libreria .NET dell'executor bulk con l'API per gli account Gremlin, vedere Inserire i dati in blocco in Azure Cosmos DB per Gremlin usando una libreria di executor bulk.

Prerequisiti

Clonare l'applicazione di esempio

Passare ora all'uso del codice scaricando un'applicazione .NET di esempio da GitHub. Questa applicazione esegue operazioni in blocco sui dati archiviati nell'account Azure Cosmos DB. Per clonare l'applicazione, aprire un prompt dei comandi, passare alla directory in cui si vuole copiarlo ed eseguire il comando seguente:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

Il repository clonato contiene due esempi, BulkImportSample e BulkUpdateSample. È possibile aprire una delle applicazioni di esempio, aggiornare i stringa di connessione nel file App.config con i stringa di connessione dell'account Azure Cosmos DB, compilare la soluzione ed eseguirla.

L'applicazione BulkImportSample genera documenti casuali e li importa in blocco nell'account Azure Cosmos DB. L'applicazione BulkUpdateSample aggiorna in blocco i documenti importati specificando le patch come operazioni da eseguire su campi di documento specifici. Nelle sezioni successive si esaminerà il codice in ognuna di queste app di esempio.

Importare in blocco i dati in un account Azure Cosmos DB

  1. Passare alla cartella BulkImportSample e aprire il file BulkImportSample.sln .

  2. Le stringa di connessione di Azure Cosmos DB vengono recuperate dal file App.config, come illustrato nel codice seguente:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    L'utilità di importazione in blocco crea un nuovo database e un contenitore con il nome del database, il nome del contenitore e i valori di velocità effettiva specificati nel file App.config.

  3. Successivamente, l'oggetto DocumentClient viene inizializzato con la modalità di connessione TCP diretta:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. L'oggetto BulkExecutor viene inizializzato con un valore elevato per il tempo di attesa e le richieste limitate. Vengono quindi impostati su 0 per passare il controllo della congestione a BulkExecutor per la sua durata.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. L'applicazione richiama l'API BulkImportAsync . La libreria .NET fornisce due overload dell'API di importazione bulk, una che accetta un elenco di documenti JSON serializzati e l'altra che accetta un elenco di documenti POCO deserializzati. Per altre informazioni sulle definizioni di ognuno di questi metodi di overload, vedere la documentazione dell'API.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Di seguito sono illustrati i parametri accettati dal metodo BulkImportAsync.

    Parametro Descrizione
    enableUpsert Flag per abilitare le operazioni upsert nei documenti. Se esiste già un documento con l'ID specificato, viene aggiornato. Per impostazione predefinita, è impostato su false.
    disableAutomaticIdGeneration Flag per disabilitare la generazione automatica dell'ID. Per impostazione predefinita, è impostato su true.
    maxConcurrencyPerPartitionKeyRange Grado massimo di concorrenza per ogni intervallo di chiavi di partizione. Se si imposta su Null, la libreria usa un valore predefinito pari a 20.
    maxInMemorySortingBatchSize Numero massimo di documenti estratti dall'enumeratore di documenti, che viene passato alla chiamata API in ogni fase. Per la fase di ordinamento in memoria che si verifica prima dell'importazione bulk. Se si imposta questo parametro su Null, la raccolta usa il valore minimo predefinito (documents.count, 1000000).
    Cancellationtoken Token di annullamento per uscire normalmente dall'operazione di importazione bulk.

Definizione dell'oggetto risposta importazione bulk
Il risultato della chiamata API di importazione bulk contiene gli attributi seguenti:

Parametro Descrizione
NumberOfDocumentsImported (long) Numero totale di documenti importati correttamente dai documenti totali forniti alla chiamata API di importazione bulk.
TotalRequestUnitsConsumed (double) Le unità richiesta (UR) totali usate dalla chiamata API di importazione in blocco.
TotalTimeTaken (TimeSpan) Tempo totale impiegato dalla chiamata API di importazione bulk per completare l'esecuzione.
BadInputDocuments (oggetto> List<) Elenco di documenti in formato non valido che non sono stati importati correttamente nella chiamata API di importazione bulk. Correggere i documenti restituiti e riprovare a importare. I documenti formattati in modo non valido includono documenti il cui valore ID non è una stringa (null o qualsiasi altro tipo di dati è considerato non valido).

Aggiornare in blocco i dati nell'account Azure Cosmos DB

È possibile aggiornare i documenti esistenti usando l'API BulkUpdateAsync . In questo esempio si imposta il Name campo su un nuovo valore e si rimuove il Description campo dai documenti esistenti. Per il set completo di operazioni di aggiornamento supportate, vedere la documentazione dell'API.

  1. Passare alla cartella BulkUpdateSample e aprire il file BulkUpdateSample.sln .

  2. Definire gli elementi di aggiornamento insieme alle operazioni di aggiornamento dei campi corrispondenti. In questo esempio si usa SetUpdateOperation per aggiornare il Name campo e UnsetUpdateOperation per rimuovere il Description campo da tutti i documenti. È anche possibile eseguire altre operazioni, ad esempio l'incremento di un campo documento in base a un valore specifico, il push di valori specifici in un campo di matrice o la rimozione di un valore specifico da un campo di matrice. Per informazioni sui diversi metodi forniti dall'API di aggiornamento in blocco, vedere la documentazione dell'API.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. L'applicazione richiama l'API BulkUpdateAsync . Per informazioni sulla definizione del metodo BulkUpdateAsync, vedere la documentazione dell'API.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Di seguito sono illustrati i parametri accettati dal metodo BulkUpdateAsync.

    Parametro Descrizione
    maxConcurrencyPerPartitionKeyRange Grado massimo di concorrenza per ogni intervallo di chiavi di partizione. Se si imposta questo parametro su Null, la libreria usa il valore predefinito (20).
    maxInMemorySortingBatchSize Numero massimo di elementi di aggiornamento estratti dall'enumeratore degli elementi di aggiornamento passato alla chiamata API in ogni fase. Per la fase di ordinamento in memoria che si verifica prima dell'aggiornamento bulk. Se si imposta questo parametro su Null, la libreria usa il valore minimo predefinito(updateItems.count, 1000000).
    Cancellationtoken Token di annullamento per uscire normalmente dall'operazione di aggiornamento in blocco.

Definizione dell'oggetto risposta per l'aggiornamento bulk
Il risultato della chiamata API di aggiornamento bulk contiene gli attributi seguenti:

Parametro Descrizione
NumberOfDocumentsUpdated (long) Numero di documenti aggiornati correttamente dai documenti totali forniti alla chiamata API di aggiornamento bulk.
TotalRequestUnitsConsumed (double) Unità richiesta totali (UR) utilizzate dalla chiamata API di aggiornamento in blocco.
TotalTimeTaken (TimeSpan) Tempo totale impiegato dalla chiamata API di aggiornamento bulk per completare l'esecuzione.

Suggerimenti per le prestazioni

Quando si usa la libreria bulk executor, considerare i punti seguenti per ottenere prestazioni migliori:

  • Per ottenere prestazioni ottimali, eseguire l'applicazione da una macchina virtuale di Azure che si trova nella stessa area dell'area di scrittura dell'account Azure Cosmos DB.

  • È consigliabile creare un'istanza di un singolo oggetto BulkExecutor per l'intera applicazione all'interno di una singola macchina virtuale corrispondente a un contenitore di Azure Cosmos DB specifico.

  • Un'esecuzione singola dell'API per operazioni bulk usa un blocco elevato della CPU e dell'I/O di rete del computer client durante la generazione di più attività internamente. Evitare di generare più attività simultanee all'interno del processo dell'applicazione che eseguono chiamate API per operazioni bulk. Se una singola chiamata API per operazioni bulk in esecuzione in una singola macchina virtuale non è in grado di usare la velocità effettiva dell'intero contenitore (se la velocità effettiva > del contenitore è di 1 milione di UR/s), è preferibile creare macchine virtuali separate per eseguire simultaneamente le chiamate API dell'operazione in blocco.

  • Assicurarsi che il InitializeAsync() metodo venga richiamato dopo aver creato un'istanza di un oggetto BulkExecutor per recuperare la mappa di partizione del contenitore di Azure Cosmos DB di destinazione.

  • In App.Config dell'applicazione verificare che gcServer sia abilitato per assicurare prestazioni migliori.

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • La libreria genera tracce che possono essere raccolte in un file di log o nella console. Per abilitare entrambi, aggiungere il codice seguente al file App.Config dell'applicazione:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Passaggi successivi