Gunakan pustaka .NET eksekutor massal untuk melakukan operasi massal di Microsoft Azure Cosmos DB

BERLAKU UNTUK: NoSQL

Catatan

Pustaka pelaksana massal yang dijelaskan dalam artikel ini dipertahankan untuk aplikasi yang menggunakan versi .NET SDK 2.x. Untuk aplikasi baru, Anda dapat menggunakan dukungan massal yang tersedia langsung dengan .NET SDK versi 3.x, dan tidak memerlukan pustaka eksternal apa pun.

Jika saat ini Anda menggunakan pustaka pelaksana massal dan berencana untuk bermigrasi ke dukungan massal pada SDK yang lebih baru, gunakan langkah-langkah dalam panduan Migrasi untuk memigrasikan aplikasi Anda.

Tutorial ini memberikan instruksi tentang cara menggunakan pustaka .NET pelaksana massal untuk mengimpor dan memperbarui dokumen ke kontainer Azure Cosmos DB. Untuk mempelajari tentang pustaka pelaksana massal dan caranya membantu Anda menggunakan throughput dan penyimpanan besar-besaran, lihat gambaran umum pustaka pelaksana massal Azure Cosmos DB. Dalam tutorial ini, Anda akan melihat contoh aplikasi .NET di mana mengimpor dokumen yang dihasilkan secara acak ke dalam kontainer Azure Cosmos DB. Setelah Anda mengimpor data, pustaka menunjukkan kepada Anda bagaimana Anda dapat memperbarui data yang diimpor secara massal dengan menentukan patch sebagai operasi untuk dilakukan pada bidang dokumen tertentu.

Saat ini, pustaka pelaksana massal didukung oleh Azure Cosmos DB untuk NoSQL dan API hanya untuk akun Gremlin. Artikel ini menjelaskan cara menggunakan pustaka .NET pelaksana massal dengan API untuk akun NoSQL. Untuk mempelajari cara menggunakan pustaka .NET pelaksana massal dengan API untuk akun Gremlin, lihat Menyerap data secara massal di Azure Cosmos DB untuk Gremlin dengan menggunakan pustaka pelaksana massal.

Prasyarat

Buat klon sampel aplikasi

Sekarang mari kita beralih bekerja dengan kode dengan mengunduh sampel aplikasi .NET dari GitHub. Aplikasi ini melakukan operasi massal pada data yang disimpan di akun Azure Cosmos DB Anda. Untuk mengkloning aplikasi, buka prompt perintah, navigasikan ke direktori tempat Anda ingin menyalinnya, dan jalankan perintah berikut:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

Repositori kloning berisi dua sampel, BulkImportSample dan BulkUpdateSample. Anda dapat membuka salah satu aplikasi sampel, memperbarui string koneksi dalam file App.config dengan string koneksi akun Azure Cosmos DB Anda, membangun solusi, dan menjalankannya.

Aplikasi BulkImportSample menghasilkan dokumen acak dan mengimpornya secara massal ke akun Azure Cosmos DB Anda. Aplikasi BulkUpdateSample memperbarui dokumen yang diimpor secara massal dengan menentukan patch sebagai operasi untuk dilakukan pada bidang dokumen tertentu. Di bagian berikutnya, Anda akan mengulas kode di masing-masing aplikasi sampel ini.

Mengimpor data secara massal ke akun Azure Cosmos DB

  1. Navigasi ke folder BulkImportSample dan buka file BulkImportSample.sln .

  2. String koneksi Microsoft Azure Cosmos DB diambil dari file App.config seperti yang ditunjukkan dalam kode berikut:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    Importir massal membuat database baru dan kontainer dengan nama database, nama kontainer, dan nilai throughput yang ditentukan dalam file App.config tertentu.

  3. Selanjutnya, objek DocumentClient diinisialisasi dengan mode koneksi TCP Langsung:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. Objek BulkExecutor diinisialisasi dengan nilai coba lagi tinggi untuk waktu tunggu dan permintaan yang dibatasi. Kemudian mereka diatur ke 0 untuk meneruskan kontrol kemacetan ke BulkExecutor selama masa pakainya.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. Aplikasi memanggil API BulkImportAsync . Pustaka .NET menyediakan dua kelebihan beban API impor massal—yang menerima daftar dokumen JSON berseri dan yang lainnya menerima daftar dokumen POCO yang dideserialisasi. Untuk mempelajari lebih lanjut tentang definisi dari masing-masing metode yang kelebihan beban ini, lihat dokumentasi API.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Metode BulkImportAsync menerima parameter berikut:

    Parameter Keterangan
    enableUpsert Bendera untuk mengaktifkan operasi upsert pada dokumen. Jika dokumen dengan ID yang diberikan sudah ada, dokumen akan diperbarui. Secara default, ini diatur ke false.
    disableAutomaticIdGeneration Bendera untuk menonaktifkan ID generasi otomatis. Secara default, ini diatur ke true.
    maxConcurrencyPerPartitionKeyRange Derajat konkurensi maksimum per rentang kunci partisi. Pengaturan ke null menyebabkan pustaka menggunakan nilai default 20.
    maxInMemorySortingBatchSize Jumlah maksimum dokumen yang ditarik dari enumerator dokumen, yang diteruskan ke panggilan API di setiap tahap. Untuk fase pengurutan dalam memori yang terjadi sebelum mengimpor secara massal. Mengatur parameter ini ke null menyebabkan pustaka menggunakan nilai minimum default (documents.count, 1000000).
    cancellationToken Token pembatalan untuk keluar dari operasi impor massal dengan anggun.

