Megosztás a következőn keresztül:


Orleans tranzakció

Orleans támogatja az elosztott ACID-tranzakciókat az állandó szemcseállapothoz. A tranzakciók a Microsoft.használatávalOrleans implementálódnak. Tranzakciók NuGet-csomag. A jelen cikkben szereplő mintaalkalmazás forráskódja négy projektből áll:

  • Absztrakciók: A szemcsés felületeket és a megosztott osztályokat tartalmazó osztálytár.
  • Szemcsék: A szemcsés implementációkat tartalmazó osztálykönyvtár.
  • Kiszolgáló: Egy konzolalkalmazás, amely az absztrakciókat és az osztálykódtárakat használja, és silóként Orleans működik.
  • Ügyfél: Az ügyfelet képviselő Orleans absztrakciós osztálytárat használó konzolalkalmazás.

Beállítás

Orleans a tranzakciókat a rendszer leiratkozza. A silót és az ügyfelet egyaránt konfigurálni kell a tranzakciók használatára. Ha nincsenek konfigurálva, a rendszer megkapja a tranzakciós metódusokra irányuló hívásokat a OrleansTransactionsDisabledExceptiongrain implementációban. A siló tranzakcióinak engedélyezéséhez hívja SiloBuilderExtensions.UseTransactions meg a siló gazdagép-készítőt:

var builder = Host.CreateDefaultBuilder(args)
    UseOrleans((context, siloBuilder) =>
    {
        siloBuilder.UseTransactions();
    });

Hasonlóképpen, az ügyfél tranzakcióinak engedélyezéséhez hívja meg ClientBuilderExtensions.UseTransactions az ügyfél gazdagépszerkesztőt:

var builder = Host.CreateDefaultBuilder(args)
    UseOrleansClient((context, clientBuilder) =>
    {
        clientBuilder.UseTransactions();
    });

Tranzakciós állapot tárterülete

A tranzakciók használatához konfigurálnia kell egy adattárat. A különböző tranzakciókkal rendelkező adattárak támogatásához a tárterület absztrakciója ITransactionalStateStorage<TState> használható. Ez az absztrakció a tranzakciók igényeire vonatkozik, ellentétben az általános szemcsés tárolással (IGrainStorage). Tranzakcióspecifikus tárolás használatához konfigurálja a silót az Azure (AddAzureTableTransactionalStateStorage) bármely implementációjának ITransactionalStateStoragehasználatával.

Vegyük például a következő gazdagépszerkesztő konfigurációját:

