Orleans トランザクション

Orleans では、永続的なグレイン状態に対する分散 ACID トランザクションがサポートされます。 トランザクションは Microsoft.Orleans.Transactions NuGet パッケージを使用して実装されます 。 この記事のサンプル アプリのソース コードは、次の 4 つのプロジェクトで構成されています。

  • 抽象化: グレイン インターフェイスと共有クラスを含むクラス ライブラリ。
  • グレイン: グレインの実装を含むクラス ライブラリ。
  • サーバー: 抽象化とグレイン クラス ライブラリを使用し、Orleans サイロとして機能するコンソール アプリ。
  • クライアント: Orleans クライアントを表す抽象化クラス ライブラリを使用するコンソール アプリ。

セットアップ

Orleans トランザクションはオプトインです。 サイロとクライアントの両方がトランザクションを使用するように構成されている必要があります。 そのように構成されていない場合、グレイン実装でトランザクション メソッドを呼び出すと、OrleansTransactionsDisabledException が返されます。 サイロでのトランザクションを有効にするには、サイロのホスト ビルダーに対し SiloBuilderExtensions.UseTransactions を呼び出します。

var builder = Host.CreateDefaultBuilder(args)
    UseOrleans((context, siloBuilder) =>
    {
        siloBuilder.UseTransactions();
    });

同様に、クライアントでのトランザクションを有効にするには、クライアントのホスト ビルダーに対し ClientBuilderExtensions.UseTransactions を呼び出します。

var builder = Host.CreateDefaultBuilder(args)
    UseOrleansClient((context, clientBuilder) =>
    {
        clientBuilder.UseTransactions();
    });

トランザクション状態のストレージ

トランザクションを使用するには、データ ストアを構成する必要があります。 トランザクションでさまざまなデータ ストアをサポートするために、ストレージの抽象化 ITransactionalStateStorage<TState> が使用されます。 この抽象化はトランザクションのニーズに固有のものであり、汎用グレイン ストレージ (IGrainStorage) とは異なります。 トランザクション固有のストレージを使用するには、Azure (AddAzureTableTransactionalStateStorage) など、任意の ITransactionalStateStorage の実装を使用してサイロを構成します。

たとえば、次のようなホスト ビルダーの構成があるとします。

await Host.CreateDefaultBuilder(args)
    .UseOrleans((_, silo) =>
    {
        silo.UseLocalhostClustering();

        if (Environment.GetEnvironmentVariable(
                "ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
        {
            silo.AddAzureTableTransactionalStateStorage(
                "TransactionStore", 
                options => options.ConfigureTableServiceClient(connectionString));
        }
        else
        {
            silo.AddMemoryGrainStorageAsDefault();
        }

        silo.UseTransactions();
    })
    .RunConsoleAsync();

開発目的では、必要なデータストアでトランザクション固有のストレージを使用できない場合は、代わりに IGrainStorage 実装を使用できます。 ストアが構成されていないトランザクション状態に対しては、トランザクションはブリッジを使用してグレイン ストレージにフェールオーバーしようとします。 ブリッジ経由でトランザクション状態のグレイン ストレージへアクセスすることは効率が悪く、今後サポートされなくなる可能性があります。 そのため、これは開発目的でのみ使用することをお勧めします。

グレイン インターフェイス

グレインがトランザクションをサポートするには、TransactionAttribute を使用して、グレイン インターフェイスのトランザクション メソッドをトランザクションの一部としてマークする必要があります。 この属性は、トランザクション環境でのグレイン呼び出しの動作を、以下の TransactionOption 値を使って詳しく示す必要があります。

  • TransactionOption.Create: 呼び出しはトランザクションであり、既存のトランザクション コンテキスト内で呼び出された場合でも、常に新しいトランザクション コンテキストを作成します (新しいトランザクションが開始されます)。
  • TransactionOption.Join: 呼び出しはトランザクションですが、既存のトランザクションのコンテキスト内でのみ呼び出すことができます。
  • TransactionOption.CreateOrJoin: 呼び出しはトランザクションです。 トランザクションのコンテキスト内で呼び出されると、そのコンテキストが使用され、そうでない場合は新しいコンテキストが作成されます。
  • TransactionOption.Suppress: 呼び出しはトランザクションではありませんが、トランザクション内から呼び出すことができます。 トランザクションのコンテキスト内で呼び出された場合、コンテキストは呼び出しに渡されません。
  • TransactionOption.Supported: 呼び出しはトランザクションではありませんが、トランザクションをサポートします。 トランザクションのコンテキスト内で呼び出された場合、コンテキストは呼び出しに渡されます。
  • TransactionOption.NotAllowed: 呼び出しはトランザクションではありません。また、トランザクション内から呼び出すことはできません。 トランザクションのコンテキスト内で呼び出されると、NotSupportedException がスローされます。

呼び出しは TransactionOption.Create としてマークできます。つまり、呼び出しは常にトランザクションを開始します。 たとえば、以下の ATM グレインの Transfer 操作では、参照されている 2 つのアカウントに関わる新しいトランザクションが常に開始されます。

namespace TransactionalExample.Abstractions;

public interface IAtmGrain : IGrainWithIntegerKey
{
    [Transaction(TransactionOption.Create)]
    Task Transfer(string fromId, string toId, decimal amountToTransfer);
}

アカウント グレインに対するトランザクション操作 WithdrawDeposit は、TransactionOption.Join とマークされています。これは、既存のトランザクションのコンテキスト内でのみ呼び出すことができることを示します。IAtmGrain.Transfer の間に呼び出された場合は、これに該当ます。 GetBalance 呼び出しは、CreateOrJoin とマークされています。つまり、既存のトランザクション内から 呼び出すことができます (例: 経由 IAtmGrain.Transfer)。または、それ自体で呼び出すことができます。

namespace TransactionalExample.Abstractions;

public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);

    [Transaction(TransactionOption.CreateOrJoin)]
    Task<decimal> GetBalance();
}

