Транзакций: Orleans

Orleans поддерживает распределенные транзакции ACID для постоянного состояния зерна. Транзакции реализуются с помощью Microsoft.Orleans. Транзакции пакета NuGet. Исходный код для примера приложения в этой статье состоит из четырех проектов:

  • Абстракции: библиотека классов, содержащая интерфейсы зерна и общие классы.
  • Зерны: библиотека классов, содержащая реализации зерна.
  • Сервер: консольное приложение, которое использует библиотеки классов абстракции и зерна и выступает в качестве Orleans silo.
  • Клиент: консольное приложение, использующее библиотеку классов абстракции, представляющую Orleans клиента.

Настройка

Orleans транзакции являются согласием. Для использования транзакций необходимо настроить хранилище и клиент. Если они не настроены, все вызовы транзакционных методов в реализации зерна получат OrleansTransactionsDisabledException. Чтобы включить транзакции в silo, вызовите SiloBuilderExtensions.UseTransactions построитель узлов silo:

var builder = Host.CreateDefaultBuilder(args)
    UseOrleans((context, siloBuilder) =>
    {
        siloBuilder.UseTransactions();
    });

Аналогичным образом, чтобы включить транзакции на клиенте, вызовите ClientBuilderExtensions.UseTransactions построитель узлов клиента:

var builder = Host.CreateDefaultBuilder(args)
    UseOrleansClient((context, clientBuilder) =>
    {
        clientBuilder.UseTransactions();
    });

Хранилище состояний транзакций

Чтобы использовать транзакции, необходимо настроить хранилище данных. Для поддержки различных хранилищ данных с транзакциями используется абстракция ITransactionalStateStorage<TState> хранилища. Эта абстракция зависит от потребностей транзакций, в отличие от универсального хранилища зерна (IGrainStorage). Чтобы использовать хранилище для конкретных транзакций, настройте silo с помощью любой ITransactionalStateStorageреализации, например Azure (AddAzureTableTransactionalStateStorage).

Например, рассмотрим следующую конфигурацию построителя узлов:

await Host.CreateDefaultBuilder(args)
    .UseOrleans((_, silo) =>
    {
        silo.UseLocalhostClustering();

        if (Environment.GetEnvironmentVariable(
                "ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
        {
            silo.AddAzureTableTransactionalStateStorage(
                "TransactionStore", 
                options => options.ConfigureTableServiceClient(connectionString));
        }
        else
        {
            silo.AddMemoryGrainStorageAsDefault();
        }

        silo.UseTransactions();
    })
    .RunConsoleAsync();

В целях разработки, если хранилище, зависят от транзакций, недоступно для нужного хранилища данных, можно использовать реализацию IGrainStorage . Для любого состояния транзакций, которое не настроено в хранилище, транзакции попытаются выполнить отработку отказа в хранилище зерна с помощью моста. Доступ к состоянию транзакции через мост к хранилищу зерна является менее эффективным и может не поддерживаться в будущем. Следовательно, рекомендация заключается в том, чтобы использовать это только для целей разработки.

Интерфейсы зерн

Для поддержки транзакций методы транзакций в интерфейсе зерна должны быть помечены как часть транзакции с помощью .TransactionAttribute Атрибуту необходимо указать, как выполняется вызов зерна в транзакционной среде, как описано в следующих TransactionOption значениях:

  • TransactionOption.Create: вызов транзакционный и всегда создает новый контекст транзакции (он запускает новую транзакцию), даже если он вызывается в существующем контексте транзакции.
  • TransactionOption.Join: вызов транзакционный, но может вызываться только в контексте существующей транзакции.
  • TransactionOption.CreateOrJoin: вызов транзакционный. Если он вызывается в контексте транзакции, он будет использовать этот контекст, в противном случае он создаст новый контекст.
  • TransactionOption.Suppress: вызов не является транзакционной, но может вызываться из транзакции. При вызове в контексте транзакции контекст не будет передан вызову.
  • TransactionOption.Supported: вызов не является транзакциональным, но поддерживает транзакции. При вызове в контексте транзакции контекст будет передан вызову.
  • TransactionOption.NotAllowed: вызов не является транзакциональным и не может вызываться из транзакции. При вызове в контексте транзакции он вызовет NotSupportedExceptionисключение .

Вызовы можно пометить как TransactionOption.Create, то есть вызов всегда будет запускать свою транзакцию. Например, Transfer операция в приведенном ниже фрагменте ATM всегда запускает новую транзакцию, которая включает две указанные учетные записи.

namespace TransactionalExample.Abstractions;

public interface IAtmGrain : IGrainWithIntegerKey
{
    [Transaction(TransactionOption.Create)]
    Task Transfer(string fromId, string toId, decimal amountToTransfer);
}

Транзакционные операции Withdraw и Deposit зерна учетной записи помечены TransactionOption.Join, указывая, что они могут вызываться только в контексте существующей транзакции, что было бы в случае, если они были вызваны во время IAtmGrain.Transfer. Вызов GetBalance помечается CreateOrJoin таким образом, чтобы его можно было вызывать из существующей транзакции, например через IAtmGrain.Transferили самостоятельно.

namespace TransactionalExample.Abstractions;

public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);

    [Transaction(TransactionOption.CreateOrJoin)]
    Task<decimal> GetBalance();
}

