Управление политиками разрешения конфликтов в Azure Cosmos DB

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

Если операции записи выполняются в нескольких регионах и несколько клиентов обращаются к одному и тому же элементу, может возникнуть конфликт. Такую проблему можно решить с помощью разных политик устранения конфликтов. В этой статье объясняется, как управлять политиками устранения конфликтов.

Совет

Политика разрешения конфликтов может указываться только во время создания контейнера и не может быть изменена после создания контейнера.

Создание политики разрешения конфликтов, реализующей подход "Сохраняются изменения, внесенные последними"

В этих примерах показано, как настроить контейнер с политикой разрешения конфликтов, реализующей подход "Сохраняются изменения, внесенные последними". В пути по умолчанию для политики "Сохраняются изменения, внесенные последними" указывается поле метки времени или свойство _ts. Для API для NoSQL это также может быть задан путь, определяемый пользователем, с числовым типом. При возникновении конфликта сохраняется самое большое значение. Если путь не задан или является недопустимым, по умолчанию используется значение _ts. Сведения о конфликтах, устраненных с помощью этой политики, не будут отображаться в веб-канале конфликтов. Эту политику поддерживают все API.

Пакет SDK для .NET

DocumentCollection lwwCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.lwwCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.LastWriterWins,
          ConflictResolutionPath = "/myCustomId",
      },
  });

Пакет SDK для Java версии 4

Асинхронный API пакета SDK для Java версии 4 (Maven com.azure::azure-cosmos)


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Пакеты средств разработки для Java версии 2

Пакет SDK Async Java версии 2 (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Пакет SDK для Node.js, JavaScript и TypeScript

const database = client.database(this.databaseName);
const { container: lwwContainer } = await database.containers.createIfNotExists(
  {
    id: this.lwwContainerName,
    conflictResolutionPolicy: {
      mode: "LastWriterWins",
      conflictResolutionPath: "/myCustomId"
    }
  }
);

Пакет SDK для Python

database = client.get_database_client(database=database_id)
lww_conflict_resolution_policy = {'mode': 'LastWriterWins', 'conflictResolutionPath': '/regionId'}
lww_container = database.create_container(id=lww_container_id, partition_key=PartitionKey(path="/id"), 
    conflict_resolution_policy=lww_conflict_resolution_policy)

Создание пользовательской политики устранения конфликтов с помощью хранимой процедуры

В этих примерах показано, как настроить контейнер с пользовательской политикой разрешения конфликтов. Эта политика использует логику в хранимой процедуре для устранения конфликта. Если хранимая процедура назначена для разрешения конфликтов, конфликты не будут отображаться в канале конфликтов, если в указанной хранимой процедуре не возникнет ошибка.

После создания политики с использованием контейнера необходимо создать хранимую процедуру. В примере пакета SDK для .NET ниже показан пример этого рабочего процесса. Эта политика поддерживается только в API для NoSQL.

Пример пользовательской хранимой процедуры для устранения конфликтов

Пользовательские хранимые процедуры для устранения конфликтов нужно реализовать с помощью сигнатуры функции, представленной ниже. Имя функции не обязательно должно совпадать с именем, используемым при регистрации хранимой процедуры с использованием контейнера, но такой подход упрощает именование. Ниже описаны параметры, которые нужно реализовать для этой хранимой процедуры.

  • incomingItem: элемент вставляется или обновляется в фиксации, в которой возникают конфликты. Имеет значение NULL для операций удаления.
  • existingItem: элемент, зафиксированный в текущий момент. Значение NULL не устанавливается при обновлении и устанавливается при вставке или удалении.
  • isTombstone: логическое значение, указывающее, конфликтует ли incomingItem с ранее удаленным элементом. При значении true existingItem также имеет значение NULL.
  • conflictingItems: массив фиксированной версии всех элементов в контейнере, которые конфликтуют с incomingItem по идентификатору или любым другим свойствам уникального индекса.

Важно!

Так же, как и любая хранимая процедура, пользовательская процедура разрешения конфликтов может обращаться к данным с одним ключом секции. Процедура позволяет выполнять любые операции вставки, обновления или удаления для устранения конфликтов.

В этом примере представлена хранимая процедура, которая позволяет устранять конфликты, выбирая наименьшее значение из пути /myCustomId.

function resolver(incomingItem, existingItem, isTombstone, conflictingItems) {
  var collection = getContext().getCollection();

  if (!incomingItem) {
      if (existingItem) {

          collection.deleteDocument(existingItem._self, {}, function (err, responseOptions) {
              if (err) throw err;
          });
      }
  } else if (isTombstone) {
      // delete always wins.
  } else {
      if (existingItem) {
          if (incomingItem.myCustomId > existingItem.myCustomId) {
              return; // existing item wins
          }
      }

      var i;
      for (i = 0; i < conflictingItems.length; i++) {
          if (incomingItem.myCustomId > conflictingItems[i].myCustomId) {
              return; // existing conflict item wins
          }
      }

      // incoming item wins - clear conflicts and replace existing with incoming.
      tryDelete(conflictingItems, incomingItem, existingItem);
  }

  function tryDelete(documents, incoming, existing) {
      if (documents.length > 0) {
          collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
              if (err) throw err;

              documents.shift();
              tryDelete(documents, incoming, existing);
          });
      } else if (existing) {
          collection.replaceDocument(existing._self, incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      } else {
          collection.createDocument(collection.getSelfLink(), incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      }
  }
}

Пакет SDK для .NET

DocumentCollection udpCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.udpCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
          ConflictResolutionProcedure = string.Format("dbs/{0}/colls/{1}/sprocs/{2}", this.databaseName, this.udpCollectionName, "resolver"),
      },
  });

