Sdílet prostřednictvím


Provádění hromadných operací s daty Azure Cosmos DB

PLATÍ PRO: NoSQL

Tento návod obsahuje pokyny k provádění hromadných operací v Azure Cosmos DB Java V4 SDK. Tato verze sady SDK se dodává s integrovanou knihovnou hromadného zpracování. Pokud používáte starší verzi sady Java SDK, doporučujeme migrovat na nejnovější verzi. Azure Cosmos DB Java V4 SDK je aktuálně doporučené řešení pro hromadnou podporu Javy.

Knihovna bulk executoru je v současné době podporována pouze Azure Cosmos DB pro NoSQL a API pro Gremlin účty. Další informace o použití knihovny .NET bulk Executor s rozhraním API pro Gremlin najdete v tématu Hromadné operace ve službě Azure Cosmos DB pro Gremlin.

Požadavky

  • Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet. Nebo můžete použít emulátor služby Azure Cosmos DB s https://localhost:8081 koncovým bodem. Primární klíč je uvedený v části Ověřování požadavků.

  • Java Development Kit (JDK) 1.8+

    • Na Ubuntu nainstalujte sadu JDK spuštěním příkazu apt-get install default-jdk.

    • Nezapomeňte nastavit proměnnou prostředí JAVA_HOME tak, aby odkazovala na složku, ve které je sada JDK nainstalovaná.

  • Stáhněte a nainstalujte binární archiv Maven.

    • Na Ubuntu můžete Maven nainstalovat spuštěním příkazu apt-get install maven.
  • Pomocí postupu popsaného v části vytvoření účtu databáze článku rychlý start pro Javu vytvořte účet Azure Cosmos DB for NoSQL.

Klonování ukázkové aplikace

Teď přejdeme na práci s kódem stažením obecného úložiště ukázek pro sadu Java SDK v4 pro Azure Cosmos DB z GitHubu. Tyto ukázkové aplikace provádějí operace CRUD a další běžné operace ve službě Azure Cosmos DB. Pokud chcete naklonovat úložiště, otevřete příkazový řádek, přejděte do adresáře, do kterého chcete aplikaci zkopírovat, a spusťte následující příkaz:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

Klonované úložiště obsahuje ukázku SampleBulkQuickStartAsync.java ve složce /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. Aplikace generuje dokumenty a provádí operace pro hromadné vytváření, aktualizaci, nahrazování a odstraňování položek ve službě Azure Cosmos DB. V dalších částech si projdeme kód v ukázkové aplikaci.

Hromadné spouštění ve službě Azure Cosmos DB

  1. Připojovací řetězce služby Azure Cosmos DB se čtou jako argumenty a přiřazují se proměnným definovaným v souboru examples/common/AccountSettings.java. Tyto proměnné prostředí musí být nastaveny.
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Chcete-li spustit hromadnou ukázku, zadejte její hlavní třídu:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. Objekt CosmosAsyncClient se inicializuje pomocí následujících příkazů:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
            .credential(new DefaultAzureCredentialBuilder().build())
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. Ukázka vytvoří asynchronní databázi a kontejner. Potom vytvoří více dokumentů, na kterých se budou provádět hromadné operace. Tyto dokumenty se přidají do reaktivního objektu streamu Flux<Family> :

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. Ukázka obsahuje metody hromadného vytváření, vložení nebo aktualizace, nahrazení a odstranění. V každé metodě mapujeme řady dokumentů v bulkWriter Flux<Family> streamu na více volání metod v CosmosBulkOperations. Tyto operace se přidají do jiného reaktivního objektu streamu Flux<CosmosItemOperation>. Stream se pak předá executeBulkOperations metodě asynchronní synchronizace container , kterou jsme vytvořili na začátku, a operace se provádějí hromadně. Jako příklad si prohlédněte metodu hromadného vytvoření:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Také je ve stejném adresáři jako ukázková aplikace třída BulkWriter.java. Tato třída ukazuje, jak zpracovat chyby omezování rychlosti (429) a časového limitu (408), ke kterým může dojít během hromadného spuštění, a efektivně tyto operace opakovat. Implementuje se v následujících metodách a také ukazuje, jak implementovat řízení místní a globální propustnosti.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Kromě toho v ukázce existují metody hromadného vytváření, které ilustrují, jak přidat zpracování odpovědí a nastavit možnosti spuštění:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

Tipy týkající se výkonu

Při použití knihovny Bulk Executor zvažte následující body, které vám poskytnou lepší výkon:

  • Pokud chcete dosáhnout nejlepšího výkonu, spusťte aplikaci z virtuálního počítače Azure ve stejné oblasti jako oblast zápisu účtu Azure Cosmos DB.

  • Pro dosažení vyšší propustnosti:

    • Nastavte velikost haldy JVM na dostatečně velké číslo, abyste se vyhnuli problémům s pamětí při zpracování velkého počtu dokumentů. Navrhovaná velikost paměti: max(3 GB, 3 × velikost(všechny dokumenty předané API pro hromadný import v jedné dávce)).
    • Existuje doba předběžného zpracování, kvůli které získáte vyšší propustnost při provádění hromadných operací s velkým počtem dokumentů. Takže pokud chcete importovat 10 000 000 dokumentů, spuštění hromadného importu 10krát pro 10 hromad dokumentů, každé o velikosti 1 000 000, je vhodnější než spuštění hromadného importu 100krát pro 100 hromad dokumentů, každé o velikosti 100 000.
  • Doporučujeme vytvořit instanci jednoho objektu CosmosAsyncClient pro celou aplikaci v rámci jednoho virtuálního počítače, který odpovídá určitému kontejneru Azure Cosmos DB.

  • Vzhledem k tomu, že spouštění rozhraní API pro hromadnou operaci spotřebovává velký blok procesoru a vstupně-výstupních operací klientského počítače. K tomu dochází tak, že interně vytváříte více úloh, vyhněte se vytváření více souběžných úloh v rámci procesu aplikace při každém provádění volání rozhraní API hromadné operace. Pokud jedno volání rozhraní API hromadné operace spuštěné na jednom virtuálním počítači nemůže využívat propustnost celého kontejneru (pokud propustnost > kontejneru 1 milion RU/s), je vhodnější vytvořit samostatné virtuální počítače pro souběžné spouštění volání rozhraní API hromadné operace.

Další kroky