Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Замечание
Библиотека массового исполнителя, описанная в этой статье, поддерживается для приложений, использующих версию пакета SDK для .NET 2.x. Для новых приложений можно использовать массовую поддержку , доступную непосредственно в пакете SDK для .NET версии 3.x, и для нее нет внешней библиотеки.
Если в настоящее время вы используете библиотеку массового исполнителя и планируете выполнить миграцию на массовую поддержку в новом пакете SDK, выполните действия, описанные в руководстве по миграции, чтобы перенести приложение.
В этом руководстве приводятся инструкции по использованию библиотеки массового исполнителя .NET для импорта и обновления документов в контейнер Azure Cosmos DB. Сведения о библиотеке массового исполнителя и о том, как она помогает использовать массовую пропускную способность и хранилище, см. в обзоре библиотеки массового исполнителя Azure Cosmos DB. В этом руководстве показано, как пример приложения .NET осуществляет массовый импорт случайно сгенерированных документов в контейнер Azure Cosmos DB. После импорта данных библиотека показывает, как можно массово обновлять импортированные данные, указывая исправления в качестве операций для выполнения в определенных полях документов.
В настоящее время библиотека массового исполнителя поддерживается только для учетных записей Azure Cosmos DB, использующих NoSQL и API для Gremlin. В этой статье описывается, как использовать библиотеку .NET массового исполнителя с api для учетных записей NoSQL. Чтобы узнать, как использовать библиотеку .NET массового исполнителя через API для учетных записей Gremlin, см. сведения о поглощении данных в Azure Cosmos DB для Gremlin с помощью библиотеки массового исполнителя.
Предпосылки
Visual Studio последней версии с рабочей нагрузкой разработки для Azure. Вы можете начать с бесплатной среды разработки Visual Studio Community. При установке Visual Studio включите рабочую нагрузку разработки для Azure.
Если у вас нет подписки на Azure, создайте бесплатную учетную запись перед началом. Вы также можете установить и использовать эмулятор Azure Cosmos DB для локальной разработки и тестирования с конечной
https://localhost:8081точкой. Первичный ключ предоставляется в разделе Выполнение проверки подлинности запросов.Создайте учетную запись Azure Cosmos DB для NoSQL, используя шаги, описанные в разделе "Создание учетной записи Azure Cosmos DB" в кратком руководстве: клиентская библиотека Azure Cosmos DB для NoSQL для .NET.
Клонирование примера приложения
Теперь перейдем к работе с кодом, скачав пример приложения .NET из GitHub. Это приложение выполняет массовые операции с данными, хранящимися в учетной записи Azure Cosmos DB. Чтобы клонировать приложение, откройте командную строку, перейдите в каталог, в котором нужно скопировать его, и выполните следующую команду:
git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git
Клонированный репозиторий содержит два примера: BulkImportSample и BulkUpdateSample. Вы можете открыть любое из примеров приложений, обновить строки подключения в файле App.config с помощью строк подключения вашей учетной записи Azure Cosmos DB, собрать решение и запустить его.
Приложение BulkImportSample создает случайные документы и выполняет массовый импорт их в учетную запись Azure Cosmos DB. Приложение BulkUpdateSample массово обновляет импортированные документы, указывая исправления в качестве операций для выполнения в определенных полях документов. В следующих разделах вы рассмотрите код в каждом из этих примеров приложений.
Массовый импорт данных в учетную запись Azure Cosmos DB
Перейдите в папку BulkImportSample и откройте файл BulkImportSample.sln .
Строки подключения Azure Cosmos DB извлечены из файла App.config, как показано в следующем коде:
private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"]; private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"]; private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"]; private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"]; private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);Средство массового импорта создает базу данных и коллекцию с именем базы данных, именем контейнера и значениями пропускной способности, указанными в файле App.config.
Затем объект DocumentClient инициализируется с режимом подключения Direct TCP:
ConnectionPolicy connectionPolicy = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp }; DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey, connectionPolicy)Объект BulkExecutor инициализирован с высоким значением повторных попыток для времени ожидания и регулирования запросов. Затем для них задано значение 0, чтобы передать управление перегрузкой в BulkExecutor на всё время его работы.
// Set retry options high during initialization (default values). client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9; IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection); await bulkExecutor.InitializeAsync(); // Set retries to 0 to pass complete control to bulk executor. client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;Приложение вызывает API BulkImportAsync . Библиотека .NET предоставляет две перегрузки API массового импорта: одну, которая принимает список сериализованных документов JSON, и другую — список десериализированных документов POCO. Дополнительные сведения об определениях каждого из этих перегруженных методов см. в документации по API.
BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync( documents: documentsToImportInBatch, enableUpsert: true, disableAutomaticIdGeneration: true, maxConcurrencyPerPartitionKeyRange: null, maxInMemorySortingBatchSize: null, cancellationToken: token);Метод BulkImportAsync принимает следующие параметры:
Parameter Описание enableUpsert Флаг для включения операций вставки или обновления в документы. Если документ с заданным идентификатором уже существует, он обновляется. Значение по умолчанию — false. disableAutomaticIdGeneration Параметр, отключающий автоматическое создание идентификатора. Значение по умолчанию — истина. maxConcurrencyPerPartitionKeyRange Максимальная степень параллелизма для одного диапазона ключей партиции. Значение NULL приводит к тому, что библиотека будет использовать значение по умолчанию 20. Максимальный размер пакета для сортировки в памяти Максимальное количество документов, выведенных из перечислителя документов, которое передается вызову API на каждом этапе. Для этапа сортировки в памяти, который происходит перед массовым импортом. Если этот параметр имеет значение NULL, библиотека будет использовать минимальное значение по умолчанию (documents.count, 1000000). токен отмены Токен отмены для плавного выхода из операции массового импорта.
Определение объекта ответа массового импорта
Результат вызова API массового импорта содержит следующие атрибуты:
| Parameter | Описание |
|---|---|
| NumberOfDocumentsImported (длинное целое) | Общее число документов, которые были успешно импортированы из общего количества документов, предоставленных для вызова API массового импорта. |
| Общее количество использованных запросных единиц (double) | Общее число единиц запросов (ЕЗ), потребляемых вызовом API массового импорта. |
| TotalTimeTaken (TimeSpan) | Общее время, затрачиваемое на выполнение вызова API массового импорта. |
| BadInputDocuments (List< объектов>) | Список документов с неверным форматом, которые не были успешно импортированы при вызове API массового импорта. Необходимо исправить возвращенные документы и повторить импорт. Документы с неверным форматом включают в себя документы, значение идентификаторов которых не является строкой (NULL или любой другой тип данных считается недействительным). |
Массовое обновление данных в учетной записи Azure Cosmos DB
Существующие документы можно обновить с помощью API BulkUpdateAsync . В этом примере вы задайте Name для поля новое значение и удалите Description поле из существующих документов. Полный набор поддерживаемых операций обновления см. в документации по API.
Перейдите в папку BulkUpdateSample и откройте файл BulkUpdateSample.sln .
Определите элементы для обновления вместе с соответствующими операциями обновления полей. В этом примере вы используете SetUpdateOperation для обновления поля и
Name, чтобы удалитьDescriptionполе из всех документов. Вы также можете выполнять другие операции, такие как увеличение поля документа по определенному значению, отправка определенных значений в поле массива или удаление определенного значения из поля массива. Дополнительные сведения о различных методах, предоставляемых API массового обновления, см. в документации по API.SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc"); UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description"); List<UpdateOperation> updateOperations = new List<UpdateOperation>(); updateOperations.Add(nameUpdate); updateOperations.Add(descriptionUpdate); List<UpdateItem> updateItems = new List<UpdateItem>(); for (int i = 0; i < 10; i++) { updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations)); }Приложение вызывает API BulkUpdateAsync . Сведения об определении метода BulkUpdateAsync см. в документации по API.
BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync( updateItems: updateItems, maxConcurrencyPerPartitionKeyRange: null, maxInMemorySortingBatchSize: null, cancellationToken: token);Метод BulkUpdateAsync принимает следующие параметры:
Parameter Описание maxConcurrencyPerPartitionKeyRange Максимальная степень параллелизма для одного диапазона ключей партиции. Если этот параметр имеет значение NULL, библиотека использует значение по умолчанию (20). Максимальный размер пакета для сортировки в памяти Максимальное число элементов, извлеченных из перечислителя обновляемых элементов, переданного в вызов API на каждом этапе. Для этапа сортировки в памяти, который происходит перед массовым обновлением. Установка этого параметра на значение NULL приводит к тому, что библиотека использует минимальное значение по умолчанию (updateItems.count, 1000000). токен отмены Токен отмены для плавного выхода из операции массового обновления.
Определение объекта ответа массового обновления
Результат вызова API массового обновления содержит следующие атрибуты:
| Parameter | Описание |
|---|---|
| NumberOfDocumentsUpdated (длинное целое число) | Общее число документов, которые были успешно обновлены из общего количества документов, предоставленных для вызова API массового обновления. |
| Общее количество использованных запросных единиц (double) | Общее число единиц запросов (ЕЗ), потребляемых вызовом API массового обновления. |
| TotalTimeTaken (TimeSpan) | Общее время, затрачиваемое на выполнение вызова API массового обновления. |
Советы по производительности
При использовании библиотеки массового исполнителя следует учитывать следующие моменты для повышения производительности.
Для повышения производительности запустите приложение из виртуальной машины Azure, которая находится в том же регионе, что и область записи учетной записи Azure Cosmos DB.
Рекомендуется создать экземпляр одного объекта BulkExecutor для всего приложения в пределах одной виртуальной машины, соответствующей конкретному контейнеру Azure Cosmos DB.
Одно выполнение API пакетных операций потребляет значительную часть ресурсов ЦП и сетевого ввода-вывода клиентского компьютера при запуске нескольких внутренних задач. Не запускайте множество параллельных задач в процессе вашего приложения, выполняющих вызовы API массовых операций. Если один вызов API массовой операции, выполняющийся на одной виртуальной машине, не может использовать пропускную способность всего контейнера (если пропускная способность > контейнера составляет 1 миллион ЕЗ/с), рекомендуется создать отдельные виртуальные машины для параллельного выполнения вызовов API массовой операции.
Убедитесь, что метод
InitializeAsync()вызывается после создания экземпляра объекта BulkExecutor, чтобы получить карту секционирования целевого контейнера Azure Cosmos DB.Убедитесь, что в файле App.Config приложения включен параметр gcServer для обеспечения лучшей производительности.
<runtime> <gcServer enabled="true" /> </runtime>Библиотека создает трассировки, которые могут записываться в файл журнала или на консоль. Чтобы включить оба, добавьте следующий код в файл App.Config приложения:
<system.diagnostics> <trace autoflush="false" indentsize="4"> <listeners> <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" /> <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" /> </listeners> </trace> </system.diagnostics>