Aracılığıyla paylaş


Orleans işlem

Orleans kalıcı tanecik durumuna karşı dağıtılmış ACID işlemlerini destekler. İşlemler Microsoft kullanılarak uygulanır..Orleans. İşlemler NuGet paketi. Bu makaledeki örnek uygulamanın kaynak kodu dört projeden oluşur:

  • Soyutlamalar: Taneli arabirimleri ve paylaşılan sınıfları içeren bir sınıf kitaplığı.
  • Tanecikler: Tanecik uygulamalarını içeren bir sınıf kitaplığı.
  • Sunucu: Soyutlamaları kullanan ve sınıf kitaplıklarını kullanan ve silo işlevi Orleans gören bir konsol uygulaması.
  • İstemci: İstemciyi temsil Orleans eden soyutlama sınıf kitaplığını kullanan bir konsol uygulaması.

Ayarlama

Orleans işlemler kabul edilir. Silo ve istemcinin her ikisi de işlemleri kullanacak şekilde yapılandırılmalıdır. Yapılandırılmazlarsa, bir taneli uygulamadaki işlem yöntemlerine yapılan tüm çağrılar alır OrleansTransactionsDisabledException. Siloda işlemleri etkinleştirmek için silo ana bilgisayar oluşturucusunu çağırın SiloBuilderExtensions.UseTransactions :

var builder = Host.CreateDefaultBuilder(args)
    UseOrleans((context, siloBuilder) =>
    {
        siloBuilder.UseTransactions();
    });

Benzer şekilde, istemcide işlemleri etkinleştirmek için istemci ana bilgisayar oluşturucusunu çağırın ClientBuilderExtensions.UseTransactions :

var builder = Host.CreateDefaultBuilder(args)
    UseOrleansClient((context, clientBuilder) =>
    {
        clientBuilder.UseTransactions();
    });

İşlem durumu depolama

İşlemleri kullanmak için bir veri deposu yapılandırmanız gerekir. Çeşitli veri depolarını işlemlerle desteklemek için depolama soyutlaması ITransactionalStateStorage<TState> kullanılır. Bu soyutlama, genel taneli depolamanın (IGrainStorage) aksine işlemlerin gereksinimlerine özgüdür. İşlemlere özgü depolamayı kullanmak için siloyu Azure (AddAzureTableTransactionalStateStorage) gibi herhangi bir uygulamasını ITransactionalStateStoragekullanarak yapılandırın.

Örneğin, aşağıdaki konak oluşturucu yapılandırmasını göz önünde bulundurun:

await Host.CreateDefaultBuilder(args)
    .UseOrleans((_, silo) =>
    {
        silo.UseLocalhostClustering();

        if (Environment.GetEnvironmentVariable(
                "ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
        {
            silo.AddAzureTableTransactionalStateStorage(
                "TransactionStore", 
                options => options.ConfigureTableServiceClient(connectionString));
        }
        else
        {
            silo.AddMemoryGrainStorageAsDefault();
        }

        silo.UseTransactions();
    })
    .RunConsoleAsync();

Geliştirme amacıyla, ihtiyacınız olan veri deposu için işleme özgü depolama kullanılamıyorsa, bunun yerine bir IGrainStorage uygulama kullanabilirsiniz. Yapılandırılmış bir deposu olmayan herhangi bir işlem durumu için işlemler bir köprü kullanarak tahıl depolamaya yük devretmeyi dener. İşlem durumuna köprüden tane depolamaya erişim daha az verimlidir ve gelecekte desteklenmeyebilir. Bu nedenle, öneri bunu yalnızca geliştirme amacıyla kullanmaktır.

Tane arabirimleri

Bir dilimin işlemleri desteklemesi için, bir taneli arabirimdeki işlem yöntemleri kullanılarak TransactionAttributebir işlemin parçası olarak işaretlenmelidir. özniteliğinin, aşağıdaki TransactionOption değerlerle ayrıntılı olarak açıklandığı gibi, grain çağrısının işlem ortamında nasıl davrandığını belirtmesi gerekir:

  • TransactionOption.Create: Çağrısı işlemseldir ve mevcut bir işlem bağlamında çağrılsa bile her zaman yeni bir işlem bağlamı oluşturur (yeni bir işlem başlatır).
  • TransactionOption.Join: Çağrı işlemseldir, ancak yalnızca mevcut bir işlem bağlamında çağrılabilir.
  • TransactionOption.CreateOrJoin: Çağrı işlemseldir. bir işlem bağlamında çağrılırsa, bu bağlamı kullanır, aksi takdirde yeni bir bağlam oluşturur.
  • TransactionOption.Suppress: Çağrı işlemsel değildir, ancak bir işlem içinden çağrılabilir. Bir işlem bağlamında çağrılırsa, bağlam çağrıya geçirilmeyecektir.
  • TransactionOption.Supported: Çağrı işlemsel değildir ancak işlemleri destekler. Bir işlem bağlamında çağrılırsa, bağlam çağrıya geçirilir.
  • TransactionOption.NotAllowed: Çağrı işlemsel değildir ve bir işlemin içinden çağrılamaz. Bir işlem bağlamında çağrılırsa, oluşturur NotSupportedException.

Çağrılar olarak TransactionOption.Createişaretlenebilir, yani çağrı her zaman işlemini başlatır. Örneğin, Transfer aşağıdaki ATM dilimindeki işlem her zaman başvuruda bulunan iki hesabı içeren yeni bir işlem başlatır.

namespace TransactionalExample.Abstractions;

public interface IAtmGrain : IGrainWithIntegerKey
{
    [Transaction(TransactionOption.Create)]
    Task Transfer(string fromId, string toId, decimal amountToTransfer);
}

İşlem işlemleri Withdraw ve Deposit hesap diliminde işaretlenir TransactionOption.Joinve yalnızca mevcut bir işlem bağlamında çağrılabileceklerini belirtir. Bu işlem sırasında IAtmGrain.Transferçağrılırsa bu durum geçerli olur. ÇağrıGetBalance, aracılığıyla veya kendi başına gibi IAtmGrain.Transfermevcut bir işlem içinden çağrılabilmesi için işaretlenirCreateOrJoin.

namespace TransactionalExample.Abstractions;

public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);

    [Transaction(TransactionOption.CreateOrJoin)]
    Task<decimal> GetBalance();
}

Dikkat edilmesi gereken önemli hususlar

bu OnActivateAsync tür bir çağrı çağrıdan önce düzgün bir kurulum gerektirdiğinden işlemsel olarak işaretlenemiyor. Yalnızca taneli uygulama API'sine yöneliktir. Bu, bu yöntemlerin bir parçası olarak işlem durumunu okuma girişiminin çalışma zamanında bir özel durum oluşturacağı anlamına gelir.

Hububat uygulamaları

Bir taneli uygulamanın ACID işlemleri aracılığıyla gren durumunu yönetmek için model ITransactionalState<TState> kullanması gerekir.

public interface ITransactionalState<TState>
    where TState : class, new()
{
    Task<TResult> PerformRead<TResult>(
        Func<TState, TResult> readFunction);

    Task<TResult> PerformUpdate<TResult>(
        Func<TState, TResult> updateFunction);
}

Kalıcı duruma tüm okuma veya yazma erişimi, işlem durumu modeline geçirilen zaman uyumlu işlevler aracılığıyla gerçekleştirilmelidir. Bu, işlem sisteminin bu işlemleri işlem yoluyla gerçekleştirmesine veya iptal etmesine olanak tanır. Bir dilim içinde işlem durumu kullanmak için, kalıcı hale getirilecek serileştirilebilir bir durum sınıfı tanımlarsınız ve bir ile TransactionalStateAttributetahılın oluşturucusunda işlem durumunu bildirirsiniz. İkincisi durum adını ve isteğe bağlı olarak hangi işlem durumu depolamasının kullanılacağını bildirir. Daha fazla bilgi için bkz . Kurulum.

[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
    public TransactionalStateAttribute(string stateName, string storageName = null)
    {
        // ...
    }
}

Örneğin, Balance durum nesnesi aşağıdaki gibi tanımlanır:

