建立交易式批次作業時,從容器執行個體開始進行,並呼叫 CreateTransactionalBatch:
PartitionKey partitionKey = new PartitionKey("road-bikes");
TransactionalBatch batch = container.CreateTransactionalBatch(partitionKey);
接下來,將多個作業新增至批次:
Product bike = new (
id: "68719520766",
category: "road-bikes",
name: "Chropen Road Bike"
);
batch.CreateItem<Product>(bike);
Part part = new (
id: "68719519885",
category: "road-bikes",
name: "Tronosuros Tire",
productId: bike.id
);
batch.CreateItem<Part>(part);
最後,在批次上呼叫 ExecuteAsync:
using TransactionalBatchResponse response = await batch.ExecuteAsync();
一旦收到回應,請檢查回應是否成功。 如果回應指出成功,請擷取結果:
if (response.IsSuccessStatusCode)
{
TransactionalBatchOperationResult<Product> productResponse;
productResponse = response.GetOperationResultAtIndex<Product>(0);
Product productResult = productResponse.Resource;
TransactionalBatchOperationResult<Part> partResponse;
partResponse = response.GetOperationResultAtIndex<Part>(1);
Part partResult = partResponse.Resource;
}
重要
如果失敗,失敗的作業會有其對應錯誤的狀態碼。 所有其他作業都會有 424 狀態碼 (失敗的相依性)。 如果作業因嘗試建立已存在的項目而失敗,則會傳回狀態碼 409 (衝突)。 狀態碼會啟用一個,以識別交易失敗的原因。
建立交易式批次作業時,請呼叫 CosmosBatch.createCosmosBatch:
PartitionKey partitionKey = new PartitionKey("road-bikes");
CosmosBatch batch = CosmosBatch.createCosmosBatch(partitionKey);
接下來,將多個作業新增至批次:
Product bike = new Product();
bike.setId("68719520766");
bike.setCategory("road-bikes");
bike.setName("Chropen Road Bike");
batch.createItemOperation(bike);
Part part = new Part();
part.setId("68719519885");
part.setCategory("road-bikes");
part.setName("Tronosuros Tire");
part.setProductId(bike.getId());
batch.createItemOperation(part);
最後,使用容器執行個體以透過批次呼叫 executeCosmosBatch:
CosmosBatchResponse response = container.executeCosmosBatch(batch);
一旦收到回應,請檢查回應是否成功。 如果回應指出成功,請擷取結果:
if (response.isSuccessStatusCode())
{
List<CosmosBatchOperationResult> results = response.getResults();
}
重要
如果失敗,失敗的作業會有其對應錯誤的狀態碼。 所有其他作業都會有 424 狀態碼 (失敗的相依性)。 如果作業因嘗試建立已存在的項目而失敗,則會傳回狀態碼 409 (衝突)。 狀態碼會啟用一個,以識別交易失敗的原因。
取得或建立容器實例:
container = database.create_container_if_not_exists(id="batch_container",
partition_key=PartitionKey(path='/category'))
在 Python 中,交易式 Batch 作業看起來與單一作業 API 非常類似,而且是包含的 Tuple(operation_type_string、args_tuple、batch_operation_kwargs_dictionary)。 以下是將用來示範批次作業功能的範例專案:
create_demo_item = {
"id": "68719520766",
"category": "road-bikes",
"name": "Chropen Road Bike"
}
# for demo, assume that this item already exists in the container.
# the item id will be used for read operation in the batch
read_demo_item1 = {
"id": "68719519884",
"category": "road-bikes",
"name": "Tronosuros Tire",
"productId": "68719520766"
}
# for demo, assume that this item already exists in the container.
# the item id will be used for read operation in the batch
read_demo_item2 = {
"id": "68719519886",
"category": "road-bikes",
"name": "Tronosuros Tire",
"productId": "68719520766"
}
# for demo, assume that this item already exists in the container.
# the item id will be used for read operation in the batch
read_demo_item3 = {
"id": "68719519887",
"category": "road-bikes",
"name": "Tronosuros Tire",
"productId": "68719520766"
}
# for demo, we'll upsert the item with id 68719519885
upsert_demo_item = {
"id": "68719519885",
"category": "road-bikes",
"name": "Tronosuros Tire Upserted",
"productId": "68719520768"
}
# for replace demo, we'll replace the read_demo_item2 with this item
replace_demo_item = {
"id": "68719519886",
"category": "road-bikes",
"name": "Tronosuros Tire replaced",
"productId": "68719520769"
}
# for replace with etag match demo, we'll replace the read_demo_item3 with this item
# The use of etags and if-match/if-none-match options allows users to run conditional replace operations
# based on the etag value passed. When using if-match, the request will only succeed if the item's latest etag
# matches the passed in value. For more on optimistic concurrency control, see the link below:
# https://learn.microsoft.com/azure/cosmos-db/nosql/database-transactions-optimistic-concurrency
replace_demo_item_if_match_operation = {
"id": "68719519887",
"category": "road-bikes",
"name": "Tronosuros Tireh",
"wasReplaced": "Replaced based on etag match"
"productId": "68719520769"
}
準備要新增至批次的作業:
create_item_operation = ("create", (create_demo_item,), {})
read_item_operation = ("read", ("68719519884",), {})
delete_item_operation = ("delete", ("68719519885",), {})
upsert_item_operation = ("upsert", (upsert_demo_item,), {})
replace_item_operation = ("replace", ("68719519886", replace_demo_item), {})
replace_item_if_match_operation = ("replace",
("68719519887", replace_demo_item_if_match_operation),
{"if_match_etag": container.client_connection.last_response_headers.get("etag")})
將作業新增至批次:
batch_operations = [
create_item_operation,
read_item_operation,
delete_item_operation,
upsert_item_operation,
replace_item_operation,
replace_item_if_match_operation
]
最後,執行批次:
try:
# Run that list of operations
batch_results = container.execute_item_batch(batch_operations=batch_operations, partition_key="road_bikes")
# Batch results are returned as a list of item operation results - or raise a CosmosBatchOperationError if
# one of the operations failed within your batch request.
print("\nResults for the batch operations: {}\n".format(batch_results))
except exceptions.CosmosBatchOperationError as e:
error_operation_index = e.error_index
error_operation_response = e.operation_responses[error_operation_index]
error_operation = batch_operations[error_operation_index]
print("\nError operation: {}, error operation response: {}\n".format(error_operation, error_operation_response))
# [END handle_batch_error]
請注意,在批次中使用修補作業和replace_if_match_etag作業
批次作業 kwargs 字典有限,而且只接受三個不同的索引鍵值。 在想要在批次中使用條件式修補的情況下,可以使用filter_predicate密鑰進行修補作業,或想要搭配任何作業使用 etag 時,也可以使用if_match_etag/if_none_match_etag密鑰。
batch_operations = [
("replace", (item_id, item_body), {"if_match_etag": etag}),
("patch", (item_id, operations), {"filter_predicate": filter_predicate, "if_none_match_etag": etag}),
]
如果失敗,失敗的作業會有其對應錯誤的狀態碼。 所有其他作業都會有 424 狀態碼 (失敗的相依性)。 如果作業因嘗試建立已存在的項目而失敗,則會傳回狀態碼 409 (衝突)。 狀態碼會啟用一個,以識別交易失敗的原因。
交易式批次作業的執行方式
執行交易式批次時,交易式批次中的所有作業都會分組、串行化為單一承載,並以單一要求傳送至 Azure Cosmos DB 服務。
服務會接收要求並執行交易式範圍內的所有作業,並使用相同的序列化通訊協定傳回回應。 此回應可能是成功或失敗,並會根據作業提供個別的作業回應。
SDK 會為您公開回應以驗證結果,並選擇性地擷取每個內部作業結果。
限制
目前,有兩個已知限制:
- Azure Cosmos DB 要求大小限制會將交易式批次承載的大小限制為不超過 2 MB,運行時間上限為 5 秒。
- 每個交易式批次目前有100個作業的限制,以確保效能如預期般,且在SLA內。
下一步