重要な考慮事項

OnActivateAsync はトランザクションとしてマークできません。このような呼び出しでは、呼び出しの前に適切なセットアップが必要だからです。 これは、グレイン アプリケーション API に対してのみ存在します。 つまり、これらのメソッドの一部としてトランザクション状態を読み取ろうとすると、ランタイムで例外がスローされます。

グレインの実装

グレインの実装では、ACID トランザクション経由でグレインの状態を管理するために ITransactionalState<TState> ファセットを使用する必要があります。

public interface ITransactionalState<TState>
    where TState : class, new()
{
    Task<TResult> PerformRead<TResult>(
        Func<TState, TResult> readFunction);

    Task<TResult> PerformUpdate<TResult>(
        Func<TState, TResult> updateFunction);
}

持続的な状態に対するすべての読み取りまたは書き込みアクセスは、トランザクション状態ファセットに渡される同期関数を経由して実行する必要があります。 これにより、トランザクション システムはトランザクションでこれらの操作を実行または取り消すことができます。 グレイン内でトランザクション状態を使用するには、シリアル化可能な状態クラスを持続的なものと定義し、グレインのコンストラクターのトランザクション状態を TransactionalStateAttribute を使って宣言します。 後者は、状態名を宣言し、必要に応じて、どのトランザクション状態ストレージを使用するかを宣言します。 詳しくは、「セットアップ」をご覧ください。

[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
    public TransactionalStateAttribute(string stateName, string storageName = null)
    {
        // ...
    }
}

たとえば、 Balance 状態オブジェクトは次のように定義されます。

namespace TransactionalExample.Abstractions;

[GenerateSerializer]
public record class Balance
{
    [Id(0)]
    public decimal Value { get; set; } = 1_000;
}

上記の状態オブジェクトでは以下を満たします。

  • Orleans コード ジェネレーターにシリアライザーを生成するように指示するために GenerateSerializerAttribute で修飾されています。
  • メンバーを一意に識別するために IdAttribute で修飾された Value プロパティを持っています。

そして、 Balance 状態オブジェクトは AccountGrain 実装で次のように使用されます。

namespace TransactionalExample.Grains;

[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<Balance> _balance;

    public AccountGrain(
        [TransactionalState(nameof(balance))]
        ITransactionalState<Balance> balance) =>
        _balance = balance ?? throw new ArgumentNullException(nameof(balance));

    public Task Deposit(decimal amount) =>
        _balance.PerformUpdate(
            balance => balance.Value += amount);

    public Task Withdraw(decimal amount) =>
        _balance.PerformUpdate(balance =>
        {
            if (balance.Value < amount)
            {
                throw new InvalidOperationException(
                    $"Withdrawing {amount} credits from account " +
                    $"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
                    $" This account has {balance.Value} credits.");
            }

            balance.Value -= amount;
        });

    public Task<decimal> GetBalance() =>
        _balance.PerformRead(balance => balance.Value);
}

重要

トランザクション グレインを ReentrantAttribute でマークして、トランザクション コンテキストがグレイン呼び出しに正しく渡されるようにする必要があります。

前の例で、TransactionalStateAttribute は、balance コンストラクター パラメーターを "balance" という名前のトランザクション状態に関連付ける必要があることを宣言するために使用されています。 この宣言では、 Orleans が、"TransactionStore" という名前のトランザクション状態ストレージから読み込まれた状態を持つ ITransactionalState<TState> インスタンスを挿入します。 状態は、PerformUpdate で変更したり、 PerformRead で読み出したりできます。 トランザクション インフラストラクチャにより、トランザクションの一部として実行されるこのような変更は、Orleans クラスター上に分散された複数のグレイン間であっても、トランザクションを生成したグレイン呼び出しの完了時には、すべてコミットされるか、すべて元に戻されることが保証されます (前の例では IAtmGrain.Transfer)。

クライアントからトランザクション メソッドを呼び出す