await Host.CreateDefaultBuilder(args)
    .UseOrleans((_, silo) =>
    {
        silo.UseLocalhostClustering();

        if (Environment.GetEnvironmentVariable(
                "ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
        {
            silo.AddAzureTableTransactionalStateStorage(
                "TransactionStore", 
                options => options.ConfigureTableServiceClient(connectionString));
        }
        else
        {
            silo.AddMemoryGrainStorageAsDefault();
        }

        silo.UseTransactions();
    })
    .RunConsoleAsync();

Fejlesztési célokra, ha a tranzakcióspecifikus tároló nem érhető el a szükséges adattárhoz, akkor inkább implementációt IGrainStorage használhat. Az olyan tranzakciós állapotok esetében, amelyekhez nincs konfigurálva tároló, a tranzakciók egy híd használatával próbálják meg a feladatátvételt a gabonatárolóba. A tranzakciós állapot elérése hídon keresztül a gabonatárolóhoz kevésbé hatékony, és a jövőben nem támogatott. Ezért a javaslat az, hogy ezt csak fejlesztési célokra használja.

Szemcsés illesztők

Ahhoz, hogy egy gabona támogassa a tranzakciókat, a szemcsés felületen lévő tranzakciós metódusokat a tranzakció részeként kell megjelölni a TransactionAttribute. Az attribútumnak meg kell jelölnie, hogyan viselkedik a szemcsés hívás tranzakciós környezetben az alábbi TransactionOption értékekkel részletesen:

  • TransactionOption.Create: A hívás tranzakciós, és mindig létrehoz egy új tranzakciós környezetet (új tranzakciót indít el), még akkor is, ha egy meglévő tranzakciós környezetben hívja meg.
  • TransactionOption.Join: A hívás tranzakciós, de csak egy meglévő tranzakció kontextusában hívható meg.
  • TransactionOption.CreateOrJoin: A hívás tranzakciós. Ha egy tranzakció kontextusában hívja meg, akkor ezt a környezetet fogja használni, különben létrehoz egy új környezetet.
  • TransactionOption.Suppress: A hívás nem tranzakciós, de meghívható egy tranzakción belülről. Ha egy tranzakció kontextusában hívják meg, a környezet nem lesz átadva a hívásnak.
  • TransactionOption.Supported: A hívás nem tranzakciós, de támogatja a tranzakciókat. Ha a hívás egy tranzakció kontextusában történik, a rendszer átadja a környezetet a hívásnak.
  • TransactionOption.NotAllowed: A hívás nem tranzakciós, és nem hívható meg egy tranzakción belülről. Ha egy tranzakció kontextusában hívja meg, a rendszer a NotSupportedException.

A hívások megjelölhetők úgy TransactionOption.Create, hogy a hívás mindig elindítja a tranzakciót. Például az Transfer alábbi ATM-szemcsében lévő művelet mindig elindít egy új tranzakciót, amely a két hivatkozott fiókot foglalja magában.

namespace TransactionalExample.Abstractions;

public interface IAtmGrain : IGrainWithIntegerKey
{
    [Transaction(TransactionOption.Create)]
    Task Transfer(string fromId, string toId, decimal amountToTransfer);
}

A tranzakciós műveletek Withdraw és Deposit a fiók szemcséje meg van jelölveTransactionOption.Join, ami azt jelzi, hogy csak egy meglévő tranzakció kontextusában hívhatók meg, ami akkor fordul elő, ha a rendszer meghívja őket .IAtmGrain.Transfer A GetBalance hívás úgy van megjelölve CreateOrJoin , hogy egy meglévő tranzakción belülről is meghívható legyen, például keresztül IAtmGrain.Transfervagy önállóan.

namespace TransactionalExample.Abstractions;

public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);

    [Transaction(TransactionOption.CreateOrJoin)]
    Task<decimal> GetBalance();
}

Fontos tényezők

A OnActivateAsync hívás nem jelölhető tranzakciósként, mivel egy ilyen híváshoz megfelelő beállítás szükséges a hívás előtt. Csak a szemcsés alkalmazás API-hoz létezik. Ez azt jelenti, hogy a tranzakciós állapot ezen metódusok részeként való olvasásának kísérlete kivételt okoz a futtatókörnyezetben.

Szemcsés implementációk

A szemcsés végrehajtásnak egy ITransactionalState<TState> aspektust kell használnia a gabonaállapot ACID-tranzakciókon keresztüli kezeléséhez.

public interface ITransactionalState<TState>
    where TState : class, new()
{
    Task<TResult> PerformRead<TResult>(
        Func<TState, TResult> readFunction);

    Task<TResult> PerformUpdate<TResult>(
        Func<TState, TResult> updateFunction);
}

A fenntartott állapothoz való összes olvasási vagy írási hozzáférést a tranzakciós állapot aspektusának átadott szinkron függvényekkel kell elvégezni. Ez lehetővé teszi, hogy a tranzakciós rendszer tranzakciós módon hajtsa végre vagy mondja le ezeket a műveleteket. Ha egy szemcsén belüli tranzakciós állapotot szeretne használni, definiáljon egy szerializálható állapotosztályt, amelyet meg kell őrizni, és deklarálja a tranzakciós állapotot a gabona konstruktorában egy TransactionalStateAttribute. Az utóbbi deklarálja az állam nevét, és opcionálisan azt is, hogy melyik tranzakciós állapottárolót használja. További információ: Beállítás.

[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
    public TransactionalStateAttribute(string stateName, string storageName = null)
    {
        // ...
    }
}

Az állapotobjektum például a Balance következőképpen van definiálva:

namespace TransactionalExample.Abstractions;

[GenerateSerializer]
public record class Balance
{
    [Id(0)]
    public decimal Value { get; set; } = 1_000;
}

Az előző állapotobjektum:

  • A rendszer a GenerateSerializerAttribute szerializáló létrehozására utasítja a Orleans kódgenerátort.
  • Rendelkezik egy Value olyan tulajdonsággal, amelyet a IdAttribute tag egyedi azonosítására szolgáló díszít.

Az Balance állapotobjektumot ezután a rendszer az alábbiak szerint használja a AccountGrain megvalósításban:

namespace TransactionalExample.Grains;

[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<Balance> _balance;

    public AccountGrain(
        [TransactionalState(nameof(balance))]
        ITransactionalState<Balance> balance) =>
        _balance = balance ?? throw new ArgumentNullException(nameof(balance));

    public Task Deposit(decimal amount) =>
        _balance.PerformUpdate(
            balance => balance.Value += amount);

    public Task Withdraw(decimal amount) =>
        _balance.PerformUpdate(balance =>
        {
            if (balance.Value < amount)
            {
                throw new InvalidOperationException(
                    $"Withdrawing {amount} credits from account " +
                    $"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
                    $" This account has {balance.Value} credits.");
            }

            balance.Value -= amount;
        });

    public Task<decimal> GetBalance() =>
        _balance.PerformRead(balance => balance.Value);
}

Fontos

A tranzakciós szemcsét meg kell jelölni azzal, ReentrantAttribute hogy a tranzakciós környezet megfelelően legyen átadva a gabonahívásnak.

Az előző példában a TransactionalStateAttribute rendszer azt deklarálja, hogy a balance konstruktorparamétert egy tranzakciós állapothoz kell társítani."balance" Ezzel a deklarációval Orleans egy ITransactionalState<TState> példányt injektál a tranzakciós állapot tárolójából "TransactionStore"betöltött állapottal. Az állapot a következőn keresztül PerformUpdate módosítható vagy olvasható.PerformRead A tranzakciós infrastruktúra biztosítja, hogy a tranzakció részeként végrehajtott módosítások , még a fürtön elosztott Orleans több szem között is, mind le lesznek véglegesítve, vagy mind visszavonva lesznek a tranzakciót létrehozó gabonahívás befejezésekor (IAtmGrain.Transfer az előző példában).

Tranzakciós metódusok meghívása ügyféltől

A tranzakciós szemcse metódus meghívásának ajánlott módja a ITransactionClient. Az ITransactionClient ügyfél konfigurálásakor a rendszer automatikusan regisztrálja a Orleans függőséginjektálási szolgáltatónál. Ez ITransactionClient egy tranzakciós környezet létrehozásához és a tranzakciós szemcsés metódusok meghívásához használható ebben a környezetben. Az alábbi példa bemutatja, hogyan hívhatja meg a ITransactionClient tranzakciós szemcsés metódusokat.

using IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((_, client) =>
    {
        client.UseLocalhostClustering()
            .UseTransactions();
    })
    .Build();

await host.StartAsync();

var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();

var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;

