Migrasi dari pustaka eksekutor massal ke dukungan massal di Azure Cosmos DB .NET V3 SDK
BERLAKU UNTUK: NoSQL
Artikel ini menjelaskan langkah-langkah yang diperlukan untuk memigrasikan kode aplikasi yang ada yang menggunakan pustaka eksekutor massal .NET ke fitur dukungan massal dalam versi terbaru .NET SDK.
Mengaktifkan dukungan massal
Aktifkan dukungan massal pada instans CosmosClient
melalui konfigurasi AllowBulkExecution:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Membuat Tugas untuk setiap operasi
Dukungan massal dalam .NET SDK berfungsi dengan memanfaatkan Pustaka Paralel Tugas dan operasi pengelompokan yang terjadi secara bersamaan.
Tidak ada satu metode pun dalam SDK yang akan mengambil daftar dokumen atau operasi Anda sebagai parameter input, melainkan, Anda perlu membuat Tugas untuk setiap operasi yang ingin Anda jalankan secara massal, lalu cukup tunggu hingga selesai.
Misalnya, jika input awal Anda adalah daftar item di mana setiap item memiliki skema berikut:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Jika Anda ingin melakukan impor massal (mirip dengan penggunaan BulkExecutor.BulkImportAsync), Anda harus melakukan panggilan bersamaan ke CreateItemAsync
. Contohnya:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Jika Anda ingin melakukan pembaruan massal (mirip dengan penggunaan BulkExecutor.BulkUpdateAsync), Anda harus melakukan panggilan bersamaan ke metode ReplaceItemAsync
setelah memperbarui nilai item. Contohnya:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
Lalu jika Anda ingin melakukan penghapusan massal (mirip dengan penggunaan BulkExecutor.BulkDeleteAsync), Anda harus melakukan panggilan bersamaan ke DeleteItemAsync
, dengan id
dan partisi setiap item. Contohnya:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Menangkap status hasil tugas
Dalam contoh kode sebelumnya, kami telah membuat daftar tugas bersamaan, dan memanggil metode CaptureOperationResponse
pada setiap tugas tersebut. Metode ini adalah ekstensi yang memungkinkan kita mempertahankan skema respons yang mirip BulkExecutor, dengan menangkap kesalahan dan melacak penggunaan unit permintaan.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
Di mana OperationResponse
dinyatakan sebagai:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Menjalankan operasi secara bersamaan
Untuk melacak cakupan seluruh daftar Tugas, kami menggunakan kelas pembantu ini:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Metode ExecuteAsync
ini akan menunggu hingga semua operasi selesai dan Anda dapat menggunakannya seperti:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Menangkap statistik
Kode sebelumnya menunggu hingga semua operasi selesai dan menghitung statistik yang diperlukan. Statistik ini mirip dengan yang ada pada BulkImportResponse pustaka eksekutor massal.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
BulkOperationResponse
berisi:
- Total waktu yang diambil untuk memproses daftar operasi melalui dukungan massal.
- Jumlah operasi yang berhasil.
- Total unit permintaan yang digunakan.
- Jika terjadi kegagalan, ini akan menampilkan daftar tuple yang berisi pengecualian dan item terkait untuk tujuan pencatatan dan identifikasi.
Mencoba ulang konfigurasi
Pustaka eksekutor massal memiliki panduan yang menyebutkan untuk mengatur MaxRetryWaitTimeInSeconds
dan MaxRetryAttemptsOnThrottledRequests
pada RetryOptions ke 0
untuk mendelegasikan kontrol ke pustaka.
Untuk dukungan massal di .NET SDK, tidak ada perilaku tersembunyi. Anda dapat mengonfigurasi opsi coba lagi secara langsung melalui CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests dan CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Catatan
Apabila unit permintaan yang disediakan jauh lebih rendah dari yang diharapkan berdasarkan jumlah data, Anda mungkin ingin mempertimbangkan pengaturan ini ke nilai tinggi. Operasi massal akan memakan waktu lebih lama tetapi memiliki peluang lebih tinggi untuk sepenuhnya berhasil karena percobaan ulang yang lebih tinggi.
Peningkatan performa
Seperti halnya operasi lain dengan .NET SDK, penggunaan API stream menghasilkan performa yang lebih baik dan menghindari serialisasi yang tidak perlu.
Penggunaan API stream hanya dimungkinkan jika sifat data yang Anda gunakan cocok dengan aliran byte (misalnya, aliran file). Dalam kasus tersebut, penggunaan metode CreateItemStreamAsync
, ReplaceItemStreamAsync
, atau DeleteItemStreamAsync
dan bekerja dengan ResponseMessage
(bukannya ItemResponse
) akan meningkatkan throughput yang dapat dicapai.
Langkah berikutnya
- Untuk mempelajari selengkapnya tentang rilis .NET SDK, lihat artikel Azure Cosmos DB SDK.
- Dapatkan kode sumber migrasi lengkap dari GitHub.
- Sampel massal tambahan di GitHub
- Mencoba melakukan perencanaan kapasitas untuk migrasi ke Azure Cosmos DB?
- Jika Anda hanya mengetahui jumlah vcore dan server di kluster database yang ada, baca tentang memperkirakan unit permintaan menggunakan vCore atau vCPU
- Jika Anda mengetahui rasio permintaan umum untuk beban kerja database Anda saat ini, baca memperkirakan unit permintaan menggunakan perencana kapasitas Azure Cosmos DB