Teilen über


Verwenden der BulkExecutor-.NET-Bibliothek zum Ausführen von Massenvorgängen in Azure Cosmos DB

GILT FÜR: NoSQL

Hinweis

Die in diesem Artikel beschriebene Bulk Executor-Bibliothek wird für Anwendungen gepflegt, die die Version .NET SDK 2.x verwenden. Für neue Anwendungen können Sie die Massenunterstützung verwenden, die direkt mit dem .NET SDK 3.x verfügbar ist und keine externe Bibliothek erfordert.

Wenn Sie derzeit die BulkExecutor-Bibliothek verwenden und eine Migration zur Massenunterstützung für das neuere SDK planen, führen Sie die Schritte im Migrationsleitfaden aus, um Ihre Anwendung zu migrieren.

In diesem Tutorial erfahren Sie, wie Sie die Bulk Executor .NET-Bibliothek verwenden, um Dokumente in einen Azure Cosmos DB-Container zu importieren und zu aktualisieren. Informationen zur BulkExecutor-Bibliothek und dazu, wie Sie damit massiven Durchsatz und riesige Speichermengen nutzen können, finden Sie im Artikel Azure Cosmos DB Bulk Executor-Bibliothek – Übersicht. In diesem Tutorial sehen Sie eine .NET-Beispielanwendung, die zufällig generierte Dokumente mittels Massenimport in einen Azure Cosmos DB-Container importiert. Nachdem Sie die Daten importiert haben, zeigt Ihnen die Bibliothek, wie Sie die importierten Daten mittels Massenaktualisierung aktualisieren können, indem Sie Patches als Vorgänge angeben, die für bestimmte Dokumentfelder ausgeführt werden sollen.

Zurzeit wird die Bulk Executor-Bibliothek nur von den Azure Cosmos DB for NoSQL- und „API für Gremlin“-Konten unterstützt. Dieser Artikel beschreibt, wie Sie die Bulk Executor-.NET-Bibliothek mit „API für NoSQL“-Konten verwenden. Informationen zur Verwendung der .NET-Bibliothek für den Massenvollstrecker mit der API für Gremlin-Konten finden Sie unter Erfassen von Daten in einem Massenvorgang in Azure Cosmos DB für Gremlin mithilfe einer Bulk Executor-Bibliothek.

Voraussetzungen

Klonen der Beispielanwendung

Beginnen Sie jetzt mit der Arbeit mit Code, indem Sie eine .NET-Beispielanwendung von GitHub herunterladen. Diese Anwendung führt Massenvorgänge für die in Ihrem Azure Cosmos DB-Konto gespeicherten Daten aus. Um die Anwendung zu klonen, öffnen Sie eine Eingabeaufforderung, navigieren Sie zu dem Verzeichnis, in das Sie die Anwendung kopieren möchten, und führen Sie den folgenden Befehl aus:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

Das geklonte Repository enthält zwei Beispiele: BulkImportSample und BulkUpdateSample. Sie können jede der Beispielanwendungen öffnen, die Verbindungszeichenfolgen in der Datei App.config auf die Verbindungszeichenfolgen Ihres Azure Cosmos DB-Kontos aktualisieren, die Lösung erstellen und sie ausführen.

Die Anwendung BulkImportSample generiert zufällig Dokumente und importiert sie per Massenvorgang in Ihr Azure Cosmos DB-Konto. Die Anwendung BulkUpdateSample aktualisiert die importierten Dokumente per Massenvorgang, indem sie Patches als Vorgänge angibt, die für bestimmte Dokumentfelder ausgeführt werden sollen. In den nächsten Abschnitten sehen wir uns den Code in jeder dieser Beispiel-Apps an.

