Работа с надежными коллекциями

Платформа Service Fabric предлагает модель программирования с отслеживанием состояния, которая доступна разработчикам .NET через API-интерфейсы Reliable Collections. В частности, Service Fabric предоставляет такие классы, как Reliable Dictionaries (далее — надежные словари) и Reliable Queues (далее — надежные очереди). При использовании этих классов состояние секционируется (для масштабируемости), реплицируется (для доступности) и обрабатывается в ходе транзакции в секции (для семантики ACID). Рассмотрим типовое использование объекта надежного словаря, чтобы узнать, какие функции выполняет этот объект.

try
{
   // Create a new Transaction object for this partition
   using (ITransaction tx = base.StateManager.CreateTransaction())
   {
      // AddAsync takes key's write lock; if >4 secs, TimeoutException
      // Key & value put in temp dictionary (read your own writes),
      // serialized, redo/undo record is logged & sent to secondary replicas
      await m_dic.AddAsync(tx, key, value, cancellationToken);

      // CommitAsync sends Commit record to log & secondary replicas
      // After quorum responds, all locks released
      await tx.CommitAsync();
   }
   // If CommitAsync isn't called, Dispose sends Abort
   // record to log & all locks released
}
catch (TimeoutException)
{
   // choose how to handle the situation where you couldn't get a lock on the file because it was 
   // already in use. You might delay and retry the operation
   await Task.Delay(100);
}

Для всех операций с объектами надежных словарей (за исключением операции ClearAsync, которая является обратимой) требуется объект ITransaction. С этим объектом связываются все изменения, которые вы пытаетесь внести в любые объекты надежных словарей или очередей в одном разделе. Мы получаем объект ITransaction, вызывая метод CreateTransaction в StateManager раздела.

В приведенном выше коде объект ITransaction передается в метод AddAsync надежного словаря. Внутри методов словаря, которые принимают ключ, устанавливают блокировки на чтение или запись, связанные с ключом. Если этот метод изменяет значение ключа, то метод устанавливает блокировку на запись в ключе. Если этот метод только считывает данные из значения ключа, к ключу применяется блокировка чтения. Метод AddAsync изменяет значение ключа на новое, которое было передано, поэтому к ключу применяется блокировка на запись. Таким образом, если два (или более) потока пытаются одновременно добавить значения с тем же ключом, один поток получит блокировку записи, а остальные потоки заблокируются. По умолчанию методы блокируются до четырех секунд для получения блокировки. Через четыре секунды методы создают исключение TimeoutException. С помощью перегрузок методов можно явным образом указать значение времени ожидания.

Для этого нужно написать код, который будет реагировать на исключение TimeoutException, перехватывая его и повторяя всю операцию (как показано в приведенном выше коде). В этом простом коде мы вызываем Task.Delay через каждые 100 мс. Но на самом деле, возможно, будет лучше использовать экспоненциальную задержку.

После получения блокировки метод AddAsync добавляет ссылки на объект ключа и значения для внутренних временных словарей, связанных с объектом ITransaction. Это делается для обеспечения семантики, которая позволяет владельцу считывать собственные записи. Таким образом, если после вызова AddAsync вызвать метод TryGetValueAsync (с использованием того же объекта ITransaction), значение будет возвращено, даже если транзакция еще не зафиксирована.

Примечание.

Вызов TryGetValueAsync с новой транзакцией вернет ссылку на последнее зафиксированное значение. Не изменяйте эту ссылку напрямую, так как в этом случае обходится механизм сохранения и репликации изменений. Рекомендуется сделать значения доступными только для чтения, чтобы изменить значение ключа можно было только через API надежного словаря.

Затем метод AddAsync сериализует объекты ключа и значения в массивы байтов, а также добавляет эти массивы байтов в файл журнала на локальном узле. Наконец, метод AddAsync отправляет массивы байтов во все вторичные реплики, чтобы реплики имели ту же информацию о ключе и значении. Хотя информация о ключе и значении записана в файл журнала, эти сведения не считаются частью словаря, пока не будет зафиксирована связанная с ними транзакция.

В приведенном выше коде вызов CommitAsync фиксирует все операции транзакции. В частности, этот вызов добавляет в файл журнала на локальном узле сведения о фиксации, а также отправляет запись о фиксации во все вторичные реплики. После получения ответа от большинства реплик все изменения данных считаются постоянными, и все блокировки, связанные с ключами, которые обрабатывались с помощью объекта ITransaction, отменяются. Теперь другие потоки или транзакции могут обрабатывать те же ключи и их значения.