Definisi objek respons impor massal
Hasil panggilan API impor massal berisi atribut berikut:

Parameter Keterangan
NumberOfDocumentsImported (panjang) Jumlah total dokumen yang berhasil diimpor dari total dokumen yang diberikan ke panggilan API impor massal.
TotalRequestUnitsConsumed (ganda) Total unit permintaan (RU) yang dikonsumsi oleh panggilan API impor massal.
TotalTimeTaken (Rentang Waktu) Total waktu yang diambil oleh panggilan API impor massal untuk menyelesaikan eksekusi.
BadInputDocuments (Objek daftar><) Daftar dokumen berformat buruk yang tidak berhasil diimpor dalam panggilan API impor massal. Perbaiki dokumen yang dikembalikan dan coba lagi impor. Dokumen berformat buruk menyertakan dokumen yang nilai ID-nya bukan string (null atau jenis data lainnya dianggap tidak valid).

Memperbarui data secara massal di akun Azure Cosmos DB Anda

Anda dapat memperbarui dokumen yang ada dengan menggunakan API BulkUpdateAsync . Dalam contoh ini, Anda mengatur bidang ke Name nilai baru dan menghapus Description bidang dari dokumen yang sudah ada. Untuk set lengkap operasi pembaruan yang didukung, lihat dokumentasi API.

  1. Navigasi ke folder BulkUpdateSample dan buka file BulkUpdateSample.sln .

  2. Tentukan item pembaruan bersama dengan operasi pembaruan bidang terkait. Dalam contoh ini, Anda menggunakan SetUpdateOperation untuk memperbarui Name bidang dan UnsetUpdateOperation untuk menghapus Description bidang dari semua dokumen. Anda juga dapat melakukan operasi lain seperti menambahkan bidang dokumen dengan nilai tertentu, mendorong nilai tertentu ke bidang array, atau menghapus nilai tertentu dari bidang array. Untuk mempelajari berbagai metode yang disediakan oleh API pembaruan massal, lihat dokumentasi API.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. Aplikasi memanggil API BulkUpdateAsync . Untuk mempelajari tentang definisi metode BulkUpdateAsync , lihat dokumentasi API.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Metode BulkUpdateAsync menerima parameter berikut:

    Parameter Keterangan
    maxConcurrencyPerPartitionKeyRange Derajat konkurensi maksimum per rentang kunci partisi. Mengatur parameter ini ke null membuat pustaka menggunakan nilai default(20).
    maxInMemorySortingBatchSize Jumlah maksimum item pembaruan yang diambil dari pencacah item pembaruan diteruskan ke panggilan API di setiap tahap. Untuk fase pengurutan dalam memori yang terjadi sebelum pembaruan massal. Mengatur parameter ini ke null menyebabkan pustaka menggunakan nilai minimum default (updateItems.count, 1000000).
    cancellationToken Token pembatalan untuk keluar dari operasi pembaruan massal dengan anggun.

Memperbarui definisi objek respons secara massal
Hasil panggilan API pembaruan massal berisi atribut berikut:

Parameter Keterangan
NumberOfDocumentsUpdated (panjang) Jumlah dokumen yang berhasil diperbarui dari total dokumen yang diberikan ke panggilan API pembaruan massal.
TotalRequestUnitsConsumed (ganda) Total unit permintaan (RUs) yang digunakan oleh panggilan API pembaruan massal.
TotalTimeTaken (Rentang Waktu) Total waktu yang diambil oleh panggilan API pembaruan massal untuk menyelesaikan eksekusi.

Tips performa

Pertimbangkan poin-poin berikut untuk performa yang lebih baik saat Anda menggunakan pustaka pelaksana massal:

  • Untuk performa terbaik, jalankan aplikasi Anda dari komputer virtual Azure yang berada di wilayah yang sama dengan wilayah tulis akun Azure Cosmos DB Anda.

  • Disarankan agar Anda membuat instans satu objek BulkExecutor untuk seluruh aplikasi dalam satu komputer virtual yang sesuai dengan kontainer Azure Cosmos DB tertentu.

  • Eksekusi API operasi massal tunggal mengonsumsi sebagian besar CPU komputer klien dan IO jaringan saat menelurkan beberapa tugas secara internal. Hindari menelurkan beberapa tugas bersamaan dalam proses aplikasi Anda yang menjalankan panggilan API operasi massal. Jika satu panggilan API operasi massal yang berjalan pada satu komputer virtual tidak dapat mengonsumsi seluruh throughput kontainer (jika throughput > kontainer Anda 1 juta RU/s), lebih disukai untuk membuat komputer virtual terpisah untuk menjalankan panggilan API operasi massal secara bersamaan.

  • InitializeAsync() Pastikan metode dipanggil setelah membuat instans objek BulkExecutor untuk mengambil peta partisi kontainer Azure Cosmos DB target.

  • Dalam aplikasi Anda App.Config, pastikan gcServer diaktifkan untuk performa yang lebih baik

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • Pustaka memancarkan jejak yang dapat dikumpulkan baik ke dalam file log atau di konsol. Untuk mengaktifkan keduanya, tambahkan kode berikut ke file App.Config aplikasi Anda:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Langkah berikutnya