Massenimport von Daten in ein Azure Cosmos DB-Konto

  1. Navigieren Sie zum Ordner BulkImportSample, und öffnen Sie die Datei BulkImportSample.sln.

  2. Die Verbindungszeichenfolgen von Azure Cosmos DB werden aus der Datei „App.config“ abgerufen, wie im folgenden Code gezeigt:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    Die Massenimportanwendung erstellt eine neue Datenbank und einen Container mit dem Datenbanknamen, dem Containernamen und den Durchsatzwerten, die in der Datei „App.config“ angegeben sind.

  3. Als Nächstes wird das DocumentClient-Objekt mit dem direkten TCP-Verbindungsmodus initialisiert:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. Das BulkExecutor-Objekt wird mit hohen Wiederholungswerten für Wartezeit und gedrosselte Anforderungen initialisiert. Dann werden die Werte auf 0 festgelegt, um die Überlastungssteuerung während der Lebensdauer an BulkExecutor zu übergeben.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. Die Anwendung ruft die BulkImportAsync-API auf. Die .NET-Bibliothek stellt zwei Überladungen der Massenimport-API bereit: Die eine akzeptiert eine Liste von serialisierten JSON-Dokumenten, die andere akzeptiert eine Liste von deserialisierten POCO-Dokumenten. Weitere Informationen zu den Definitionen jeder dieser überladenen Methoden finden Sie in der API-Dokumentation.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Die BulkImportAsync-Methode akzeptiert die folgenden Parameter:

    Parameter Beschreibung
    enableUpsert Ein Flag zum Aktivieren von Upsert-Vorgängen für die Dokumente. Wenn bereits ein Dokument mit der angegebenen ID vorhanden ist, wird es aktualisiert. Die Standardeinstellung ist „false“.
    disableAutomaticIdGeneration Ein Flag zum Deaktivieren der automatischen Generierung einer ID. Die Standardeinstellung ist „true“.
    maxConcurrencyPerPartitionKeyRange Der maximale Grad an Parallelität pro Partitionsschlüsselbereich. Der Wert Null veranlasst die Bibliothek, einen Standardwert von 20 zu verwenden.
    maxInMemorySortingBatchSize Die maximale Anzahl von Dokumenten, für die von dem Dokumentenumerator ein Pull ausgeführt wird, der in jeder Phase an den API-Aufruf übergeben wird. Für die Phase der In-Memory-Sortierung, die vor dem Massenimport stattfindet. Wenn dieser Parameter auf Null gesetzt wird, verwendet die Bibliothek den Standard-Mindestwert (documents.count, 1000000).
    cancellationToken Das Abbruchtoken für ein ordnungsgemäßes Beenden des Massenimportvorgangs.

Definition des Massenimport-Antwortobjekts
Das Ergebnis des Massenimport-API-Aufrufs enthält die folgenden Attribute:

Parameter Beschreibung
NumberOfDocumentsImported (long) Die Gesamtanzahl von Dokumenten, die von allen an den Massenimport-API-Aufruf gesendeten Dokumenten erfolgreich importiert wurden.
TotalRequestUnitsConsumed (double) Die Gesamtanzahl von Anforderungseinheiten, die vom Massenimport-API-Aufruf verbraucht wurden.
TotalTimeTaken (TimeSpan) Die Gesamtzeit, die der Massenimport-API-Aufruf benötigt, um die Ausführung abzuschließen.
BadInputDocuments (List<object>) Die Liste der Dokumente mit ungültigem Format, die im Massenimport-API-Aufruf nicht erfolgreich importiert wurden. Korrigieren Sie die zurückgegebenen Dokumente, und wiederholen Sie den Import. Zu Dokumenten mit ungültigem Format gehören auch Dokumente, deren ID-Wert keine Zeichenfolge ist (NULL-Werte oder andere Datentypen werden als ungültig betrachtet).

Massenaktualisierung von Daten in Ihrem Azure Cosmos DB-Konto

