Melakukan operasi massal pada data Azure Cosmos DB

BERLAKU UNTUK: NoSQL

Tutorial ini memberikan instruksi tentang melakukan operasi massal di SDK V4 Java Azure Cosmos DB. Versi SDK ini dilengkapi dengan pustaka eksekutor massal bawaan. Jika Anda menggunakan versi Java SDK lama, sebaiknya migrasikan ke versi terbaru. SDK Azure Cosmos DB Java V4 adalah solusi yang direkomendasikan saat ini untuk dukungan massal Java.

Saat ini, pustaka pelaksana massal hanya didukung oleh Azure Cosmos DB untuk NoSQL dan API untuk akun Gremlin. Untuk mempelajari tentang menggunakan pustaka .NET pelaksana massal dengan API untuk Gremlin, lihat melakukan operasi massal di Azure Cosmos DB untuk Gremlin.

Prasyarat

Buat klon sampel aplikasi

Sekarang mari kita beralih menggunakan kode dengan mengunduh repositori contoh umum untuk SDK V4 Java untuk Azure Cosmos DB dari GitHub. Aplikasi contoh ini melakukan operasi CRUD dan operasi umum lainnya di Azure Cosmos DB. Untuk mengkloning repositori, buka perintah, navigasikan ke direktori tempat Anda ingin menyalin aplikasi dan menjalankan perintah berikut:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

Repositori yang dikloning berisi contoh SampleBulkQuickStartAsync.java di folder /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. Aplikasi menghasilkan dokumen dan menjalankan operasi untuk membuat, upsert, mengganti, dan menghapus item secara massal di Azure Cosmos DB. Di bagian selanjutnya, kita akan meninjau kode di aplikasi contoh.

Eksekusi massal di Azure Cosmos DB

  1. String koneksi Azure Cosmos DB dibaca sebagai argumen dan ditetapkan ke variabel yang ditentukan dalam file /examples/common/AccountSettings.java. Variabel lingkungan ini harus ditetapkan
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Untuk menjalankan contoh massal, tentukan Kelas Utamanya:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. Objek CosmosAsyncClient diinisialisasi dengan menggunakan pernyataan berikut:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. Sampel membuat database dan kontainer asinkron. Sampel kemudian membuat beberapa dokumen tempat operasi massal akan dijalankan. Sampel menambahkan dokumen-dokumen ini ke objek aliran reaktif Flux<Family>:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. Sampel berisi metode untuk membuat, upsert, mengganti, dan menghapus secara massal. Dalam setiap metode, kita memetakan dokumen keluarga di aliran Flux<Family> BulkWriter ke beberapa panggilan metode di CosmosBulkOperations. Operasi ini ditambahkan ke objek aliran reaktif lainnya Flux<CosmosItemOperation>. Aliran kemudian diteruskan ke metode executeBulkOperations dari asinkron container yang kita buat di awal, dan operasi dijalankan secara massal. Lihat metode pembuatan massal di bawah ini sebagai contoh:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Terdapat juga kelas BulkWriter.java dalam direktori yang sama dengan aplikasi contoh. Kelas ini menunjukkan cara menangani kesalahan pembatasan laju (429) dan batas waktu (408) yang mungkin terjadi selama eksekusi massal, dan mencoba kembali operasi tersebut secara efektif. Ini diimplementasikan dalam metode di bawah ini, juga menunjukkan cara menerapkan pengendalian throughput lokal dan global.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Selain itu, terdapat metode pembuatan massal dalam contoh, yang menggambarkan cara menambahkan pemrosesan respons, dan mengatur opsi eksekusi:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    Tips performa

    Pertimbangkan poin berikut untuk kinerja yang lebih baik saat menggunakan pustaka pelaksana massal:

    • Untuk performa terbaik, jalankan aplikasi Anda dari Azure VM di wilayah yang sama dengan wilayah tulis akun Azure Cosmos DB Anda.

    • Untuk mencapai throughput yang lebih tinggi:

      • Set ukuran timbunan JVM ke jumlah yang cukup besar untuk menghindari masalah memori dalam menangani sejumlah besar dokumen. Ukuran tumpukan yang disarankan: maks(3 GB, 3 * ukuran (semua dokumen diteruskan ke API impor massal dalam satu batch)).
      • Ada waktu praproses, karena itu Anda akan mendapatkan throughput yang lebih tinggi ketika melakukan operasi massal dengan dokumen dalam jumlah besar. Jadi, jika Anda ingin mengimpor 10.000.000 dokumen, menjalankan impor massal 10 kali pada 10 dokumen massal masing-masing ukuran 1.000.000 lebih disukai daripada menjalankan impor massal 100 kali pada 100 dokumen massal masing-masing ukuran 100.000 dokumen.
    • Disarankan untuk membuat instans satu objek CosmosAsyncClient untuk seluruh aplikasi dalam satu komputer virtual yang sesuai dengan kontainer Azure Cosmos DB tertentu.

    • Karena pelaksanaan API operasi massal tunggal mengkonsumsi sebagian besar CPU dan IO jaringan mesin klien. Ini terjadi dengan menelurkan beberapa tugas secara internal, menghindari menelurkan beberapa tugas bersamaan dalam proses aplikasi Anda setiap menjalankan panggilan API operasi massal. Jika panggilan API operasi massal tunggal yang berjalan pada satu mesin virtual tidak dapat menggunakan seluruh throughput kontainer Anda (jika throughput kontainer Anda > 1 juta RU/dtk), sebaiknya buat mesin virtual terpisah untuk menjalankan panggilan API operasi massal secara bersamaan.

    Langkah berikutnya