你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 .NET SDK 将数据批量导入 Azure Cosmos DB for NoSQL 帐户

适用范围: NoSQL

本教程演示如何生成用于优化将数据导入到 Azure Cosmos DB 所需的预配吞吐量 (RU/s) 的 .NET 控制台应用程序。

在本文中,你将读取示例数据源中的数据,并将其导入到 Azure Cosmos DB 容器中。 本教程使用 3.0+ 版 Azure Cosmos DB .NET SDK,后者以 .NET Framework 或 .NET Core 为目标。

本教程涉及:

  • 创建 Azure Cosmos DB 帐户
  • 配置项目
  • 连接到启用了批量支持的 Azure Cosmos DB 帐户
  • 通过并发创建操作执行数据导入

先决条件

在按照本文中的说明操作之前,请确保具备以下资源:

步骤 1:创建 Azure Cosmos DB 帐户

从 Azure 门户创建 Azure Cosmos DB for NoSQL 帐户或者可以通过使用 Azure Cosmos DB 模拟器来创建帐户。

步骤 2:设置 .NET 项目

从本地计算机打开 Windows 命令提示符或终端窗口。 你将从命令提示符或终端运行接下来的部分中的所有命令。 运行以下 dotnet 新命令,创建名为“bulk-import-demo” 的新应用。

dotnet new console -n bulk-import-demo

将目录更改为新创建的应用文件夹。 可使用以下代码生成应用程序:

cd bulk-import-demo
dotnet build

内部版本的预期输出应如下所示:

Restore completed in 100.37 ms for C:\Users\user1\Downloads\CosmosDB_Samples\bulk-import-demo\bulk-import-demo.csproj.
  bulk -> C:\Users\user1\Downloads\CosmosDB_Samples\bulk-import-demo \bin\Debug\netcoreapp2.2\bulk-import-demo.dll

Build succeeded.
    0 Warning(s)
    0 Error(s)

Time Elapsed 00:00:34.17

步骤 3:添加 Azure Cosmos DB 包

当仍在应用程序目录中时,使用 DotNet 添加包命令安装适用于 .NET 的 Azure Cosmos DB 客户端库。

dotnet add package Microsoft.Azure.Cosmos

步骤 4:获取 Azure Cosmos DB 帐户凭据

此示例应用程序需对 Azure Cosmos DB 帐户进行身份验证。 为了进行身份验证,应将 Azure Cosmos DB 帐户凭据传递给应用程序。 按照以下步骤获取 Azure Cosmos DB 帐户凭据:

  1. 登录 Azure 门户
  2. 导航到 Azure Cosmos DB 帐户。
  3. 打开“键”窗格,复制帐户的 URI 和主键 。

如果使用 Azure Cosmos DB 模拟器,请获取本文中的模拟器凭据

步骤 5:初始化具有批量执行支持的 CosmosClient 对象

在代码编辑器中打开生成的 Program.cs 文件。 你将创建启用了批量执行的 CosmosClient 的新实例,并使用该实例对 Azure Cosmos DB 执行操作。

首先,让我们覆盖默认 Main 方法,并定义全局变量。 这些全局变量将包含终结点和授权密钥、数据库的名称、你将创建的容器,以及将批量插入的项数。 确保根据环境替换终结点 URL 和授权密钥值。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public class Program
{
     private const string EndpointUrl = "https://<your-account>.documents.azure.com:443/";
     private const string AuthorizationKey = "<your-account-key>";
     private const string DatabaseName = "bulk-tutorial";
     private const string ContainerName = "items";
     private const int AmountToInsert = 300000;

     static async Task Main(string[] args)
     {

     }
}

Main 方法中,添加以下代码以初始化 CosmosClient 对象:

CosmosClient cosmosClient = new CosmosClient(EndpointUrl, AuthorizationKey, new CosmosClientOptions() { AllowBulkExecution = true });

注意