トランザクション グレイン メソッドを呼び出すには、ITransactionClient を使用することをお勧めします。 ITransactionClient は、Orleans クライアントの構成時に依存関係挿入サービス プロバイダーに自動的に登録されます。 ITransactionClient は、トランザクション コンテキストを作成し、そのコンテキスト内でトランザクション グレイン メソッドを呼び出すために使用されます。 次の例は、ITransactionClient を使用してトランザクション グレイン メソッドを呼び出す方法を示しています。

using IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((_, client) =>
    {
        client.UseLocalhostClustering()
            .UseTransactions();
    })
    .Build();

await host.StartAsync();

var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();

var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;

while (!Console.KeyAvailable)
{
    // Choose some random accounts to exchange money
    var fromIndex = random.Next(accountNames.Length);
    var toIndex = random.Next(accountNames.Length);
    while (toIndex == fromIndex)
    {
        // Avoid transferring to/from the same account, since it would be meaningless
        toIndex = (toIndex + 1) % accountNames.Length;
    }

    var fromKey = accountNames[fromIndex];
    var toKey = accountNames[toIndex];
    var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
    var toAccount = client.GetGrain<IAccountGrain>(toKey);

    // Perform the transfer and query the results
    try
    {
        var transferAmount = random.Next(200);

        await transactionClient.RunTransaction(
            TransactionOption.Create, 
            async () =>
            {
                await fromAccount.Withdraw(transferAmount);
                await toAccount.Deposit(transferAmount);
            });

        var fromBalance = await fromAccount.GetBalance();
        var toBalance = await toAccount.GetBalance();

        Console.WriteLine(
            $"We transferred {transferAmount} credits from {fromKey} to " +
            $"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
    }
    catch (Exception exception)
    {
        Console.WriteLine(
            $"Error transferring credits from " +
            $"{fromKey} to {toKey}: {exception.Message}");

        if (exception.InnerException is { } inner)
        {
            Console.WriteLine($"\tInnerException: {inner.Message}\n");
        }

        Console.WriteLine();
    }

    // Sleep and run again
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

上記のクライアント コードでは、次の手順を実行します。

  • UseOrleansClient を使用して IHostBuilder を構成します。
    • IClientBuilder では、localhost クラスタリングとトランザクションが使用されます。
  • IClusterClient インターフェイスと ITransactionClient インターフェイスは、サービス プロバイダーから取得されます。
  • from変数と to 変数には、それらのIAccountGrain 参照が割り当てられます。
  • ITransactionClient は、以下を呼び出してトランザクションを作成するために使用されます。
    • from アカウント グレイン参照に対する Withdraw の呼び出し。
    • Deposit アカウント グレイン参照に対する to の呼び出し。

transactionDelegate で例外がスローされた場合、または、矛盾した transactionOption が指定された場合でなければ、トランザクションは常にコミットされます。 トランザクション グレイン メソッドを呼び出すには、ITransactionClient を使用することをお勧めしますが、トランザクション グレイン メソッドを別のグレインから直接呼び出すこともできます。

別のグレインからトランザクション メソッドを呼び出す

グレイン インターフェイスのトランザクション メソッドは、他のグレイン メソッドと同様に呼び出されます。 ITransactionClient を使用しない代替方法として、以下の AtmGrain 実装では、IAccountGrain インターフェイスで Transfer メソッド (トランザクション) が呼び出されます。

2 つの参照先アカウント グレインを解決し、WithdrawDeposit への適切な呼び出しを行う AtmGrain 実装を考えてみましょう。

namespace TransactionalExample.Grains;

[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
    public Task Transfer(
        string fromId,
        string toId,
        decimal amount) =>
        Task.WhenAll(
            GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
            GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}

クライアント アプリ コードは、以下のようにトランザクション方式で AtmGrain.Transfer を呼び出すことができます。

IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);

Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();

await atmOne.Transfer(from, to, 100);

uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();

上記の呼び出しでは、 IAtmGrain を使用して、あるアカウントから別のアカウントに 100 単位の通貨を転送します。 転送が完了すると、両方のアカウントに対してクエリが実行され、現在の残高が取得されます。 通貨振替と両方の口座照会は、ACID トランザクションとして実行されます。

前の例に示したように、トランザクションは、他のグレイン呼び出しと同様、Task 内の値を返すことができます。 ただし、呼び出しの失敗時には、アプリケーションの例外ではなく、OrleansTransactionException または TimeoutException がスローされます。 アプリケーションがトランザクション中に例外をスローし、その例外が原因でトランザクションが失敗した場合 (他のシステムエラーが原因で失敗するのではなく)、アプリケーション例外は OrleansTransactionException の内部例外になります。

OrleansTransactionAbortedException 型のトランザクション例外がスローされた場合、トランザクションは失敗し、再試行できます。 その他の例外がスローされた場合は、トランザクションが不明な状態で終了したことを示します。 トランザクションは分散操作であるため、不明な状態のトランザクションは、成功、失敗、または進行中の可能性があります。 このため、連鎖的な中止を回避するために、状態を確認する前または操作を再試行する前に、呼び出しタイムアウト期間 (SiloMessagingOptions.SystemResponseTimeout) を経過させることをお勧めします。