Aracılığıyla paylaş


Orleans İşlemler

Orleans kalıcı tanecik durumuna karşı dağıtılmış ACID işlemlerini destekler. İşlemler, Microsoft.Orleans.Transactions NuGet paketi kullanılarak uygulanır. Bu makaledeki örnek uygulamanın kaynak kodu dört projeden oluşur:

  • Soyutlamalar: Grain arabirimleri ve paylaşılan sınıfları içeren bir sınıf kütüphanesi.
  • Tanecikler: Tanecik uygulamalarını içeren bir sınıf kitaplığı.
  • Sunucu: Soyutlamaları ve grains sınıf kütüphanelerini tüketen ve silo işlevi gören bir konsol uygulaması.
  • İstemci: İstemciyi temsil Orleans eden soyutlama sınıf kitaplığını kullanan bir konsol uygulaması.

Kurulum

Orleans işlemler isteğe bağlıdır. Hem silo hem de istemci işlemleri kullanacak şekilde yapılandırılmalıdır. Yapılandırılmazlarsa, bir taneli uygulamadaki işlem yöntemlerine yapılan tüm çağrılar bir OrleansTransactionsDisabledExceptionalır. Siloda işlemleri etkinleştirmek için silo ana bilgisayar oluşturucusunu çağırın SiloBuilderExtensions.UseTransactions :

var builder = Host.CreateDefaultBuilder(args)
    .UseOrleans((context, siloBuilder) =>
    {
        siloBuilder.UseTransactions();
    });

Benzer şekilde, istemcide işlemleri etkinleştirmek için istemci ana bilgisayar oluşturucusunu çağırın ClientBuilderExtensions.UseTransactions :

var builder = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((context, clientBuilder) =>
    {
        clientBuilder.UseTransactions();
    });

İşlem durumu depolama

İşlemleri kullanmak için bir veri deposu yapılandırmanız gerekir. İşlemlerle Orleans çeşitli veri depolarını desteklemek için depolama soyutlamasını ITransactionalStateStorage<TState>kullanır. Bu soyutlama, genel taneli depolamanın (IGrainStorage) aksine işlemlerin gereksinimlerine özgüdür. İşlemlere özgü depolamayı kullanmak için siloyu Azure (ITransactionalStateStorage) gibi herhangi bir uygulamasını AddAzureTableTransactionalStateStoragekullanarak yapılandırın.

Örneğin, aşağıdaki konak oluşturucu yapılandırmasını göz önünde bulundurun:

using Azure.Data.Tables;

await Host.CreateDefaultBuilder(args)
    .UseOrleans((_, silo) =>
    {
        silo.UseLocalhostClustering();

        if (Environment.GetEnvironmentVariable(
                "ORLEANS_STORAGE_CONNECTION_STRING") is { } connectionString)
        {
            silo.AddAzureTableTransactionalStateStorage(
                "TransactionStore", 
                options => options.TableServiceClient = new TableServiceClient(connectionString));
        }
        else
        {
            silo.AddMemoryGrainStorageAsDefault();
        }

        silo.UseTransactions();
    })
    .RunConsoleAsync();

Geliştirme amacıyla, ihtiyacınız olan veri deposu için işleme özgü depolama kullanılamıyorsa, bunun yerine bir IGrainStorage uygulama kullanabilirsiniz. Yapılandırılmış depo olmayan herhangi bir işlem durumu için işlemler, köprü kullanarak tahıl depolamaya yük devretmeyi dener. İşlemsel duruma tahıl depolama köprüsü üzerinden erişim daha az etkin olup, gelecekte desteklenmeyebilir. Bu nedenle, bu yaklaşımı yalnızca geliştirme amacıyla kullanmanızı öneririz.

Tane arabirimleri

