Compartilhar via


Executar operações em massa em dados do Azure Cosmos DB

APLICA-SE A: NoSQL

Este tutorial fornece instruções sobre como executar operações em massa no Azure Cosmos DB Java V4 SDK. Esta versão do SDK vem com a biblioteca de execução em massa embutida. Se você estiver usando uma versão mais antiga do SDK do Java, é recomendável migrar para a versão mais recente. O Azure Cosmos DB Java V4 SDK é a solução atual recomendada para o suporte em massa em Java.

Atualmente, a biblioteca de execução em massa é suportada apenas por Azure Cosmos DB para NoSQL e API para contas Gremlin. Para saber mais sobre como usar a biblioteca .NET de execução em massa com a API do Gremlin, consulte Executar operações em massa no Gremlin do Azure Cosmos DB.

Pré-requisitos

Clonar o aplicativo de exemplo

Agora vamos mudar para trabalhar com o código baixando um repositório de exemplos genéricos para o Java V4 SDK para o Azure Cosmos DB do GitHub. Esses aplicativos de exemplo executam operações CRUD e outras operações comuns no Azure Cosmos DB. Para clonar o repositório, abra um prompt de comando, navegue até o diretório onde você deseja copiar o aplicativo e execute o seguinte comando:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

O repositório clonado contém uma amostra SampleBulkQuickStartAsync.java na pasta /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. O aplicativo gera documentos e executa operações para criar, executar upsert, substituir e excluir itens em massa no Azure Cosmos DB. Nas próximas seções, examinaremos o código no aplicativo de exemplo.

Execução em massa no Azure Cosmos DB

  1. As cadeias de caracteres de conexão do Azure Cosmos DB são lidas como argumentos e atribuídas a variáveis definidas no arquivo /examples/common/AccountSettings.java. Essas variáveis de ambiente devem ser definidas
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Para executar um exemplo em massa, especifique sua Classe principal:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. O objeto CosmosAsyncClient é inicializado usando as instruções a seguir:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
            .credential(new DefaultAzureCredentialBuilder().build())
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. A amostra cria um banco de dados e um contêiner assíncronos. Em seguida, ela cria vários documentos nos quais as operações em massa serão executadas. Ela adiciona esses documentos a um objeto Flux<Family> de fluxo reativo:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. A amostra contém métodos para criação em massa, execução de upsert, substituição e exclusão. Em cada método, mapeamos os documentos de famílias no fluxo BulkWriter Flux<Family> para várias chamadas de método em CosmosBulkOperations. Essas operações são adicionadas a outro objeto de fluxo reativo Flux<CosmosItemOperation>. O fluxo é então passado para o método executeBulkOperations do método assíncrono container que criamos no início, e as operações são executadas em massa. Veja o método de criação em massa abaixo como exemplo:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Também há uma classe BulkWriter.java no mesmo diretório que o aplicativo de exemplo. Essa classe demonstra como tratar erros de limitação de taxa (429) e tempo limite (408) que podem ocorrer durante a execução em massa e tentar novamente essas operações com eficiência. Ela é implementada nos métodos abaixo, mostrando também como implementar o controle de taxa de transferência local e global.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Além disso, há métodos de criação em massa no exemplo que ilustram como adicionar processamento de resposta e definir opções de execução:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

Dicas de desempenho

Considere os seguintes pontos para melhor desempenho ao usar a biblioteca bulk executor:

  • Para ter o melhor desempenho, execute o aplicativo de uma VM do Azure na mesma região da sua região de gravação da conta do Azure Cosmos DB.

  • Para alcançar maior taxa de transferência:

    • Defina o tamanho do heap da JVM para um número grande o suficiente para evitar qualquer problema de memória na manipulação de grandes números de documentos. Tamanho do heap sugerido: máx (3GB, 3 * tamanho de [todos os documentos passados para API de importação em massa em um lote]).
    • Há um tempo de pré-processamento, devido ao qual você obterá maior taxa de transferência ao executar operações em massa com um grande número de documentos. Portanto, se você deseja importar 10.000.000 documentos, executar a importação em massa 10 vezes em 10 lotes de documentos, cada um de tamanho 1.000.000, é preferível do que a execução da importação em massa 100 vezes em 100 lotes de documentos, cada um de tamanho 100.000 documentos.
  • É recomendado criar uma instância de um único objeto CosmosAsyncClient para o aplicativo inteiro em uma única máquina virtual que corresponda a um contêiner do Azure Cosmos DB específico.

  • Uma vez que a execução de uma única API de operação em massa consome uma grande parte da CPU e da E/S da rede do computador cliente. Isso acontece por geração de várias tarefas internamente, evite a geração de várias tarefas simultâneas no processo de aplicativo executando chamadas de API de operação em massa. Se uma única chamada de API de operação em massa executada em uma única máquina virtual não for capaz de consumir toda a taxa de transferência do contêiner (se a taxa de transferência > do contêiner for de 1 milhão de RU/s), é preferível criar máquinas virtuais separadas para executar simultaneamente chamadas de API de operação em massa.

Próximas etapas