Если метод CommitAsync не вызывается (обычно из-за исключения), объект ITransaction ликвидируется. Когда незафиксированный объект ITransaction удаляется, Service Fabric добавляет в файл журнала на локальном узле сведения о прерывании операции. Во вторичные реплики ничего не отправляется. Затем отменяются все блокировки, связанные с ключами, которые обрабатывались с помощью транзакции.

Непостоянные надежные коллекции

В некоторых рабочих нагрузках, например в реплицированном кэше, допустима случайная потеря данных. Отказ от сохранения данных на диск поможет добиться снижения задержек и повышения пропускной способности при записи в надежные словари. Недостаток отказа от сохраняемости состоит в том, что при потере кворума произойдет полная потеря данных. Т. к. потеря кворума — достаточно редкое явление, повышение производительности может быть более важным фактором для таких нагрузок, чем вероятность потери данных.

В настоящее время поддержка непостоянных данных доступна только для надежных словарей и очередей, но не для надежных одновременных очередей. Чтобы правильно принять решение о непостоянных коллекциях, ознакомьтесь со списком предостережений.

Чтобы включить поддержку непостоянных коллекций для вашей службы, укажите для флага HasPersistedState в объявлении типа службы значение false, как в следующем примере:

<StatefulServiceType ServiceTypeName="MyServiceType" HasPersistedState="false" />

Примечание.

Существующие постоянно хранимые службы невозможно сделать непостоянными, и наоборот. При желании можно удалить существующую службу, а затем развернуть эту службу с обновленным флагом. Это означает, что при изменении флага HasPersistedState будут потеряны все данные.

Типичные проблемы и способы их решения

Теперь мы знаем, как надежные подключения работают на внутреннем уровне. Рассмотрим некоторые случаи их неправильного использования. Взгляните на этот код.

using (ITransaction tx = StateManager.CreateTransaction())
{
   // AddAsync serializes the name/user, logs the bytes,
   // & sends the bytes to the secondary replicas.
   await m_dic.AddAsync(tx, name, user);

   // The line below updates the property's value in memory only; the
   // new value is NOT serialized, logged, & sent to secondary replicas.
   user.LastLogin = DateTime.UtcNow;  // Corruption!

   await tx.CommitAsync();
}

Если вы работаете с обычным словарем .NET, вы можете добавить в словарь ключ и значение, а затем изменить значение свойства (например, LastLogin). Но этот код не будет правильно работать с надежным словарем. Как вы помните, вызов метода AddAsync сериализует объекты ключей и значений в массивы байтов, сохраняет эти массивы в локальный файл, а также отправляет их во вторичные реплики. Если впоследствии изменить свойство, то поменяется только значение этого свойства в памяти. Оно не повлияет на локальный файл и на данные, отправленные в реплики. При сбое процесса все данные в памяти будут утеряны. Если запускается новый процесс или основной репликой становится другая реплика, доступным остается только старое значение свойства.

Допустить описанную выше ошибку очень просто. А узнаете вы о ней, только когда произойдет сбой. Чтобы получить правильный код, достаточно поменять местами две строки:

using (ITransaction tx = StateManager.CreateTransaction())
{
   user.LastLogin = DateTime.UtcNow;  // Do this BEFORE calling AddAsync
   await m_dic.AddAsync(tx, name, user);
   await tx.CommitAsync();
}

Вот другой пример, демонстрирующий распространенную ошибку.

using (ITransaction tx = StateManager.CreateTransaction())
{
   // Use the user's name to look up their data
   ConditionalValue<User> user = await m_dic.TryGetValueAsync(tx, name);

   // The user exists in the dictionary, update one of their properties.
   if (user.HasValue)
   {
      // The line below updates the property's value in memory only; the
      // new value is NOT serialized, logged, & sent to secondary replicas.
      user.Value.LastLogin = DateTime.UtcNow; // Corruption!
      await tx.CommitAsync();
   }
}

Опять же, если используются обычные словари .NET, приведенный выше код работает нормально. Это обычная ситуация — разработчик использует ключ для поиска значений. Если это значение существует, разработчик меняет значение свойства. Но если используются API-интерфейсы Reliable Collections, в коде возникает та же проблема. Не изменяйте объект, если он отправлен в Reliable Collections.

Чтобы правильно обновить значение в API, нужно получить ссылку на существующее значение и считать объект, на который указывает эта ссылка, неизменяемым. Теперь создайте новый объект, который будет точной копией исходного объекта. Вы можете изменить состояние этого нового объекта и записать новый объект в коллекцию, чтобы он был сериализован в массив байтов, добавлен в локальный файл и отправлен в реплику. После фиксации изменений объекты в памяти, локальный файл и все реплики будут в одинаковом состоянии. Все хорошо!