Bir taneciğin işlemleri desteklemesi için, işlem yöntemlerini tanecik arabiriminde TransactionAttribute işaretlemeniz ve bir işlemin parçası olarak kullanmanız gerekir. özniteliğinin, aşağıdaki TransactionOption değerlerle ayrıntılı olarak açıklandığı gibi, grain çağrısının işlem ortamında nasıl davrandığını belirtmesi gerekir:

  • TransactionOption.Create: Çağrısı işlemseldir ve mevcut bir işlem bağlamında çağrılsa bile her zaman yeni bir işlem bağlamı oluşturur (yeni bir işlem başlatır).
  • TransactionOption.Join: Çağrı işlemseldir, ancak yalnızca mevcut bir işlem bağlamında çağrılabilir.
  • TransactionOption.CreateOrJoin: Çağrı işlemseldir. bir işlem bağlamında çağrılırsa, bu bağlamı kullanır, aksi takdirde yeni bir bağlam oluşturur.
  • TransactionOption.Suppress: Çağrı işlemsel değildir, ancak bir işlem içinden çağrılabilir. İşlemle ilgili bir bağlamda çağrıldığında, bağlam çağrıya aktarılmayacaktır.
  • TransactionOption.Supported: Çağrı işlemsel değildir ancak işlemleri destekler. Bir işlem bağlamında çağrıldığında, işlem bağlamı çağrıya aktarılır.
  • TransactionOption.NotAllowed: Çağrı işlemsel değildir ve bir işlemin içinden çağrılamaz. Bir işlem bağlamında çağrılırsa, NotSupportedException hatası fırlatır.

Çağrıları TransactionOption.Create olarak işaretleyebilirsiniz, bu da çağrının her zaman kendi işlemini başlatacağı anlamına gelir. Örneğin, aşağıdaki ATM işlemindeki Transfer işlemi, her zaman referans verilen iki hesabı içeren yeni bir işlem başlatır.

namespace TransactionalExample.Abstractions;

public interface IAtmGrain : IGrainWithIntegerKey
{
    [Transaction(TransactionOption.Create)]
    Task Transfer(string fromId, string toId, decimal amountToTransfer);
}

İşlem işlemleri Withdraw ve Deposit hesap diliminde işaretlenir TransactionOption.Join. Bu, yalnızca mevcut bir işlem bağlamında çağrılabileceğini gösterir. Bu işlem sırasında IAtmGrain.Transferçağrılırsa bu durum geçerlidir. Mevcut bir işlemden (GetBalance gibi) veya tek başına çağrılabilecek şekilde CreateOrJoin çağrısı IAtmGrain.Transfer olarak işaretlenmiştir.

namespace TransactionalExample.Abstractions;

public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);

    [Transaction(TransactionOption.CreateOrJoin)]
    Task<decimal> GetBalance();
}

Dikkat edilmesi gereken önemli hususlar

Çünkü herhangi böyle bir çağrı, çağrıdan önce düzgün bir kurulum gerektirir, OnActivateAsync'i işlemsel olarak işaretleyemezsiniz. Yalnızca taneli uygulama API'sine yöneliktir. Bu, bu yöntemlerin bir parçası olarak işlem durumunu okumaya çalışmanın çalışma zamanında bir istisna fırlatmasına neden olduğu anlamına gelir.

Hububat uygulamaları

Bir grain uygulaması, grain durumunu ITransactionalState<TState> aracılığıyla yönetmek için bir boyutu kullanmalıdır.

public interface ITransactionalState<TState>
    where TState : class, new()
{
    Task<TResult> PerformRead<TResult>(
        Func<TState, TResult> readFunction);

    Task<TResult> PerformUpdate<TResult>(
        Func<TState, TResult> updateFunction);
}

İşlem durumu modeline geçirilen zaman uyumlu işlevler aracılığıyla kalıcı duruma tüm okuma veya yazma erişimini gerçekleştirin. Bu, işlem sisteminin bu işlemleri işlem yoluyla gerçekleştirmesine veya iptal etmesine olanak tanır. Bir grain içinde işlem durumunu kullanmak için, kaydedilecek serileştirilebilir bir durum sınıfı tanımlayın ve grain'ın oluşturucusunda bir TransactionalStateAttribute kullanarak işlem durumunu bildirin. Bu öznitelik, durum adını ve isteğe bağlı olarak hangi işlem durumu depolamasının kullanılacağını bildirir. Daha fazla bilgi için bkz. Kurulum.

[AttributeUsage(AttributeTargets.Parameter)]
public class TransactionalStateAttribute : Attribute
{
    public TransactionalStateAttribute(string stateName, string storageName = null)
    {
        // ...
    }
}

Örneğin, Balance durum nesnesi aşağıdaki gibi tanımlanır:

namespace TransactionalExample.Abstractions;

[GenerateSerializer]
public record class Balance
{
    [Id(0)]
    public decimal Value { get; set; } = 1_000;
}

Önceki durum nesnesi:

  • GenerateSerializerAttribute ile süslenmiştir ki Orleans kod oluşturucu bir seri hale getirici oluştursun.
  • Value özelliği, üyeyi benzersiz olarak tanımlamak için IdAttribute ile süslenmiştir.

