Usar a biblioteca .NET do executor em massa para executar operações em massa no Azure Cosmos DB

APLICA-SE A: NoSQL

Nota

A biblioteca de executores em massa descrita neste artigo é mantida para aplicativos que usam a versão do .NET SDK 2.x. Para novos aplicativos, você pode usar o suporte em massa que está diretamente disponível com o .NET SDK versão 3.x e não requer nenhuma biblioteca externa.

Se você usa atualmente a biblioteca de executores em massa e planeja migrar para o suporte em massa no SDK mais recente, use as etapas no Guia de migração para migrar seu aplicativo.

Este tutorial fornece instruções sobre como usar a biblioteca .NET do executor em massa para importar e atualizar documentos para um contêiner do Azure Cosmos DB. Para saber mais sobre a biblioteca de executores em massa e como ela ajuda você a usar a taxa de transferência e o armazenamento massivos, consulte a visão geral da biblioteca de executores em massa do Azure Cosmos DB. Neste tutorial, você verá um aplicativo .NET de exemplo em que importa em massa documentos gerados aleatoriamente para um contêiner do Azure Cosmos DB. Depois de importar os dados, a biblioteca mostra como você pode atualizar em massa os dados importados especificando patches como operações a serem executadas em campos de documento específicos.

Atualmente, a biblioteca de executores em massa é suportada apenas pelo Azure Cosmos DB para NoSQL e pela API para contas Gremlin. Este artigo descreve como usar a biblioteca .NET do executor em massa com a API para contas NoSQL. Para saber como usar a biblioteca .NET do executor em massa com a API para contas Gremlin, consulte Ingerir dados em massa no Azure Cosmos DB para Gremlin usando uma biblioteca de executor em massa.

Pré-requisitos

Clonar a aplicação de exemplo

Agora vamos mudar para trabalhar com código baixando um aplicativo .NET de exemplo do GitHub. Este aplicativo executa operações em massa nos dados armazenados em sua conta do Azure Cosmos DB. Para clonar o aplicativo, abra um prompt de comando, navegue até o diretório onde deseja copiá-lo e execute o seguinte comando:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

O repositório clonado contém dois exemplos, BulkImportSample e BulkUpdateSample. Você pode abrir qualquer um dos aplicativos de exemplo, atualizar as cadeias de conexão no arquivo App.config com as cadeias de conexão da sua conta do Azure Cosmos DB, criar a solução e executá-la.

O aplicativo BulkImportSample gera documentos aleatórios e os importa em massa para sua conta do Azure Cosmos DB. O aplicativo BulkUpdateSample atualiza em massa os documentos importados especificando patches como operações a serem executadas em campos de documento específicos. Nas próximas seções, você revisará o código em cada um desses aplicativos de exemplo.

Importar dados em massa para uma conta do Azure Cosmos DB

  1. Navegue até a pasta BulkImportSample e abra o arquivo BulkImportSample.sln.

  2. As cadeias de conexão do Azure Cosmos DB são recuperadas do arquivo App.config, conforme mostrado no código a seguir:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    O importador em massa cria um novo banco de dados e um contêiner com o nome do banco de dados, o nome do contêiner e os valores de taxa de transferência especificados no arquivo App.config.

  3. Em seguida, o objeto DocumentClient é inicializado com o modo de conexão TCP direta:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. O objeto BulkExecutor é inicializado com um alto valor de repetição para tempo de espera e solicitações limitadas. Em seguida, eles são definidos como 0 para passar o controle de congestionamento para o BulkExecutor por sua vida útil.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. O aplicativo invoca a API BulkImportAsync . A biblioteca .NET fornece duas sobrecargas da API de importação em massa — uma que aceita uma lista de documentos JSON serializados e outra que aceita uma lista de documentos POCO desserializados. Para saber mais sobre as definições de cada um desses métodos sobrecarregados, consulte a documentação da API.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    O método BulkImportAsync aceita os seguintes parâmetros:

    Parâmetro Descrição
    enableUpsert Um sinalizador para habilitar operações de upsert nos documentos. Se já existir um documento com o ID fornecido, ele será atualizado. Por padrão, ele é definido como false.
    disableAutomaticIdGeneration Um sinalizador para desativar a geração automática de ID. Por padrão, ele é definido como true.
    maxConcurrencyPerPartitionKeyRange O grau máximo de simultaneidade por intervalo de chaves de partição. A configuração como null faz com que a biblioteca use um valor padrão de 20.
    maxInMemorySortingBatchSize O número máximo de documentos que são extraídos do enumerador de documentos, que é passado para a chamada de API em cada estágio. Para a fase de classificação na memória que acontece antes da importação em massa. Definir esse parâmetro como null faz com que a biblioteca use o valor mínimo padrão (documents.count, 1000000).
    cancellationToken O token de cancelamento para sair graciosamente da operação de importação em massa.