namespace TransactionalExample.Abstractions;

[GenerateSerializer]
public record class Balance
{
    [Id(0)]
    public decimal Value { get; set; } = 1_000;
}

Önceki durum nesnesi:

  • kod oluşturucusunun GenerateSerializerAttribute bir seri hale getirici oluşturmasını Orleans bildirmek için ile dekore edilmiştir.
  • Üyeyi benzersiz olarak tanımlamak için ile IdAttribute süslenmiş bir Value özelliği vardır.

Durum Balance nesnesi daha sonra uygulamada aşağıdaki gibi kullanılır AccountGrain :

namespace TransactionalExample.Grains;

[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<Balance> _balance;

    public AccountGrain(
        [TransactionalState(nameof(balance))]
        ITransactionalState<Balance> balance) =>
        _balance = balance ?? throw new ArgumentNullException(nameof(balance));

    public Task Deposit(decimal amount) =>
        _balance.PerformUpdate(
            balance => balance.Value += amount);

    public Task Withdraw(decimal amount) =>
        _balance.PerformUpdate(balance =>
        {
            if (balance.Value < amount)
            {
                throw new InvalidOperationException(
                    $"Withdrawing {amount} credits from account " +
                    $"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
                    $" This account has {balance.Value} credits.");
            }

            balance.Value -= amount;
        });

    public Task<decimal> GetBalance() =>
        _balance.PerformRead(balance => balance.Value);
}

Önemli

İşlem bağlamını doğru şekilde grain çağrısına geçirildiğinden emin olmak için işlem dilimi ile ReentrantAttribute işaretlenmelidir.

Yukarıdaki örnekte, TransactionalStateAttribute oluşturucu parametresinin balance adlı "balance"bir işlem durumuyla ilişkilendirilmesi gerektiğini bildirmek için kullanılır. Bu bildirimle, Orleans adlı "TransactionStore"işlem durumu depolama alanından yüklenen bir durum içeren bir ITransactionalState<TState> örnek ekler. Durum aracılığıyla değiştirilebilir PerformUpdate veya aracılığıyla PerformReadokunabilir. İşlem altyapısı, bir işlemin parçası olarak gerçekleştirilen bu tür değişikliklerin, bir küme üzerinde Orleans dağıtılmış birden çok tane arasında bile olsa, işlemi oluşturan taneli çağrı tamamlandıktan sonra (IAtmGrain.Transfer önceki örnekte) tümünün işlenmesini veya tümünün geri alınmasını sağlar.

İstemciden işlem yöntemlerini çağırma

İşlem dilimi yöntemini çağırmanın önerilen yolu kullanmaktır ITransactionClient. ITransactionClient, istemci yapılandırıldığında bağımlılık ekleme hizmeti sağlayıcısına Orleans otomatik olarak kaydedilir. ITransactionClient bir işlem bağlamı oluşturmak ve bu bağlamda işlem dilimi yöntemlerini çağırmak için kullanılır. Aşağıdaki örnekte işlem dilimi yöntemlerini çağırmak için öğesinin nasıl kullanılacağı ITransactionClient gösterilmektedir.

using IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((_, client) =>
    {
        client.UseLocalhostClustering()
            .UseTransactions();
    })
    .Build();

await host.StartAsync();

var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();

var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;