Durum Balance nesnesi daha sonra uygulamada aşağıdaki gibi kullanılır AccountGrain :

namespace TransactionalExample.Grains;

[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<Balance> _balance;

    public AccountGrain(
        [TransactionalState(nameof(balance))]
        ITransactionalState<Balance> balance) =>
        _balance = balance ?? throw new ArgumentNullException(nameof(balance));

    public Task Deposit(decimal amount) =>
        _balance.PerformUpdate(
            balance => balance.Value += amount);

    public Task Withdraw(decimal amount) =>
        _balance.PerformUpdate(balance =>
        {
            if (balance.Value < amount)
            {
                throw new InvalidOperationException(
                    $"Withdrawing {amount} credits from account " +
                    $"\"{this.GetPrimaryKeyString()}\" would overdraw it." +
                    $" This account has {balance.Value} credits.");
            }

            balance.Value -= amount;
        });

    public Task<decimal> GetBalance() =>
        _balance.PerformRead(balance => balance.Value);
}

Önemli

İşlem bağlamının doğru şekilde grain çağrısına geçirilmesi için işlem grain'i ReentrantAttribute ile işaretlenmelidir.

Yukarıdaki örnekte, TransactionalStateAttribute oluşturucu parametresinin balance adlı "balance"bir işlem durumuyla ilişkilendirilmesi gerektiğini bildirir. Bu bildirimle, OrleansITransactionalState<TState> adlı işlem durumu depolama alanından yüklenen durumla birlikte bir "TransactionStore" örneğini enjekte eder. Durumu PerformUpdate aracılığıyla değiştirebilir veya PerformRead aracılığıyla okuyabilirsiniz. İşlem altyapısı, bir işlemin parçası olarak gerçekleştirilen bu tür değişikliklerin (bir kümeye Orleans dağıtılmış birden çok grain arasında bile) tümünün işleme alınmasını veya işlemi oluşturan grain çağrısının tamamlanmasıyla tümünün geri alınmasını sağlar (IAtmGrain.Transfer önceki örnekte).

İstemciden işlem yöntemlerini çağırma

İşlemsel tanecik yöntemini çağırmanın önerilen yolu, ITransactionClient kullanmaktır. Orleans istemcisini yapılandırdığınızda bağımlılık ekleme hizmet sağlayıcısına otomatik olarak kaydolur ITransactionClientOrleans . ITransactionClient kullanarak bir işlem bağlamı oluşturun ve bu bağlamda işlem tanecik yöntemlerini çağırın. Aşağıdaki örnek, işlem taneleri yöntemlerini çağırmak için ITransactionClient nasıl kullanacağınızı göstermektedir.

using IHost host = Host.CreateDefaultBuilder(args)
    .UseOrleansClient((_, client) =>
    {
        client.UseLocalhostClustering()
            .UseTransactions();
    })
    .Build();

await host.StartAsync();

var client = host.Services.GetRequiredService<IClusterClient>();
var transactionClient= host.Services.GetRequiredService<ITransactionClient>();

var accountNames = new[] { "Xaawo", "Pasqualino", "Derick", "Ida", "Stacy", "Xiao" };
var random = Random.Shared;