Следующий пример кода показывает, как правильно обновить значение в коллекции Reliable Collections.

using (ITransaction tx = StateManager.CreateTransaction())
{
   // Use the user's name to look up their data
   ConditionalValue<User> currentUser = await m_dic.TryGetValueAsync(tx, name);

   // The user exists in the dictionary, update one of their properties.
   if (currentUser.HasValue)
   {
      // Create new user object with the same state as the current user object.
      // NOTE: This must be a deep copy; not a shallow copy. Specifically, only
      // immutable state can be shared by currentUser & updatedUser object graphs.
      User updatedUser = new User(currentUser);

      // In the new object, modify any properties you desire
      updatedUser.LastLogin = DateTime.UtcNow;

      // Update the key's value to the updateUser info
      await m_dic.SetValue(tx, name, updatedUser);
      await tx.CommitAsync();
   }
}

Определение неизменяемых типов данных для предотвращения ошибок разработчика

В идеале следовало бы, чтобы компилятор сообщал об ошибках, когда вы случайно пишете код, меняющий состояние объекта, который должен быть неизменяемым. Но компилятор C# не может это сделать. Таким образом, чтобы избежать потенциальных ошибок, мы настоятельно рекомендуем определять типы, используемые в Reliable Collections, как неизменяемые. Это означает, что вы будете использовать основные типы значений (например, числа [Int32, UInt64 и т. д.], DateTime, Guid, TimeSpan и т. п). Вы также можете использовать строку. Свойства коллекции лучше не использовать, так как их частая сериализация и десериализация может снизить производительность. Если же нужно использовать свойства коллекции, настоятельно рекомендуем применять библиотеку неизменяемых коллекций .NET (System.Collections.Immutable). Эту библиотеку можно скачать на сайте https://nuget.org. Мы также рекомендуем запечатывать классы и по возможности делать поля доступными только для чтения.

Тип UserInfo ниже демонстрирует, как определить неизменяемый тип, чтобы воспользоваться преимуществами упомянутых выше рекомендаций.

[DataContract]
// If you don't seal, you must ensure that any derived classes are also immutable
public sealed class UserInfo
{
   private static readonly IEnumerable<ItemId> NoBids = ImmutableList<ItemId>.Empty;

   public UserInfo(String email, IEnumerable<ItemId> itemsBidding = null) 
   {
      Email = email;
      ItemsBidding = (itemsBidding == null) ? NoBids : itemsBidding.ToImmutableList();
   }

   [OnDeserialized]
   private void OnDeserialized(StreamingContext context)
   {
      // Convert the deserialized collection to an immutable collection
      ItemsBidding = ItemsBidding.ToImmutableList();
   }

   [DataMember]
   public readonly String Email;

   // Ideally, this would be a readonly field but it can't be because OnDeserialized
   // has to set it. So instead, the getter is public and the setter is private.
   [DataMember]
   public IEnumerable<ItemId> ItemsBidding { get; private set; }

   // Since each UserInfo object is immutable, we add a new ItemId to the ItemsBidding
   // collection by creating a new immutable UserInfo object with the added ItemId.
   public UserInfo AddItemBidding(ItemId itemId)
   {
      return new UserInfo(Email, ((ImmutableList<ItemId>)ItemsBidding).Add(itemId));
   }
}

Тип ItemId также является неизменяемым, как показано ниже.

[DataContract]
public struct ItemId
{
   [DataMember] public readonly String Seller;
   [DataMember] public readonly String ItemName;
   public ItemId(String seller, String itemName)
   {
      Seller = seller;
      ItemName = itemName;
   }
}

Управление версиями схемы (обновления)

На внутреннем уровне надежные коллекции сериализуют объекты с помощью DataContractSerializer в .NET. Сериализованные объекты сохраняются на локальном диске основной реплики и передаются во вторичные реплики. По мере развития службы вы, вероятно, захотите изменить тип данных (схему), которая она использует. Управлять версиями данных необходимо предельно осторожно. Первое и самое главное: у вас должна быть возможность десериализовать старые данные. Это означает, что код десериализации должен быть неограниченно обратно совместимым: версия 333 кода службы должна поддерживать обработку данных, которые версия 1 кода службы разместила в надежной коллекции 5 лет назад.