Sie können vorhandene Dokumente mithilfe der BulkUpdateAsync-API aktualisieren. In diesem Beispiel legen Sie das Feld Name auf einen neuen Wert fest und entfernen das Feld Description aus den vorhandenen Dokumenten. Alle unterstützten Aktualisierungsvorgänge finden Sie in der API-Dokumentation.

  1. Navigieren Sie zum Ordner BulkUpdateSample, und öffnen Sie die Datei BulkUpdateSample.sln.

  2. Definieren Sie die Aktualisierungselemente zusammen mit den entsprechenden Vorgängen zum Aktualisieren von Feldern. In diesem Beispiel verwenden Sie SetUpdateOperation, um das Name-Feld zu aktualisieren und UnsetUpdateOperation, um das Description-Feld aus allen Dokumenten zu entfernen. Sie können auch andere Vorgänge durchführen, z. B. ein Dokumentfeld um einen bestimmten Wert erhöhen, bestimmte Werte in ein Arrayfeld einfügen oder einen bestimmten Wert aus einem Arrayfeld entfernen. Informationen zu den verschiedenen Methoden, die von der API für die Massenaktualisierung bereitgestellt werden, finden Sie in der API-Dokumentation.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. Die Anwendung ruft die BulkUpdateAsync-API auf. Informationen zur Definition der BulkUpdateAsync-Methode finden Sie in der API-Dokumentation.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Die BulkUpdateAsync-Methode akzeptiert die folgenden Parameter:

    Parameter Beschreibung
    maxConcurrencyPerPartitionKeyRange Der maximale Grad an Parallelität pro Partitionsschlüsselbereich. Setzen Sie diesen Parameter auf null, verwendet die Bibliothek den Standardwert (20).
    maxInMemorySortingBatchSize Die maximale Anzahl von Aktualisierungselementen, für die von dem Enumerator für Aktualisierungselemente ein Pull ausgeführt wird, der in jeder Phase an den API-Aufruf übergeben wird. Für die Phase der In-Memory-Sortierung, die vor der Massenaktualisierung stattfindet. Setzen Sie diesen Parameter auf null, verwendet die Bibliothek den Standard-Mindestwert (updateItems.count, 1000000).
    cancellationToken Das Abbruchtoken für ein ordnungsgemäßes Beenden des Massenaktualisierungsvorgangs.

Definition des Antwortobjekts für Massenupdates
Das Ergebnis des Massenaktualisierungs-API-Aufrufs enthält die folgenden Attribute:

Parameter Beschreibung
NumberOfDocumentsUpdated (long) Die Anzahl von Dokumenten, die von allen an den Massenaktualisierungs-API-Aufruf gesendeten Dokumenten erfolgreich aktualisiert wurden.
TotalRequestUnitsConsumed (double) Die Gesamtanzahl von Anforderungseinheiten, die vom Massenaktualisierungs-API-Aufruf verbraucht wurden.
TotalTimeTaken (TimeSpan) Die Gesamtzeit, die der Massenaktualisierungs-API-Aufruf benötigt, um die Ausführung abzuschließen.

Leistungstipps

Beachten Sie die folgenden Punkte für eine bessere Leistung, wenn Sie die Bulk Executor-Bibliothek verwenden:

  • Um die beste Leistung zu erzielen, führen Sie Ihre Anwendung auf einem virtuellen Azure-Computer in der Region aus, die Sie für Schreibvorgänge in Ihrem Azure Cosmos DB-Konto verwenden.

  • Es empfiehlt sich, ein einzelnes BulkExecutor-Objekt für die gesamte Anwendung auf einem einzelnen virtuellen Computer zu instanziieren, das einem bestimmten Azure Cosmos DB-Container entspricht.

  • Die Ausführung einer einzigen Massenvorgang-API verbraucht einen großen Teil der CPU des Clientcomputers und der Netzwerk-E/A, wenn mehrere Aufgaben intern gestartet werden. Vermeiden Sie das Erzeugen mehrerer gleichzeitiger Tasks in Ihrem Anwendungsprozess, der Massenvorgang-API-Aufrufe ausführt. Wenn ein einzelner Massenvorgang-API-Aufruf, der auf einem einzelnen virtuellen Computer ausgeführt wird, nicht den gesamten Durchsatz des Containers verbrauchen kann (wenn der Durchsatz mehr als >1 Million Anforderungseinheiten pro Sekunde beträgt), ist es besser, separate virtuelle Computer zu erstellen, um die Massenvorgang-API-Aufrufe gleichzeitig auszuführen.

  • Stellen Sie sicher, dass die Methode InitializeAsync() aufgerufen wird, nachdem ein BulkExecutor-Objekt instanziiert wurde, um die Partitionszuordnung für den Azure Cosmos DB-Zielcontainer abzurufen.

  • Stellen Sie in der App.config-Datei Ihrer Anwendung sicher, dass gcServer aktiviert ist, um eine bessere Leistung zu erzielen.

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • Die Bibliothek gibt Ablaufverfolgungen aus, die entweder in einer Protokolldatei oder in der Konsole gesammelt werden können. Um beides zu aktivieren, fügen Sie den folgenden Code zur Datei App.Config Ihrer Anwendung hinzu.

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Nächste Schritte