Definição de objeto de resposta de importação em massa
O resultado da chamada de API de importação em massa contém os seguintes atributos:

Parâmetro Descrição
NumberOfDocumentsImported (longo) O número total de documentos que foram importados com êxito do total de documentos fornecidos para a chamada de API de importação em massa.
TotalRequestUnitsConsumed (duplo) O total de unidades de solicitação (RU) consumidas pela chamada de API de importação em massa.
TotalTimeTaken (TimeSpan) O tempo total necessário pela chamada de API de importação em massa para concluir a execução.
BadInputDocuments (objeto> List<) A lista de documentos de formato incorreto que não foram importados com êxito na chamada de API de importação em massa. Corrija os documentos retornados e tente importar novamente. Documentos com formatação incorreta incluem documentos cujo valor de ID não é uma cadeia de caracteres (nulo ou qualquer outro tipo de dados é considerado inválido).

Atualizar dados em massa na sua conta do Azure Cosmos DB

Você pode atualizar documentos existentes usando a API BulkUpdateAsync . Neste exemplo, você define o campo como um novo valor e remove o NameDescription campo dos documentos existentes. Para obter o conjunto completo de operações de atualização suportadas, consulte a documentação da API.

  1. Navegue até a pasta BulkUpdateSample e abra o arquivo BulkUpdateSample.sln.

  2. Defina os itens de atualização juntamente com as operações de atualização de campo correspondentes. Neste exemplo, você usa SetUpdateOperation para atualizar o campo e UnsetUpdateOperation para remover o NameDescription campo de todos os documentos. Você também pode executar outras operações, como incrementar um campo de documento por um valor específico, enviar valores específicos para um campo de matriz ou remover um valor específico de um campo de matriz. Para saber mais sobre os diferentes métodos fornecidos pela API de atualização em massa, consulte a documentação da API.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. O aplicativo invoca a API BulkUpdateAsync . Para saber mais sobre a definição do método BulkUpdateAsync, consulte a documentação da API.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    O método BulkUpdateAsync aceita os seguintes parâmetros:

    Parâmetro Descrição
    maxConcurrencyPerPartitionKeyRange O grau máximo de simultaneidade por intervalo de chaves de partição. Definir esse parâmetro como null faz com que a biblioteca use o valor padrão(20).
    maxInMemorySortingBatchSize O número máximo de itens de atualização extraídos do enumerador de itens de atualização passado para a chamada de API em cada estágio. Para a fase de classificação na memória que acontece antes da atualização em massa. Definir esse parâmetro como null faz com que a biblioteca use o valor mínimo padrão (updateItems.count, 1000000).
    cancellationToken O token de cancelamento para sair normalmente da operação de atualização em massa.

Definição de objeto de resposta de atualização em massa
O resultado da chamada de API de atualização em massa contém os seguintes atributos:

Parâmetro Descrição
NúmerodeDocumentosAtualizado (longo) O número de documentos que foram atualizados com êxito do total de documentos fornecidos para a chamada de API de atualização em massa.
TotalRequestUnitsConsumed (duplo) O total de unidades de solicitação (RUs) consumidas pela chamada de API de atualização em massa.
TotalTimeTaken (TimeSpan) O tempo total necessário pela chamada de API de atualização em massa para concluir a execução.

Sugestões de desempenho

Considere os seguintes pontos para obter um melhor desempenho ao usar a biblioteca de executores em massa:

  • Para obter o melhor desempenho, execute seu aplicativo a partir de uma máquina virtual do Azure que esteja na mesma região da região de gravação da sua conta do Azure Cosmos DB.

  • É recomendável que você instancie um único objeto BulkExecutor para todo o aplicativo em uma única máquina virtual que corresponda a um contêiner específico do Azure Cosmos DB.

  • Uma única execução de API de operação em massa consome uma grande parte da CPU e da E/S da rede da máquina cliente ao gerar várias tarefas internamente. Evite gerar várias tarefas simultâneas em seu processo de aplicativo que executam chamadas de API de operação em massa. Se uma única chamada de API de operação em massa em execução em uma única máquina virtual não puder consumir a taxa de transferência de todo o contêiner (se a taxa > de transferência do contêiner for de 1 milhão de RU/s), é preferível criar máquinas virtuais separadas para executar simultaneamente as chamadas de API de operação em massa.

  • Verifique se o método é invocado depois de instanciar um objeto BulkExecutor para buscar o InitializeAsync() mapa de partição do contêiner do Azure Cosmos DB de destino.

  • No App.config do seu aplicativo, verifique se o gcServer está habilitado para um melhor desempenho

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • A biblioteca emite rastreamentos que podem ser coletados em um arquivo de log ou no console. Para habilitar ambos, adicione o seguinte código ao arquivo App.Config do seu aplicativo:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Próximos passos