Важные замечания

Не OnActivateAsync удалось пометить как транзакционный, так как для такого вызова требуется правильная настройка перед вызовом. Он существует только для API приложения для зерна. Это означает, что попытка считывания состояния транзакций в рамках этих методов вызовет исключение во время выполнения.

Реализации зерновых элементов

Реализация зерна должна использовать ITransactionalState<TState> аспект для управления состоянием зерна с помощью транзакций ACID.

public interface ITransactionalState<TState>
    where TState : class, new()
{
    Task<TResult> PerformRead<TResult>(
        Func<TState, TResult> readFunction);

    Task<TResult> PerformUpdate<TResult>(
        Func<TState, TResult> updateFunction);
}

Все доступ на чтение или запись к сохраненному состоянию должны выполняться с помощью синхронных функций, передаваемых аспекту состояния транзакций. Это позволяет системе транзакций выполнять или отменять эти операции транзакционно. Для использования транзакционного состояния в зерне необходимо определить сериализуемый класс состояния, который необходимо сохранить, и объявить состояние транзакций в конструкторе зерна с помощью a TransactionalStateAttribute. Последний объявляет имя состояния и, при необходимости, какое хранилище состояний транзакций следует использовать. Дополнительные сведения см. в разделе "Настройка".

[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
    public TransactionalStateAttribute(string stateName, string storageName = null)
    {
        // ...
    }
}

Например, Balance объект состояния определяется следующим образом:

namespace TransactionalExample.Abstractions;

[GenerateSerializer]
public record class Balance
{
    [Id(0)]
    public decimal Value { get; set; } = 1_000;
}

Предыдущий объект состояния:

  • Украшена GenerateSerializerAttribute инструкцией генератора Orleans кода для создания сериализатора.
  • Value Имеет свойство, которое украшено IdAttribute уникальной идентификацией элемента.

Затем Balance объект состояния используется в AccountGrain реализации следующим образом:

namespace TransactionalExample.Grains;

[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<Balance> _balance;

    public AccountGrain(
        [TransactionalState(nameof(balance))]
        ITransactionalState<Balance> balance) =>
        _balance = balance ?? throw new ArgumentNullException(nameof(balance));

    public Task Deposit(decimal amount) =>
        _balance.PerformUpdate(
            balance => balance.Value += amount);

    public Task Withdraw(decimal amount) =>
        _balance.PerformUpdate(balance =>
        {
            if (balance.Value < amount)
            {
                throw new InvalidOperationException(
                    $"Withdrawing {amount} credits from account " +
                    $"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
                    $" This account has {balance.Value} credits.");
            }

            balance.Value -= amount;
        });

    public Task<decimal> GetBalance() =>
        _balance.PerformRead(balance => balance.Value);
}

Внимание

Для правильного передачи контекста транзакции вызову зерна необходимо пометить с помощью маркера ReentrantAttribute транзакций.

В предыдущем примере используется для объявления о том, TransactionalStateAttribute что balance параметр конструктора должен быть связан с именем транзакционного "balance"состояния. В этом объявлении Orleans будет внедрен ITransactionalState<TState> экземпляр с состоянием, загруженным из хранилища транзакционных состояний с именем "TransactionStore". Состояние можно изменить с помощью PerformUpdate или считывать с помощью PerformRead. Инфраструктура транзакций гарантирует, что любые такие изменения, выполненные в рамках транзакции, даже между несколькими зернами, распределенными по Orleans кластеру, будут зафиксированы или все будут отменены после завершения вызова зерна, созданного транзакцией (IAtmGrain.Transfer в предыдущем примере).

Вызов методов транзакций из клиента

Рекомендуемый способ вызова метода детализации транзакций — использовать ITransactionClient. Автоматически ITransactionClient регистрируется в поставщике услуг внедрения зависимостей при настройке Orleans клиента. Используется ITransactionClient для создания контекста транзакции и вызова методов транзакций в этом контексте. В следующем примере показано, как использовать ITransactionClient методы транзакционного зерна.

using IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((_, client) =>
    {
        client.UseLocalhostClustering()
            .UseTransactions();
    })
    .Build();

await host.StartAsync();

var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();

var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;

while (!Console.KeyAvailable)
{
    // Choose some random accounts to exchange money
    var fromIndex = random.Next(accountNames.Length);
    var toIndex = random.Next(accountNames.Length);
    while (toIndex == fromIndex)
    {
        // Avoid transferring to/from the same account, since it would be meaningless
        toIndex = (toIndex + 1) % accountNames.Length;
    }

    var fromKey = accountNames[fromIndex];
    var toKey = accountNames[toIndex];
    var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
    var toAccount = client.GetGrain<IAccountGrain>(toKey);

    // Perform the transfer and query the results
    try
    {
        var transferAmount = random.Next(200);

        await transactionClient.RunTransaction(
            TransactionOption.Create, 
            async () =>
            {
                await fromAccount.Withdraw(transferAmount);
                await toAccount.Deposit(transferAmount);
            });

        var fromBalance = await fromAccount.GetBalance();
        var toBalance = await toAccount.GetBalance();

        Console.WriteLine(
            $"We transferred {transferAmount} credits from {fromKey} to " +
            $"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
    }
    catch (Exception exception)
    {
        Console.WriteLine(
            $"Error transferring credits from " +
            $"{fromKey} to {toKey}: {exception.Message}");

        if (exception.InnerException is { } inner)
        {
            Console.WriteLine($"\tInnerException: {inner.Message}\n");
        }

        Console.WriteLine();
    }

    // Sleep and run again
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

В приведенном выше коде клиента:

  • Параметр IHostBuilder настроен с UseOrleansClientпомощью .
    • Используется IClientBuilder localhost кластеризация и транзакции.
  • Интерфейсы IClusterClient извлекаются ITransactionClient из поставщика услуг.
  • Их from ссылки назначаются IAccountGrain переменнымto.
  • Используется ITransactionClient для создания транзакции, вызова:
    • Withdraw в справочнике по зерню учетной from записи.
    • Deposit в справочнике по зерню учетной to записи.

Транзакции всегда фиксируются, если в указанном или противоречивом transactionOption виде не возникает transactionDelegate исключение. Хотя рекомендуемый способ вызова методов транзакционного зерна — использовать ITransactionClientметоды транзакционного зерна, можно также вызывать методы транзакционного зерна непосредственно из другого зерна.

Вызов методов транзакции из другого зерна

Транзакционные методы в интерфейсе зерна называются как любой другой метод зерна. В качестве альтернативного подхода с помощью ITransactionClientAtmGrain реализации ниже вызывается Transfer метод (который является транзакциональным) в интерфейсеIAccountGrain.

Рассмотрим реализацию AtmGrain , которая разрешает две ссылки на учетные записи и выполняет соответствующие вызовы Withdraw и Deposit:

namespace TransactionalExample.Grains;

[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
    public Task Transfer(
        string fromId,
        string toId,
        decimal amount) =>
        Task.WhenAll(
            GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
            GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}

Код клиентского приложения может вызываться AtmGrain.Transfer транзакционно следующим образом:

IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);

Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();

await atmOne.Transfer(from, to, 100);

uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();

В предыдущих вызовах IAtmGrain используется для передачи 100 единиц валюты из одной учетной записи в другую. После завершения передачи обе учетные записи запрашиваются, чтобы получить текущий баланс. Денежный перевод, а также оба запроса учетной записи выполняются в виде транзакций ACID.

Как показано в предыдущем примере, транзакции могут возвращать значения в виде Taskдругих вызовов зерна. Но при сбое вызова они не будут вызывать исключения приложений OrleansTransactionException , а скорее или TimeoutException. Если приложение создает исключение во время транзакции и это исключение приводит к сбою транзакции (в отличие от сбоя из-за других системных сбоев), исключение приложения будет внутренним исключением OrleansTransactionException.

Если исключение транзакции создается из типа OrleansTransactionAbortedException, транзакция завершилась ошибкой и может быть извлечена. Любое другое исключение, вызываемое, указывает, что транзакция завершается неизвестным состоянием. Так как транзакции являются распределенными операциями, транзакция в неизвестном состоянии может завершиться успешной, неудачной или по-прежнему выполняется. По этой причине рекомендуется разрешить период времени ожидания вызова (SiloMessagingOptions.SystemResponseTimeout) пройти, чтобы избежать каскадных прерываний, прежде чем проверить состояние или повторить операцию.