while (!Console.KeyAvailable)
{
    // Choose some random accounts to exchange money
    var fromIndex = random.Next(accountNames.Length);
    var toIndex = random.Next(accountNames.Length);
    while (toIndex == fromIndex)
    {
        // Avoid transferring to/from the same account, since it would be meaningless
        toIndex = (toIndex + 1) % accountNames.Length;
    }

    var fromKey = accountNames[fromIndex];
    var toKey = accountNames[toIndex];
    var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
    var toAccount = client.GetGrain<IAccountGrain>(toKey);

    // Perform the transfer and query the results
    try
    {
        var transferAmount = random.Next(200);

        await transactionClient.RunTransaction(
            TransactionOption.Create, 
            async () =>
            {
                await fromAccount.Withdraw(transferAmount);
                await toAccount.Deposit(transferAmount);
            });

        var fromBalance = await fromAccount.GetBalance();
        var toBalance = await toAccount.GetBalance();

        Console.WriteLine(
            $"We transferred {transferAmount} credits from {fromKey} to " +
            $"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
    }
    catch (Exception exception)
    {
        Console.WriteLine(
            $"Error transferring credits from " +
            $"{fromKey} to {toKey}: {exception.Message}");

        if (exception.InnerException is { } inner)
        {
            Console.WriteLine($"\tInnerException: {inner.Message}\n");
        }

        Console.WriteLine();
    }

    // Sleep and run again
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

Az előző ügyfélkódban:

  • A IHostBuilder beállítás a következővel UseOrleansClientvan konfigurálva: .
    • A IClientBuilder rendszer localhost-fürtözést és tranzakciókat használ.
  • A IClusterClient rendszer lekéri az és ITransactionClient a felületeket a szolgáltatótól.
  • A from rendszer hozzárendeli a változók és to a változók hivatkozásait IAccountGrain .
  • Ez ITransactionClient egy tranzakció létrehozásához használatos, amely a következő hívásokat hívja meg:
    • Withdraw a fiók szemcsés from referenciáján.
    • Deposit a fiók szemcsés to referenciáján.

A tranzakciók mindig véglegesítésre kerülnek, kivéve, ha transactionDelegate a megadott vagy egymásnak ellentmondó transactionOption kivétel van érvényben. Bár a tranzakciós szemcse metódusok meghívásának ajánlott módja a ITransactionClientmetódus használata, a tranzakciós szemcse metódusokat közvetlenül egy másik szemből is meghívhatja.

Tranzakciós metódusok meghívása másik szemcséből

A tranzakciós metódusokat a szemcsés felületen úgy nevezzük, mint bármely más gabonametódust. Az alábbi implementáció alternatív megközelítésként ITransactionClientAtmGrain meghívja a Transfer (tranzakciós) metódust a IAccountGrain felületen.

Fontolja meg a megvalósítást AtmGrain , amely feloldja a két hivatkozott fiókszemcséket, és végrehajtja a megfelelő hívásokat a következőre Withdraw Deposit:

namespace TransactionalExample.Grains;

[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
    public Task Transfer(
        string fromId,
        string toId,
        decimal amount) =>
        Task.WhenAll(
            GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
            GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}

Az ügyfélalkalmazás kódja tranzakciós módon hívható AtmGrain.Transfer meg az alábbiak szerint:

IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);

Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();

await atmOne.Transfer(from, to, 100);

uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();

Az előző hívásokban IAtmGrain a rendszer 100 pénznemegységet ad át egyik számláról a másikra. Az átvitel befejezése után a rendszer mindkét fiókot lekérdezi az aktuális egyenleg lekéréséhez. A pénznemátutalás, valamint mindkét fiók lekérdezése ACID-tranzakcióként történik.

Ahogy az előző példában is látható, a tranzakciók egy , más szemcsés hívásokhoz hasonló értékeket Taskadhatnak vissza. Híváshiba esetén azonban nem alkalmazáskivételt, hanem inkább egy OrleansTransactionException vagy TimeoutException. Ha az alkalmazás kivételt ad a tranzakció során, és ez a kivétel a tranzakció meghiúsulását okozza (szemben a többi rendszerhiba miatt bekövetkező hibával), az alkalmazáskivétel lesz a belső kivétele a OrleansTransactionExceptiontranzakciónak.

Ha tranzakciós kivételt ad ki, OrleansTransactionAbortedExceptiona tranzakció sikertelen volt, és újrapróbálkozott. Bármely más kivétel azt jelzi, hogy a tranzakció ismeretlen állapottal fejeződött be. Mivel a tranzakciók elosztott műveletek, az ismeretlen állapotú tranzakciók sikeresek, sikertelenek vagy még folyamatban lehetnek. Ezért célszerű engedélyezni a hívás időtúllépési időszakát (SiloMessagingOptions.SystemResponseTimeout) a kaszkádolt megszakítások elkerülése érdekében az állapot ellenőrzése vagy a művelet újrapróbálkozása előtt.