while (!Console.KeyAvailable)
{
    // Choose some random accounts to exchange money
    var fromIndex = random.Next(accountNames.Length);
    var toIndex = random.Next(accountNames.Length);
    while (toIndex == fromIndex)
    {
        // Avoid transferring to/from the same account, since it would be meaningless
        toIndex = (toIndex + 1) % accountNames.Length;
    }

    var fromKey = accountNames[fromIndex];
    var toKey = accountNames[toIndex];
    var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
    var toAccount = client.GetGrain<IAccountGrain>(toKey);

    // Perform the transfer and query the results
    try
    {
        var transferAmount = random.Next(200);

        await transactionClient.RunTransaction(
            TransactionOption.Create, 
            async () =>
            {
                await fromAccount.Withdraw(transferAmount);
                await toAccount.Deposit(transferAmount);
            });

        var fromBalance = await fromAccount.GetBalance();
        var toBalance = await toAccount.GetBalance();

        Console.WriteLine(
            $"We transferred {transferAmount} credits from {fromKey} to " +
            $"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
    }
    catch (Exception exception)
    {
        Console.WriteLine(
            $"Error transferring credits from " +
            $"{fromKey} to {toKey}: {exception.Message}");

        if (exception.InnerException is { } inner)
        {
            Console.WriteLine($"\tInnerException: {inner.Message}\n");
        }

        Console.WriteLine();
    }

    // Sleep and run again
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

Önceki istemci kodunda:

  • IHostBuilder ile UseOrleansClientyapılandırılır.
    • , IClientBuilder localhost kümelemini ve işlemlerini kullanır.
  • IClusterClient ve ITransactionClient arabirimleri hizmet sağlayıcısından alınır.
  • from ve to değişkenlerine başvuruları atanırIAccountGrain.
  • ITransactionClient, şu çağrıyı yaparak bir işlem oluşturmak için kullanılır:
    • Withdraw hesap dilimi başvurusunda from .
    • Deposit hesap dilimi başvurusunda to .

içinde veya transactionDelegate belirtilen çelişkili transactionOption bir özel durum olmadığı sürece işlemler her zaman işlenir. İşlem hub'ı yöntemlerini çağırmanın önerilen yolu kullanmak olsa ITransactionClientda, işlem hub'ı yöntemlerini doğrudan başka bir tanecikten de çağırabilirsiniz.

Başka bir dilimden işlem yöntemlerini çağırma

Bir tanecik arabirimindeki işlem yöntemleri, diğer herhangi bir tanecik yöntemi gibi adlandırılır. kullanarak ITransactionClientAtmGrain alternatif bir yaklaşım olarak, aşağıdaki uygulama arabiriminde Transfer yöntemini (işlemselIAccountGrain) çağırır.

Başvurulan AtmGrain iki hesap dilimini çözümleyen ve ve Depositiçin uygun çağrıları Withdraw yapan uygulamayı göz önünde bulundurun:

namespace TransactionalExample.Grains;

[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
    public Task Transfer(
        string fromId,
        string toId,
        decimal amount) =>
        Task.WhenAll(
            GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
            GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}

İstemci uygulama kodunuz aşağıdaki gibi işlemsel bir şekilde çağrı AtmGrain.Transfer yapabilir:

IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);

Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();

await atmOne.Transfer(from, to, 100);

uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();

Önceki çağrılarda, bir IAtmGrain hesaptan diğerine 100 birim para birimi aktarmak için kullanılır. Aktarım tamamlandıktan sonra her iki hesap da geçerli bakiyelerini almak için sorgulanır. Para birimi aktarımı ve her iki hesap sorgusu da ACID işlemleri olarak gerçekleştirilir.

Önceki örnekte gösterildiği gibi, işlemler diğer ayrıntılı çağrılar gibi içindeki Taskdeğerleri döndürebilir. Ancak çağrı hatası durumunda uygulama özel durumları oluşturmaz, bunun yerine bir OrleansTransactionException veya TimeoutExceptionoluşturur. Uygulama işlem sırasında bir özel durum oluşturursa ve bu özel durum işlemin başarısız olmasına neden olursa (diğer sistem hataları nedeniyle başarısız olmasının aksine), uygulama özel durumu iç özel durumu OrleansTransactionExceptionolacaktır.

türünde OrleansTransactionAbortedExceptionbir işlem özel durumu oluşturulursa işlem başarısız oldu ve yeniden denenebilir. Oluşan diğer özel durumlar, işlemin bilinmeyen bir durumla sonlandırıldığını gösterir. İşlemler dağıtılmış işlemler olduğundan, bilinmeyen bir durumdaki bir işlem başarılı olmuş, başarısız olmuş veya hala devam ediyor olabilir. Bu nedenle, durumu doğrulamadan veya işlemi yeniden denemeden önce art arda durdurmaları önlemek için bir çağrı zaman aşımı süresinin (SiloMessagingOptions.SystemResponseTimeout) geçmesine izin vermek önerilir.