CosmosClientOptions 中指定批量执行后,它们实际上在 CosmosClient 的生存期内不可变。 更改值将不起作用。

启用批量执行后,CosmosClient 在内部将并发操作分组为单个服务调用。 通过这种方式,可以通过跨分区分发服务调用来优化吞吐量利用率,最后将各个结果分配给原始调用方。

然后可以创建一个容器来存储所有项。 将 /pk 定义为分区键,将 50000 RU/s 定义为预配的吞吐量,并定义将排除所有字段以优化写入吞吐量的自定义索引策略。 在 CosmosClient 初始化语句后面添加以下代码:

Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync(Program.DatabaseName);

await database.DefineContainer(Program.ContainerName, "/pk")
        .WithIndexingPolicy()
            .WithIndexingMode(IndexingMode.Consistent)
            .WithIncludedPaths()
                .Attach()
            .WithExcludedPaths()
                .Path("/*")
                .Attach()
        .Attach()
    .CreateAsync(50000);

步骤 6:填充并发任务的列表

若要利用批量执行支持,请基于数据源和要执行的操作创建异步任务的列表,并使用 Task.WhenAll 并发执行它们。 首先,让我们使用“假”数据从数据模型生成项目列表。 在实际应用程序中,项目会来自所需的数据源。

首先,使用 dotnet add package 命令将假包添加到解决方案中。

dotnet add package Bogus

定义要保存的项的定义。 需要在 Program.cs 文件中定义 Item 类:

public class Item
{
    public string id {get;set;}
    public string pk {get;set;}

    public string username{get;set;}
}

接下来,在 Program 类中创建一个 helper 函数。 此 helper 函数将获取你定义用于插入的项数并生成随机数据:

private static IReadOnlyCollection<Item> GetItemsToInsert()
{
    return new Bogus.Faker<Item>()
    .StrictMode(true)
    //Generate item
    .RuleFor(o => o.id, f => Guid.NewGuid().ToString()) //id
    .RuleFor(o => o.username, f => f.Internet.UserName())
    .RuleFor(o => o.pk, (f, o) => o.id) //partitionkey
    .Generate(AmountToInsert);
}

使用帮助程序函数初始化要使用的文档列表:

IReadOnlyCollection<Item> itemsToInsert = Program.GetItemsToInsert();

接下来,使用文档列表创建并发任务并填充任务列表以将项插入到容器中。 若要执行此操作,请将以下代码添加到 Program 类:

Container container = database.GetContainer(ContainerName);
List<Task> tasks = new List<Task>(AmountToInsert);
foreach (Item item in itemsToInsert)
{
    tasks.Add(container.CreateItemAsync(item, new PartitionKey(item.pk))
        .ContinueWith(itemResponse =>
        {
            if (!itemResponse.IsCompletedSuccessfully)
            {
                AggregateException innerExceptions = itemResponse.Exception.Flatten();
                if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException)
                {
                    Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message}).");
                }
                else
                {
                    Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}.");
                }
            }
        }));
}

// Wait until all are done
await Task.WhenAll(tasks);

所有这些并发点操作将一起执行(批量),如简介部分中所述。

步骤 7:运行示例

若要运行示例,只需使用 dotnet 命令即可:

dotnet run

获取完整示例

如果没有时间完成本教程中的步骤,或者只需下载代码示例,则可从 GitHub 获取。

克隆项目后,请确保在 Program.cs 中更新所需的凭据。

可以通过更改为存储库目录并使用 dotnet 来运行示例:

cd cosmos-dotnet-bulk-import-throughput-optimizer
dotnet run

后续步骤

在本教程中,我们已完成以下步骤:

  • 创建 Azure Cosmos DB 帐户
  • 配置项目
  • 连接到启用了批量支持的 Azure Cosmos DB 帐户
  • 通过并发创建操作执行数据导入

你现在可以继续学习下一个教程:

尝试为迁移到 Azure Cosmos DB 进行容量计划? 可以使用有关现有数据库群集的信息进行容量规划。