Migreren van de bulkexecutorbibliotheek naar de bulkondersteuning in Azure Cosmos DB .NET V3 SDK
VAN TOEPASSING OP: NoSQL
In dit artikel worden de vereiste stappen beschreven voor het migreren van de code van een bestaande toepassing die gebruikmaakt van de .NET bulkexecutorbibliotheek naar de functie voor bulkondersteuning in de nieuwste versie van de .NET SDK.
Bulkondersteuning inschakelen
Schakel bulkondersteuning voor het CosmosClient
exemplaar in via de AllowBulkExecution-configuratie :
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Taken maken voor elke bewerking
Bulkondersteuning in de .NET SDK werkt door gebruik te maken van de taakparallelbibliotheek en groeperingsbewerkingen die gelijktijdig plaatsvinden.
Er is geen enkele methode in de SDK die uw lijst met documenten of bewerkingen als invoerparameter gebruikt, maar u moet een taak maken voor elke bewerking die u bulksgewijs wilt uitvoeren en vervolgens gewoon wachten totdat ze zijn voltooid.
Als uw eerste invoer bijvoorbeeld een lijst is met items waarvan elk item het volgende schema heeft:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Als u bulkimport wilt uitvoeren (vergelijkbaar met het gebruik van BulkExecutor.BulkImportAsync), moet u gelijktijdige aanroepen naar CreateItemAsync
. Voorbeeld:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Als u bulksgewijs wilt bijwerken (vergelijkbaar met het gebruik van BulkExecutor.BulkUpdateAsync), moet u gelijktijdige aanroepen naar de ReplaceItemAsync
methode hebben na het bijwerken van de itemwaarde. Voorbeeld:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
En als u bulksgewijs wilt verwijderen (vergelijkbaar met het gebruik van BulkExecutor.BulkDeleteAsync), moet u gelijktijdige aanroepen naar DeleteItemAsync
, met de id
en partitiesleutel van elk item. Voorbeeld:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Status van taakresultaat vastleggen
In de vorige codevoorbeelden hebben we een gelijktijdige lijst met taken gemaakt en de CaptureOperationResponse
methode voor elk van deze taken aangeroepen. Deze methode is een extensie waarmee we een vergelijkbaar antwoordschema kunnen onderhouden als BulkExecutor door fouten vast te leggen en het gebruik van aanvraageenheden bij te houden.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
Waar de OperationResponse
wordt gedeclareerd als:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Gelijktijdige bewerkingen uitvoeren
Als u het bereik van de volledige lijst met taken wilt bijhouden, gebruiken we deze helperklasse:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
De ExecuteAsync
methode wacht totdat alle bewerkingen zijn voltooid en u kunt deze als volgt gebruiken:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Statistieken vastleggen
De vorige code wacht totdat alle bewerkingen zijn voltooid en berekent de vereiste statistieken. Deze statistieken zijn vergelijkbaar met die van de bulkexecutorbibliotheek BulkImportResponse.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
De BulkOperationResponse
volgende bevat:
- De totale tijd die nodig is om de lijst met bewerkingen te verwerken via bulkondersteuning.
- Het aantal geslaagde bewerkingen.
- Het totale aantal verbruikte aanvraageenheden.
- Als er fouten optreden, wordt er een lijst weergegeven met tuples die de uitzondering en het bijbehorende item bevatten voor logboekregistratie en identificatiedoeleinden.
Configuratie opnieuw proberen
Bulkexecutorbibliotheek had richtlijnen die vermeldden om het MaxRetryWaitTimeInSeconds
en MaxRetryAttemptsOnThrottledRequests
van RetryOptions in te stellen om 0
het beheer aan de bibliotheek te delegeren.
Voor bulkondersteuning in de .NET SDK is er geen verborgen gedrag. U kunt de opties voor opnieuw proberen rechtstreeks configureren via cosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests en CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Notitie
In gevallen waarin de ingerichte aanvraageenheden veel lager zijn dan verwacht op basis van de hoeveelheid gegevens, kunt u overwegen deze in te stellen op hoge waarden. De bulkbewerking duurt langer, maar heeft een hogere kans om volledig te slagen vanwege de hogere nieuwe pogingen.
Prestatieverbeteringen
Net als bij andere bewerkingen met de .NET SDK leidt het gebruik van de stream-API's tot betere prestaties en voorkomt u onnodige serialisatie.
Het gebruik van stream-API's is alleen mogelijk als de aard van de gegevens die u gebruikt overeenkomt met die van een stroom van bytes (bijvoorbeeld bestandsstromen). In dergelijke gevallen verhoogt het gebruik van de CreateItemStreamAsync
, ReplaceItemStreamAsync
of DeleteItemStreamAsync
methoden en het werken met ResponseMessage
(in plaats van ItemResponse
) de doorvoer die kan worden bereikt.
Volgende stappen
- Zie het artikel over de Azure Cosmos DB SDK voor meer informatie over de .NET SDK-releases .
- Haal de volledige migratiebroncode op uit GitHub.
- Aanvullende bulkvoorbeelden op GitHub
- Wilt u capaciteitsplanning uitvoeren voor een migratie naar Azure Cosmos DB?
- Als alles wat u weet het aantal vcores en servers in uw bestaande databasecluster is, leest u meer over het schatten van aanvraageenheden met behulp van vCores of vCPU's
- Als u typische aanvraagtarieven voor uw huidige databaseworkload kent, leest u meer over het schatten van aanvraageenheden met behulp van azure Cosmos DB-capaciteitsplanner