Azure Cosmos DB 데이터에 대한 대량 작업 수행

적용 대상: NoSQL

이 자습서에서는 Azure Cosmos DB Java V4 SDK에서 일괄 작업을 수행하는 방법에 대한 지침을 제공합니다. 이 버전의 SDK에는 대량 실행기 라이브러리가 기본 제공되어 있습니다. 이전 버전의 Java SDK를 사용하는 경우 최신 버전으로 마이그레이션하는 것이 좋습니다. Azure Cosmos DB Java V4 SDK는 현재 Java 대량 지원에 권장되는 솔루션입니다.

현재 Azure Cosmos DB for NoSQL 및 API for Gremlin 계정에서만 대량 실행기 라이브러리가 지원됩니다. API for Gremlin에서 Bulk Executor .NET 라이브러리 사용에 대해 알아보려면 에서 대량 작업 수행을 참조하세요.

필수 조건

샘플 애플리케이션 복제

이제 GitHub에서 Azure Cosmos DB용 Java V4 SDK에 대한 일반 샘플 리포지토리를 다운로드하여 코드 작업으로 전환해 보겠습니다. 이러한 샘플 애플리케이션은 Azure Cosmos DB에서 CRUD 작업 및 기타 일반 작업을 수행합니다. 리포지토리를 복제하려면 명령 프롬프트를 열고 애플리케이션을 복사할 디렉터리로 이동한 후 다음 명령을 실행합니다.

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

복제된 리포지토리는 /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async 폴더에 샘플 SampleBulkQuickStartAsync.java을 포함합니다. 애플리케이션은 문서를 만들고 Azure Cosmos DB에서 항목을 대량 만들기, upsert, 교체 및 삭제하는 작업을 실행합니다. 다음 섹션에서는 샘플 앱의 코드를 검토합니다.

Azure Cosmos DB에서 대량 실행

  1. Azure Cosmos DB의 연결 문자열은 인수로 읽고 /examples/common/AccountSettings.java 파일에 정의된 변수에 할당됩니다. 이러한 환경 변수를 설정하여
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

대량 샘플을 실행하려면 기본 클래스를 지정합니다.

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. CosmosAsyncClient 개체는 다음 문을 사용하여 초기화됩니다.

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. 이 샘플은 비동기 데이터베이스와 컨테이너를 만듭니다. 그런 다음 대량 작업이 실행될 여러 문서를 만듭니다. 다음 문서를 Flux<Family> 반응 스트림 개체에 추가합니다.

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. 샘플에는 대량 만들기, upsert, 교체 및 삭제를 위한 메서드가 포함되어 있습니다. 각 메서드에서 BulkWriter Flux<Family> 스트림의 패밀리 문서를 CosmosBulkOperations의 여러 메서드 호출에 매핑합니다. 이러한 작업은 다른 반응 스트림 개체 Flux<CosmosItemOperation>에 추가됩니다. 그런 다음 스트림은 처음에 만든 비동기 containerexecuteBulkOperations 메서드로 전달되고 작업이 일괄 실행됩니다. 예로 아래의 대량 만들기 방법을 참조하세요.

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. 샘플 애플리케이션과 동일한 디렉터리에 BulkWriter.java 클래스도 있습니다. 이 클래스는 대량 실행 중에 발생할 수 있는 속도 제한(429) 및 시간 제한(408) 오류를 처리하고 이러한 작업을 효과적으로 다시 시도하는 방법을 보여 줍니다. 아래 방법으로 구현되며 로컬 및 전역 처리량 제어를 구현하는 방법도 보여 줍니다.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. 또한 샘플에는 응답 처리를 추가하고 실행 옵션을 설정하는 방법을 보여 주는 대량 만들기 메서드가 있습니다.

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    성능 팁

    Bulk Executor 라이브러리를 사용하는 경우 성능 향상을 위해 다음 사항을 고려합니다.

    • 최상의 성능을 위해 Azure Cosmos DB 계정 작성 지역과 동일한 지역의 Azure VM에서 애플리케이션을 실행합니다.

    • 더 높은 처리량을 달성하려면:

      • 많은 수의 문서를 처리할 때 메모리 문제를 방지하기 위해 JVM의 힙 크기를 충분한 수로 설정합니다. 제안되는 힙 크기: 최대(3GB, 3 * sizeof(각 일괄 처리에서 대량 가져오기 API로 전달된 모든 문서)).
      • 문서 수가 많은 대량 작업을 수행하면 처리량이 더 높아지므로 전처리 시간이 있습니다. 따라서 10,000,000개 문서를 가져오려는 경우 크기가 100,000개인 대량 문서 100개에서 대량 가져오기를 100번 수행하는 것보다 각 크기가 1,000,000개인 대량 문서 10개에서 대량 가져오기를 10번 실행하는 것이 좋습니다.
    • 특정 Azure Cosmos DB 컨테이너에 해당하는 단일 가상 머신 내에서 전체 애플리케이션에 대해 단일 CosmosAsyncClient 개체를 인스턴스화하는 것이 좋습니다.

    • 단일 대량 작업 API 실행은 클라이언트 컴퓨터의 CPU 및 네트워크 IO의 대규모 청크를 사용하기 때문입니다. 이는 내부적으로 여러 작업을 생성되며 발생합니다. 대량 작업 API 호출을 각각 실행하는 애플리케이션 내에서 여러 동시 작업이 발생하지 않도록 하십시오. 단일 가상 머신에서 실행되는 단일 대량 작업 API 호출에서 전체 컨테이너의 처리량을 사용할 수 없는 경우(컨테이너의 처리량이 1백만 RU/s 미만인 경우) 대량 작업 API 호출을 동시에 실행하도록 개별 가상 머신을 만드는 것이 좋습니다.

    다음 단계