while (!Console.KeyAvailable)
{
    // Choose some random accounts to exchange money
    var fromIndex = random.Next(accountNames.Length);
    var toIndex = random.Next(accountNames.Length);
    while (toIndex == fromIndex)
    {
        // Avoid transferring to/from the same account, since it would be meaningless
        toIndex = (toIndex + 1) % accountNames.Length;
    }

    var fromKey = accountNames[fromIndex];
    var toKey = accountNames[toIndex];
    var fromAccount = client.GetGrain<IAccountGrain>(fromKey);
    var toAccount = client.GetGrain<IAccountGrain>(toKey);

    // Perform the transfer and query the results
    try
    {
        var transferAmount = random.Next(200);

        await transactionClient.RunTransaction(
            TransactionOption.Create, 
            async () =>
            {
                await fromAccount.Withdraw(transferAmount);
                await toAccount.Deposit(transferAmount);
            });

        var fromBalance = await fromAccount.GetBalance();
        var toBalance = await toAccount.GetBalance();

        Console.WriteLine(
            $"We transferred {transferAmount} credits from {fromKey} to " +
            $"{toKey}.\n{fromKey} balance: {fromBalance}\n{toKey} balance: {toBalance}\n");
    }
    catch (Exception exception)
    {
        Console.WriteLine(
            $"Error transferring credits from " +
            $"{fromKey} to {toKey}: {exception.Message}");

        if (exception.InnerException is { } inner)
        {
            Console.WriteLine($"\tInnerException: {inner.Message}\n");
        }

        Console.WriteLine();
    }

    // Sleep and run again
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

Önceki istemci kodunda:

  • IHostApplicationBuilder, UseOrleansClient ile yapılandırılır.
  • IClusterClient ve ITransactionClient arabirimleri hizmet sağlayıcısından alınır.
  • from ve to değişkenlerine IAccountGrain referansları atanır.
  • ITransactionClient, şu çağrıyı yaparak bir işlem oluşturmak için kullanılır:
    • Withdraw hesap tahıl referansında from .
    • Deposit hesap tahıl referansında to .

transactionDelegate içinde bir istisna fırlatılmadığı veya çelişkili bir transactionOption belirtilmediği sürece işlemler her zaman tamamlanır. İşlemel tanecik yöntemlerini çağırmanın önerilen yolu ITransactionClient kullanmak olsa da, bunları doğrudan başka bir taneden de çağırabilirsiniz.

Başka bir grain'den işlem yöntemlerini çağırma

Herhangi bir başka tanecik yönteminde olduğu gibi, bir tanecik arabiriminde işlem yöntemlerini çağırın. ITransactionClient kullanmaya alternatif olarak, aşağıdaki AtmGrain uygulaması Transfer arabiriminde işlemsel IAccountGrain yöntemini çağırır.

Başvurulan AtmGrain iki hesap dilimini çözümleyen ve ve Withdrawiçin uygun çağrıları Deposit yapan uygulamayı göz önünde bulundurun:

namespace TransactionalExample.Grains;

[StatelessWorker]
public class AtmGrain : Grain, IAtmGrain
{
    public Task Transfer(
        string fromId,
        string toId,
        decimal amount) =>
        Task.WhenAll(
            GrainFactory.GetGrain<IAccountGrain>(fromId).Withdraw(amount),
            GrainFactory.GetGrain<IAccountGrain>(toId).Deposit(amount));
}

İstemci uygulama kodunuz, AtmGrain.Transfer işlemsel olarak aşağıdaki şekilde çağrı yapabilir:

IAtmGrain atmOne = client.GetGrain<IAtmGrain>(0);

Guid from = Guid.NewGuid();
Guid to = Guid.NewGuid();

await atmOne.Transfer(from, to, 100);

uint fromBalance = await client.GetGrain<IAccountGrain>(from).GetBalance();
uint toBalance = await client.GetGrain<IAccountGrain>(to).GetBalance();

Önceki çağrılarda, bir IAtmGrain hesaptan diğerine 100 birim para birimi aktarmak için kullanılır. Aktarım tamamlandıktan sonra her iki hesap da mevcut bakiyelerini öğrenmek için sorgulanır. Para birimi aktarımı ve her iki hesap sorgusu da ACID işlemleri olarak gerçekleştirilir.

Önceki örnekte gösterildiği gibi, işlemler diğer grain çağrıları gibi Task içinde değerler döndürebilir. Ancak, çağrı hatası durumunda uygulama özel durumları fırlatmazlar, bunun yerine bir OrleansTransactionException veya TimeoutException oluştururlar. Uygulama işlem sırasında bir özel durum oluşturursa ve bu özel durum işlemin başarısız olmasına neden olursa (diğer sistem hatalarından dolayı başarısız olmasına karşı), uygulama özel durumu iç OrleansTransactionExceptionözel durumu haline gelir.

Türünde OrleansTransactionAbortedException bir işlem özel durumu oluşturulursa işlem başarısız olur ve yeniden denenebilir. Atılan diğer tüm özel durumlar, işlemin belirsiz bir durumla sonlandığını gösterir. İşlemler dağıtılmış işlemler olduğundan, bilinmeyen bir durumdaki bir işlem başarılı olmuş, başarısız olmuş veya hala devam ediyor olabilir. Bu nedenle, işlemi yeniden denemekten veya durumu kontrol etmeden önce, geçişli iptalleri önlemek için bir çağrı zaman aşımı süresinin (SiloMessagingOptions.SystemResponseTimeout) geçmesine izin vermek tavsiye edilir.