Teilen über


Ausführen von Massenvorgängen für Azure Cosmos DB-Daten

GILT FÜR: NoSQL

Dieses Lernprogramm enthält Anweisungen zum Ausführen von Massenvorgängen im Azure Cosmos DB Java V4 SDK. In dieser Version des SDK ist die BulkExecutor-Bibliothek integriert. Wenn Sie eine ältere Version des Java SDK verwenden, wird das Migrieren zur aktuellen Version empfohlen. Das Azure Cosmos DB Java V4 SDK ist die derzeit empfohlene Lösung für Java-Massenunterstützung.

Zurzeit wird die Bulk Executor-Bibliothek nur von Azure Cosmos DB for NoSQL- und API für Gremlin-Konten unterstützt. Weitere Informationen zum Verwenden der .NET-Bibliothek für Bulk Executor mit der API für Gremlin finden Sie unter Ausführen von Massenvorgängen in Azure Cosmos DB for Gremlin.

Voraussetzungen

Klonen der Beispielanwendung

Beginnen wir nun mit der Verwendung von Code, indem wir ein generisches Beispielrepository für Java V4 SDK für Azure Cosmos DB aus GitHub herunterladen. Diese Beispielanwendungen führen CRUD-Vorgänge und andere allgemeine Vorgänge in Azure Cosmos DB aus. Um das Repository zu klonen, öffnen Sie eine Eingabeaufforderung, navigieren Sie zu dem Verzeichnis, in das Sie die Anwendung kopieren möchten, und führen Sie den folgenden Befehl aus:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

Das geklonte Repository enthält eine Beispiel-SampleBulkQuickStartAsync.java im Ordner /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. Die Anwendung generiert Dokumente und führt Vorgänge aus, um Elemente in Azure Cosmos DB in Massenvorgängen zu erstellen, zu aktualisieren/einzufügen (upsert), zu ersetzen und zu löschen. In den nächsten Abschnitten sehen wir uns den Code in der Beispiel-App an.

Massenausführung in Azure Cosmos DB

  1. Die Verbindungszeichenfolgen von Azure Cosmos DB werden als Argumente gelesen und Variablen zugewiesen, die in der Datei /examples/common/AccountSettings.java definiert sind. Diese Umgebungsvariablen müssen festgelegt sein,
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Zum Ausführen des Massenbeispiels geben Sie die zugehörige Hauptklasse an:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. Das CosmosAsyncClient-Objekt wird mit den folgenden Anweisungen initialisiert:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. Im Beispiel werden eine asynchrone Datenbank und ein asynchroner Container erstellt. Anschließend werden mehrere Dokumente erstellt, für die Massenvorgänge ausgeführt werden. Diese Dokumente werden einem reaktiven Flux<Family>-Streamobjekt hinzugefügt:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. Das Beispiel enthält Methoden für Massenerstellung, Massenupsert, Massenersetzung und Massenlöschung. In jeder Methode ordnen wir die Families-Dokumente im BulkWriter-Flux<Family>-Stream mehreren Methodenaufrufen in CosmosBulkOperations zu. Diese Vorgänge werden einem anderen reaktiven Flux<CosmosItemOperation>-Streamobjekt hinzugefügt. Der Stream wird dann an die executeBulkOperations-Methode des asynchronen container übergeben, den wir zu Beginn erstellt haben, und Vorgänge werden als Massenvorgänge ausgeführt. Die nachstehende Massenerstellungsmethode dient als Beispiel:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Es gibt zudem eine BulkWriter.java-Klasse im selben Verzeichnis wie die Beispielanwendung. Mithilfe dieser Klasse wird veranschaulicht, wie Fehler beim Einschränken von Raten (429) und Timeoutfehler (408) behandelt werden, die während der Massenausführung auftreten können, und wie diese Vorgänge effektiv wiederholt werden. Sie wird in den folgenden Methoden implementiert, die auch zeigen, wie eine lokale und globale Durchsatzsteuerung implementiert wird.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Darüber hinaus gibt es im Beispiel Massenerstellungsmethoden, die veranschaulichen, wie die Antwortverarbeitung hinzugefügt und Ausführungsoptionen festgelegt werden:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

Leistungstipps

Berücksichtigen Sie bei der Verwendung der BulkExecutor-Bibliothek die folgenden Punkte, um die Leistung zu verbessern:

  • Um die beste Leistung zu erzielen, führen Sie Ihre Anwendung auf einem virtuellen Azure-Computer in der Region aus, die Sie für Ihre Azure Cosmos DB-Kontoschreibvorgänge verwenden.

  • So erzielen Sie einen höheren Durchsatz:

    • Legen Sie die JVM-Heapgröße auf einen Wert fest, der hoch genug ist, um Arbeitsspeicherprobleme bei der Verarbeitung einer großen Anzahl von Dokumenten zu vermeiden. Empfohlene Heapgröße: max(3GB, 3 * sizeof(all documents passed to bulk import API in one batch)).
    • Es muss eine gewisse Vorverarbeitungszeit eingerechnet werden, daher erhalten Sie einen höheren Durchsatz, wenn Sie Massenvorgänge mit einer großen Anzahl von Dokumenten ausführen. Wenn Sie z.B. 10.000.000 Dokumente importieren möchten, sollten Sie eher 10 Massenimportvorgänge für 10 Batches mit jeweils 1.000.000 Dokumenten ausführen als 100 Massenimportvorgänge für 100 Batches mit jeweils 100.000 Dokumenten.
  • Es empfiehlt sich, ein einzelnes CosmosAsyncClient-Objekt für die gesamte Anwendung auf einem einzelnen virtuellen Computer zu instanziieren, das einem bestimmten Azure Cosmos DB-Container entspricht.

  • Eine einzelne Ausführung einer Massenvorgang-API verbraucht eine große Menge an CPU- und Netzwerk-E/A-Ressourcen des Clientcomputers. Dies wird erreicht, indem mehrere Tasks intern erzeugt werden. Vermeiden Sie das Erzeugen mehrerer gleichzeitiger Tasks in Ihrem Anwendungsprozess, von denen jeder Massenvorgang-API-Aufrufe ausführt. Wenn ein einzelner Massenvorgang-API-Aufruf, der auf einem einzelnen virtuellen Computer ausgeführt wird, nicht den gesamten Durchsatz Ihres Containers verbrauchen kann (wenn der Durchsatz mehr als > 1 Million Anforderungseinheiten pro Sekunde beträgt), ist es besser, separate virtuelle Computer zu erstellen, um Massenvorgang-API-Aufrufe gleichzeitig auszuführen.

Nächste Schritte