//Create the stored procedure
await clients[0].CreateStoredProcedureAsync(
UriFactory.CreateStoredProcedureUri(this.databaseName, this.udpCollectionName, "resolver"), new StoredProcedure
{
    Id = "resolver",
    Body = File.ReadAllText(@"resolver.js")
});

Пакет SDK для Java версии 4

Асинхронный API пакета SDK для Java версии 4 (Maven com.azure::azure-cosmos)


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Пакеты средств разработки для Java версии 2

Пакет SDK Async Java версии 2 (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

После создания контейнера создайте хранимую процедуру resolver.

Пакет SDK для Node.js, JavaScript и TypeScript

const database = client.database(this.databaseName);
const { container: udpContainer } = await database.containers.createIfNotExists(
  {
    id: this.udpContainerName,
    conflictResolutionPolicy: {
      mode: "Custom",
      conflictResolutionProcedure: `dbs/${this.databaseName}/colls/${
        this.udpContainerName
      }/sprocs/resolver`
    }
  }
);

После создания контейнера создайте хранимую процедуру resolver.

Пакет SDK для Python

database = client.get_database_client(database=database_id)
udp_custom_resolution_policy = {'mode': 'Custom' }
udp_container = database.create_container(id=udp_container_id, partition_key=PartitionKey(path="/id"),
    conflict_resolution_policy=udp_custom_resolution_policy)

После создания контейнера создайте хранимую процедуру resolver.

Создание пользовательской политики разрешения конфликтов

В этих примерах показано, как настроить контейнер с пользовательской политикой разрешения конфликтов. С этой реализацией каждый конфликт будет отображаться в веб-канале конфликтов. Это до вас, чтобы обрабатывать конфликты по отдельности из канала конфликтов.

Пакет SDK для .NET

DocumentCollection manualCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.manualCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
      },
  });

Пакет SDK для Java версии 4

Асинхронный API пакета SDK для Java версии 4 (Maven com.azure::azure-cosmos)


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

Пакеты средств разработки для Java версии 2

Пакет SDK Async Java версии 2 (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Пакет SDK для Node.js, JavaScript и TypeScript

const database = client.database(this.databaseName);
const {
  container: manualContainer
} = await database.containers.createIfNotExists({
  id: this.manualContainerName,
  conflictResolutionPolicy: {
    mode: "Custom"
  }
});

Пакет SDK для Python

database = client.get_database_client(database=database_id)
manual_resolution_policy = {'mode': 'Custom'}
manual_container = database.create_container(id=manual_container_id, partition_key=PartitionKey(path="/id"), 
    conflict_resolution_policy=manual_resolution_policy)

Чтение из веб-канала конфликтов

В этих примерах показано, как выполнять чтение из веб-канала конфликтов контейнера. Конфликты могут отображаться в веб-канале конфликтов только по нескольким причинам:

  • Конфликт не был разрешен автоматически
  • Конфликт вызвал ошибку с указанной хранимой процедурой
  • Политика разрешения конфликтов настроена на пользовательскую и не назначает хранимую процедуру для обработки конфликтов

Пакет SDK для .NET

FeedResponse<Conflict> conflicts = await delClient.ReadConflictFeedAsync(this.collectionUri);

Пакеты SDK для Java

Пакет SDK для Java версии 4 (Maven com.azure::azure-cosmos)

int requestPageSize = 3;
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();

CosmosPagedFlux<CosmosConflictProperties> conflictReadFeedFlux = container.readAllConflicts(options);

conflictReadFeedFlux.byPage(requestPageSize).toIterable().forEach(page -> {

    int expectedNumberOfConflicts = 0;
    int numberOfResults = 0;
    Iterator<CosmosConflictProperties> pageIt = page.getElements().iterator();

    while (pageIt.hasNext()) {
        CosmosConflictProperties conflictProperties = pageIt.next();

        // Read the conflict and committed item
        CosmosAsyncConflict conflict = container.getConflict(conflictProperties.getId());
        CosmosConflictResponse response = conflict.read(new CosmosConflictRequestOptions()).block();

        // response.
    }
});

Пакет SDK для Node.js, JavaScript и TypeScript

const container = client
  .database(this.databaseName)
  .container(this.lwwContainerName);

const { result: conflicts } = await container.conflicts.readAll().toArray();

Python

conflicts_iterator = iter(container.list_conflicts())
conflict = next(conflicts_iterator, None)
while conflict:
    # Do something with conflict
    conflict = next(conflicts_iterator, None)

Следующие шаги

Узнайте больше о следующих понятиях Azure Cosmos DB.