你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
针对 Azure Cosmos DB 数据执行批量操作
适用范围: NoSQL
在本教程中,可了解如何在 Azure Cosmos DB Java V4 SDK 上执行批量操作。 此版本的 SDK 附带内置批量执行工具库。 如果使用旧版 Java SDK,建议迁移到最新版本。 Azure Cosmos DB Java V4 SDK 是当前建议用于 Java 批量支持的解决方案。
目前,批量执行工具库仅受 Azure Cosmos DB for NoSQL 和 API for Gremlin 帐户支持。 若要了解如何将批量执行工具 .NET 库与 API for Gremlin 配合使用,请参阅在 Azure Cosmos DB for Gremlin 中执行批量操作。
先决条件
如果没有 Azure 订阅,请在开始之前创建一个免费帐户。
无需 Azure 订阅即可免费试用 Azure Cosmos DB,也无需缴纳费用或承诺金。 或者,可以通过
https://localhost:8081
终结点使用 Azure Cosmos DB 仿真器。 对请求进行身份验证中提供了主密钥。-
在 Ubuntu 上运行
apt-get install default-jdk
,以便安装 JDK。请确保设置 JAVA_HOME 环境变量,使之指向在其中安装了 JDK 的文件夹。
-
- 在 Ubuntu 上,可以通过运行
apt-get install maven
来安装 Maven。
- 在 Ubuntu 上,可以通过运行
使用 Java 快速入门文章的创建数据库帐户部分所述的步骤创建 Azure Cosmos DB for NoSQL 帐户。
克隆示例应用程序
现在,让我们从 GitHub 下载适用于 Java V4 SDK for Azure Cosmos DB 的通用示例存储库来接着处理代码。 这些示例应用程序对 Azure Cosmos DB 执行 CRUD 操作和其他常见操作。 若要克隆该存储库,请打开命令提示符,导航到要将该应用程序复制到的目录,然后运行以下命令:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
克隆的存储库在 /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async
文件夹中包含示例 SampleBulkQuickStartAsync.java
。 应用程序生成文档并执行操作,来批量创建、更新插入、替换和删除 Azure Cosmos DB 中的项。 在后续部分,我们将查看示例应用中的代码。
Azure Cosmos DB 中的批量执行
- Azure Cosmos DB 的连接字符串将作为参数读取,并分配到在 /
examples/common/AccountSettings.java
文件中定义的变量。 必须设置这些环境变量
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
若要运行批量示例,请指定其 Main 类:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
使用以下语句初始化
CosmosAsyncClient
对象:client = new CosmosClientBuilder() .endpoint(AccountSettings.HOST) .key(AccountSettings.MASTER_KEY) .preferredRegions(preferredRegions) .contentResponseOnWriteEnabled(true) .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
该示例创建了一个异步数据库和容器。 然后,它会创建多个要对其执行批量操作的文档。 它将这些文档添加到
Flux<Family>
反应式流对象中:Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); Family johnsonFamilyItem = Families.getJohnsonFamilyItem(); Family smithFamilyItem = Families.getSmithFamilyItem(); // Setup family items to create Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
该示例包含用于批量创建、更新插入、替换和删除的方法。 在每个方法中,我们将 BulkWriter
Flux<Family>
流中的系列文档映射到CosmosBulkOperations
中的多个方法调用。 这些操作将添加到另一个反应式流对象Flux<CosmosItemOperation>
。 然后,流会传递给我们在开始时创建的异步container
的executeBulkOperations
方法,并批量执行操作。 以下面的批量创建方法为例:private void bulkCreateItems(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).blockLast(); }
示例应用程序所在的目录中也有一个
BulkWriter.java
类。 此类演示了如何处理批量执行期间可能发生的速率限制 (429) 和超时 (408) 错误,以及如何有效地重试这些操作。 此方法在以下方法中实现,还显示如何实现本地和全局吞吐量控制。private void bulkUpsertItemsWithBulkWriterAbstraction() { Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() { ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group1") .setTargetThroughput(200) .build(); container.enableLocalThroughputControlGroup(groupConfig); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() { String controlContainerId = "throughputControlContainer"; CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId); database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block(); ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group-" + UUID.randomUUID()) .setTargetThroughput(200) .build(); GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) .setControlItemRenewInterval(Duration.ofSeconds(5)) .setControlItemExpireInterval(Duration.ofSeconds(20)) .build(); container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig); CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); }
此外,示例中还有批量创建方法,演示如何添加响应处理和设置执行选项:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> { CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation(); if (cosmosBulkOperationResponse.getException() != null) { logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); } else if (cosmosBulkItemResponse == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) { logger.error( "The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " + "successfully with " + "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a"); } else { logger.info( "Item ID: [{}] Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName()); logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode()); logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge()); } if (cosmosBulkItemResponse == null) { return Mono.error(new IllegalStateException("No response retrieved.")); } else { return Mono.just(cosmosBulkItemResponse); } }).blockLast(); } private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) { CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); // The default value for maxMicroBatchConcurrency is 1. // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage. // // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not. // When the RU has already been under saturation, increasing the concurrency will not help the situation, // rather it may cause more 429 and request timeout. bulkExecutionOptions.setMaxMicroBatchConcurrency(2); Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast(); }
性能提示
使用 Bulk Executor 库时,请注意以下几点,以获得更好的性能:
为获得最佳性能,请在 Azure VM 中运行应用程序,该 Azure VM 所处区域与 Azure Cosmos DB 帐户写入区域相同。
为实现更高的吞吐量:
- 请将 JVM 的堆大小设为足够大的数字,以免在处理大量文档时出现任何内存问题。 建议的堆大小:max(3GB, 3 * sizeof(在一个批中传递给批量导入 API 的文档总数))。
- 会有一段预处理时间,因此,在对大量的文档执行批量操作时可以获得更高的吞吐量。 如果想要导入 10,000,000 个文档,针对 10 批文档(每个批的大小为 1,000,000)运行批量导入 10 次,比针对 100 批文档(每个批的大小为 100,000 个文档)运行批量导入 100 次会更有利。
建议在单个虚拟机中,为整个应用程序实例化对应于特定 Azure Cosmos DB 容器的单个 CosmosAsyncClient 对象。
原因是单个批量操作 API 执行会消耗客户端计算机的大量 CPU 和网络 IO。 而发生这种情况的原因是在内部生成了多个任务,因此,每次执行批量操作 API 调用时,请避免在应用程序进程中生成多个并发任务。 如果单个虚拟机上运行的单个批量操作 API 调用无法占用整个容器的吞吐量(如果容器吞吐量超过 100 万 RU/秒),最好是创建独立的虚拟机来并发执行批量操作 API 调用。
后续步骤
- 有关批量执行程序功能的概述,请参阅批量执行程序概述。