Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье описаны необходимые действия по переносу существующего кода приложения, использующего библиотеку массового исполнителя Java в функцию массовой поддержки в последней версии пакета SDK для Java.
Включение массовой поддержки
Чтобы использовать массовую поддержку в пакете SDK для Java, включите импорт здесь:
import com.azure.cosmos.models.*;
Добавление документов в реактивный поток
Поддержка групповых операций в Java SDK версии 4 осуществляется путем добавления документов в объект реактивного потока. Например, можно добавить каждый документ по отдельности:
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();
// Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
Кроме того, вы можете добавить документы в поток из списка с помощью fromIterable:
class SampleDoc {
public SampleDoc() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
private String id="";
}
List<SampleDoc> docList = new ArrayList<>();
for (int i = 1; i <= 5; i++){
SampleDoc doc = new SampleDoc();
String id = "id-"+i;
doc.setId(id);
docList.add(doc);
}
Flux<SampleDoc> docs = Flux.fromIterable(docList);
Если вы хотите выполнить массовое создание или обновление элементов (аналогично использованию DocumentBulkExecutor.importAll), необходимо передать реактивный поток методу, как показано в следующем примере:
private void bulkUpsertItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getUpsertItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
Вы также можете использовать метод, как показано здесь, но этот метод используется только для создания элементов:
private void bulkCreateItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
Метод DocumentBulkExecutor.importAll в старой библиотеке BulkExecutor также использовался для массового обновления элементов. Старый метод DocumentBulkExecutor.mergeAll также использовался для исправления, но только для set типа операции исправления. Чтобы выполнить массовые операции исправления в пакете SDK версии 4, сначала необходимо создать операции исправления:
CosmosPatchOperations patchOps = CosmosPatchOperations.create().add("/country", "United States")
.set("/registered", 0);
Затем вы можете передать операции вместе с реактивным потоком документов в метод, как это показано здесь. В этом примере мы применяем как типы операций исправления add, так и set. Полный набор поддерживаемых типов операций исправления можно найти здесь в нашем обзоре частичного обновления документов в Azure Cosmos DB.
private void bulkPatchItems(Flux<Family> families, CosmosPatchOperations operations) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getPatchItemOperation(family.getId(), new PartitionKey(family.getLastName()), operations));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
Замечание
В предыдущем примере мы применяем add и set к элементам для исправления, для которых существует корневой родитель. Однако вы не можете выполнить эту операцию здесь, так как корневой родительский элемент не существует. Это связано с тем, что частичное обновление документа Azure Cosmos DB вдохновлено обновлением JSON Patch RFC 6902. Если требуется исправление там, где корневого родителя нет, сначала прочитайте полные документы, а затем используйте такой метод для замены документов:
private void bulkReplaceItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getReplaceItemOperation(family.getId(), family, new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
Если вы хотите выполнить массовое удаление (аналогично использованию DocumentBulkExecutor.deleteAll), необходимо использовать массовое удаление:
private void bulkDeleteItems(Flux<Family> families) {
Flux<CosmosItemOperation> cosmosItemOperations = families.map(
family -> CosmosBulkOperations
.getDeleteItemOperation(family.getId(), new PartitionKey(family.getLastName())));
container.executeBulkOperations(cosmosItemOperations).blockLast();
}
Повторные попытки, время ожидания и управление пропускной способностью
Поддержка массовой обработки в Java SDK версии 4 не поддерживает встроенную обработку повторных попыток и таймаутов. Вы можете ознакомиться с руководством в Bulk Executor - Java библиотека, в котором содержится пример, реализующий абстракцию для корректного управления повторными попытками и временными интервалами ожидания. В этом примере также приведены примеры для локального и глобального контроля пропускной способности. Вы также можете ознакомиться с разделом должно ли мое приложение повторять попытки при ошибках для получения дополнительных рекомендаций по различным типам ошибок, которые могут возникнуть, и лучших практик для обработки повторных попыток.
Дальнейшие действия
- Массовые примеры на GitHub
- Пытаетесь составить план мощностей для миграции в Azure Cosmos DB?
- Если вам известно только количество виртуальных ядер и серверов в существующем кластере баз данных, ознакомьтесь с информацией об оценке единиц запросов на основе виртуальных ядер или виртуальных процессоров.
- Если вы знаете типичные частоты запросов для текущей рабочей нагрузки базы данных, ознакомьтесь с оценкой единиц запросов с помощью планировщика емкости Azure Cosmos DB.