Кроме того, код службы обновляет один домен обновления одновременно. Таким образом, во время обновления одновременно работают две различные версии кода службы. Проследите, чтобы новая версия кода службы не использовала новую схему, так как старые версии кода службы могут не работать с новой схемой. По возможности следует разрабатывать каждую новую версию службы так, чтобы она была совместима с версией 1. Это означает, что версия 1 кода службы должна игнорировать все элементы схемы, которые она явно не обрабатывает. При этом также должно обеспечиваться хранение всех данных, которые ей явно неизвестны. Эти данные должны записываться обратно при обновлении ключа или значения словаря.

Предупреждение

Хотя можно изменить схему ключа, необходимо убедиться, что алгоритмы равенства и сравнения ключа стабильны. Поведение надежных коллекций после изменения в любом из этих алгоритмов не определено и может привести к повреждению данных, потере и сбоям служб. В качестве ключа можно использовать строки .NET, но в качестве ключа нужно использовать саму строку, а не результат String.GetHashCode.

Кроме того, можно выполнить многоэтапное обновление.

  1. Обновление службы до новой версии, которая
    • имеет как исходную версию 1, так и новую версию контрактов данных версии 2, включенную в пакет кода службы;
    • при необходимости регистрирует пользовательские сериализаторы состояния версии 2;
    • выполняет все операции с исходной коллекцией V1 с помощью контрактов данных версии 1.
  2. Обновление службы до новой версии, которая
    • создает новую коллекцию V2;
    • выполняет каждую операцию добавления, обновления и удаления в первой версии 1, а затем коллекции версии 2 в одной транзакции;
    • выполняет операции чтения только в коллекции версии 1.
  3. Скопируйте все данные из коллекции V1 в коллекцию V2.
    • Это можно сделать в фоновом процессе с помощью версии службы, развернутой на шаге 2.
    • Повторное получение всех ключей из коллекции V1. Перечисление выполняется с помощью IsolationLevel.Snapshot по умолчанию, чтобы избежать блокировки коллекции в течение операции.
    • Для каждого ключа используйте отдельную транзакцию для
      • TryGetValueAsync из коллекции V1.
      • Если значение уже удалено из коллекции версии 1 с момента начала процесса копирования, ключ должен быть пропущен и не восстановлен в коллекции версии 2.
      • TryAddAsync — значение коллекции версии 2.
      • Если значение уже добавлено в коллекцию версии 2 с момента начала процесса копирования, ключ должен быть пропущен.
      • Транзакция должна быть зафиксирована TryAddAsync только в том случае, если возвращается true.
      • API доступа к значениям по умолчанию используют значение IsolationLevel.ReadRepeatable и полагаются на блокировку, чтобы гарантировать, что значения не изменяются другим вызывающим оператором, пока транзакция не будет зафиксирована или прервана.
  4. Обновление службы до новой версии, которая
    • выполняет операции чтения только в коллекции версии 2;
    • по-прежнему выполняет каждую операцию добавления, обновления и удаления в первой версии 1, а затем коллекций версии 2, чтобы сохранить возможность отката до версии 1.
  5. Комплексно протестируйте службу и убедитесь, что она работает должным образом.
    • Если вы пропустили операцию доступа к значению, которая не была обновлена для работы с коллекцией версии 1 и версии 2, возможно, отсутствуют данные.
    • Если отсутствуют данные отката к шагу 1, удалите коллекцию версии 2 и повторите процесс.
  6. Обновление службы до новой версии, которая
    • выполняет все операции только в коллекции версии 2;
    • Возврат к версии 1 больше не возможен с откатом службы и потребует переката с обратными шагами 2-4.
  7. Обновление службы новой версии, которая
  8. Дождитесь усечения журнала.
    • По умолчанию это происходит каждые 50 МБ записей (добавляет, обновляет и удаляет) в надежные коллекции.
  9. Обновление службы до новой версии, которая
    • Больше не имеет контрактов данных версии 1, включенных в пакет кода службы.

Следующие шаги

Дополнительные сведения о создании контрактов данных с прямой совместимостью см. в статье Контракты данных, совместимые с любыми будущими изменениями.

Рекомендации по управлению версиями контрактов данных см. в статье Управление версиями контракта данных.

Указания по реализации контрактов данных, которые не зависят от версий, см. в статье Обратные вызовы сериализации, независимые от версий.

Указания по предоставлению структуры данных, которые могут взаимодействовать в нескольких версиях, см. в статье Интерфейс IExtensibleDataObject.

Сведения о настройке надежных коллекций см. в разделе "Конфигурация репликатора"