在 Azure Cosmos DB 中管理衝突解決方案原則
適用於:NoSQL
使用多重區域寫入時,如果有多個用戶端寫入至同一個項目,便可能會發生衝突。 發生衝突時,您可使用不同的衝突解決原則來解決衝突。 本文說明如何管理衝突解決原則。
提示
您只能在建立容器時指定衝突解決原則,而且無法在建立容器之後修改。
建立最後寫入為準衝突解決原則
這些範例示範如何設定最後寫入為準衝突解決原則的容器。 最後寫入為準的預設路徑是時間戳記欄位或 _ts
屬性。 針對適用於 NoSQL 的 API,這也可設定為數值類型的使用者定義路徑。 發生衝突時,會以最大值為準。 如果未設定路徑或路徑無效,便會預設為 _ts
。 已使用此原則解決的衝突將不會出現在衝突摘要中。 此原則可供所有 API 使用。
.NET SDK
DocumentCollection lwwCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
{
Id = this.lwwCollectionName,
ConflictResolutionPolicy = new ConflictResolutionPolicy
{
Mode = ConflictResolutionMode.LastWriterWins,
ConflictResolutionPath = "/myCustomId",
},
});
Java V4 SDK
Java SDK V4 (Maven com.azure::azure-cosmos) 非同步 API
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");
CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();
Java V2 SDK
Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)
DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();
Node.js/JavaScript/TypeScript SDK
const database = client.database(this.databaseName);
const { container: lwwContainer } = await database.containers.createIfNotExists(
{
id: this.lwwContainerName,
conflictResolutionPolicy: {
mode: "LastWriterWins",
conflictResolutionPath: "/myCustomId"
}
}
);
Python SDK
database = client.get_database_client(database=database_id)
lww_conflict_resolution_policy = {'mode': 'LastWriterWins', 'conflictResolutionPath': '/regionId'}
lww_container = database.create_container(id=lww_container_id, partition_key=PartitionKey(path="/id"),
conflict_resolution_policy=lww_conflict_resolution_policy)
使用預存程序建立自訂衝突解決原則
這些範例示範如何設定自訂衝突解決原則的容器。 此原則會使用預存程序中的邏輯來解決衝突。 如果指定了預存程序來解決衝突,除非指定的預存程序中發生錯誤,否則衝突不會顯示在衝突摘要中。
使用容器建立原則之後,您必須建立預存程序。 以下 .NET SDK 範例顯示此工作流程的範例。 只有適用於 NoSQL 的 API 才支援此原則。
自訂衝突解決預存程序範例
您必須使用如下所示的函式簽章來實作自訂衝突解決預存程序。 函式名稱不需要符合向容器註冊預存程序時所使用的名稱,但符合的話可簡化命名程序。 下面會說明必須為此預存程序實作的參數。
- incomingItem:在產生衝突的認可中,所要插入或更新的項目。 若為刪除作業,此參數為 null。
- existingItem目前認可的項目。 在更新中,此值為非 Null,若為插入或刪除作業,則為 Null。
- isTombstone:布林值,會指出 incomingItem 是否與先前刪除的項目衝突。 若為 true,則 existingItem 也是 null。
- conflictingItems:容器中所有項目的認可版本所構成的陣列,這些項目與 incomingItem 的識別碼或任何其他唯一的索引屬性有所衝突。
重要
和任何預存程序一樣,自訂衝突解決程序可以存取任何具有相同分割索引鍵的資料,並可執行任何插入、更新或刪除作業來解決衝突。
這個預存程序範例會藉由從 /myCustomId
路徑選取最低值來解決衝突。
function resolver(incomingItem, existingItem, isTombstone, conflictingItems) {
var collection = getContext().getCollection();
if (!incomingItem) {
if (existingItem) {
collection.deleteDocument(existingItem._self, {}, function (err, responseOptions) {
if (err) throw err;
});
}
} else if (isTombstone) {
// delete always wins.
} else {
if (existingItem) {
if (incomingItem.myCustomId > existingItem.myCustomId) {
return; // existing item wins
}
}
var i;
for (i = 0; i < conflictingItems.length; i++) {
if (incomingItem.myCustomId > conflictingItems[i].myCustomId) {
return; // existing conflict item wins
}
}
// incoming item wins - clear conflicts and replace existing with incoming.
tryDelete(conflictingItems, incomingItem, existingItem);
}
function tryDelete(documents, incoming, existing) {
if (documents.length > 0) {
collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
if (err) throw err;
documents.shift();
tryDelete(documents, incoming, existing);
});
} else if (existing) {
collection.replaceDocument(existing._self, incoming,
function (err, documentCreated) {
if (err) throw err;
});
} else {
collection.createDocument(collection.getSelfLink(), incoming,
function (err, documentCreated) {
if (err) throw err;
});
}
}
}
.NET SDK
DocumentCollection udpCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
{
Id = this.udpCollectionName,
ConflictResolutionPolicy = new ConflictResolutionPolicy
{
Mode = ConflictResolutionMode.Custom,
ConflictResolutionProcedure = string.Format("dbs/{0}/colls/{1}/sprocs/{2}", this.databaseName, this.udpCollectionName, "resolver"),
},
});
//Create the stored procedure
await clients[0].CreateStoredProcedureAsync(
UriFactory.CreateStoredProcedureUri(this.databaseName, this.udpCollectionName, "resolver"), new StoredProcedure
{
Id = "resolver",
Body = File.ReadAllText(@"resolver.js")
});
Java V4 SDK
Java SDK V4 (Maven com.azure::azure-cosmos) 非同步 API
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");
CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();
Java V2 SDK
Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)
DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();
建立容器之後,您必須建立 resolver
預存程序。
Node.js/JavaScript/TypeScript SDK
const database = client.database(this.databaseName);
const { container: udpContainer } = await database.containers.createIfNotExists(
{
id: this.udpContainerName,
conflictResolutionPolicy: {
mode: "Custom",
conflictResolutionProcedure: `dbs/${this.databaseName}/colls/${
this.udpContainerName
}/sprocs/resolver`
}
}
);
建立容器之後,您必須建立 resolver
預存程序。
Python SDK
database = client.get_database_client(database=database_id)
udp_custom_resolution_policy = {'mode': 'Custom' }
udp_container = database.create_container(id=udp_container_id, partition_key=PartitionKey(path="/id"),
conflict_resolution_policy=udp_custom_resolution_policy)
建立容器之後,您必須建立 resolver
預存程序。
建立自訂衝突解決原則
這些範例示範如何設定自訂衝突解決原則的容器。 透過此實作,每個衝突都會顯示在衝突摘要中。 您必須個別處理衝突摘要中的衝突。
.NET SDK
DocumentCollection manualCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
{
Id = this.manualCollectionName,
ConflictResolutionPolicy = new ConflictResolutionPolicy
{
Mode = ConflictResolutionMode.Custom,
},
});
Java V4 SDK
Java SDK V4 (Maven com.azure::azure-cosmos) 非同步 API
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();
CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();
Java V2 SDK
Async Java V2 SDK (Maven com.microsoft.azure::azure-cosmosdb)
DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();
Node.js/JavaScript/TypeScript SDK
const database = client.database(this.databaseName);
const {
container: manualContainer
} = await database.containers.createIfNotExists({
id: this.manualContainerName,
conflictResolutionPolicy: {
mode: "Custom"
}
});
Python SDK
database = client.get_database_client(database=database_id)
manual_resolution_policy = {'mode': 'Custom'}
manual_container = database.create_container(id=manual_container_id, partition_key=PartitionKey(path="/id"),
conflict_resolution_policy=manual_resolution_policy)
從衝突摘要讀取
這些範例示範如何從容器的衝突摘要讀取。 衝突只會基於幾個原因顯示在衝突摘要中:
- 衝突未自動解決
- 衝突導致指定的預存程序發生錯誤
- 衝突解決原則已設定為 [自訂],而且未指定預存程序來處理衝突
.NET SDK
FeedResponse<Conflict> conflicts = await delClient.ReadConflictFeedAsync(this.collectionUri);
Java SDK
Java V4 SDK (Maven com.azure::azure-cosmos)
int requestPageSize = 3;
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
CosmosPagedFlux<CosmosConflictProperties> conflictReadFeedFlux = container.readAllConflicts(options);
conflictReadFeedFlux.byPage(requestPageSize).toIterable().forEach(page -> {
int expectedNumberOfConflicts = 0;
int numberOfResults = 0;
Iterator<CosmosConflictProperties> pageIt = page.getElements().iterator();
while (pageIt.hasNext()) {
CosmosConflictProperties conflictProperties = pageIt.next();
// Read the conflict and committed item
CosmosAsyncConflict conflict = container.getConflict(conflictProperties.getId());
CosmosConflictResponse response = conflict.read(new CosmosConflictRequestOptions()).block();
// response.
}
});
Node.js/JavaScript/TypeScript SDK
const container = client
.database(this.databaseName)
.container(this.lwwContainerName);
const { result: conflicts } = await container.conflicts.readAll().toArray();
Python
conflicts_iterator = iter(container.list_conflicts())
conflict = next(conflicts_iterator, None)
while conflict:
# Do something with conflict
conflict = next(conflicts_iterator, None)
下一步
深入了解下列 Azure Cosmos DB 概念: