Sdílet prostřednictvím


Použijte tokeny zrušení v Orleans zrnech

Orleans podporuje kooperativní zrušení v jemnozrnných metodách prostřednictvím standardu CancellationToken. Tato funkce umožňuje včas zastavit dlouhotrvající operace, zrušit práci, která už není potřeba, a zlepšit rychlost odezvy a využití prostředků vaší aplikace.

Přehled

Orleans podporuje předávání CancellationToken instancí do metod rozhraní grain. To funguje pro:

  • Pravidelné metody zrna, které vrací Task, Task<T>, a tak dále
  • Metody streamování, které vracejí IAsyncEnumerable<T>
  • Volání z klienta na zrno i volání mezi zrny

Zrušení je soustavné, což znamená, že vaše implementace musí sledovat token a správně reagovat, aby byla efektivní. Pokud se nepozoruje token zrušení, modul runtime automaticky nezastaví provádění metody. Toto chování je konzistentní s většinou knihoven, které podporují CancellationToken. Jinými slovy, .NET používá kooperativní zrušení. Hlavní výhodou tohoto přístupu je, že zrušení může nastat pouze v jasně identifikovatelných bodech, ne v žádném daném pokynu. Tímto můžete spustit logiku vyčištění, když je zrušení signalizováno.

Před provedením volání typu grain modul runtime zkontroluje, jestli už je zadané CancellationToken zrušeno. Pokud ano, okamžitě vyvolá OperationCanceledException aniž by vystavila požadavek. Podobně platí, že pokud se žádost o zařazení do fronty zruší před zahájením provádění agregace, zruší se bez provedení.

Zatímco se provádí volání zrna, běhové prostředí naslouchá zrušení a propaguje signál zrušení vzdálenému volajícímu. Signály zrušení se šíří pouze v době, kdy je volání aktivní. Jakmile se volání dokončí (obvykle nebo s chybou), jakékoli následné signály zrušení se na vzdáleného volajícího nepředají, i když se stále spouští. Tato sémantika se podobá tomu, jak funguje zrušení pomocí vzdálených volání s jinými rozhraními API, jako jsou HttpClient, gRPC nebo Azure API klienti.

Základní použití

Pokud chcete přidat podporu zrušení do metod zrna, postupujte takto:

1. Definujte rozhraní zrnitosti.

Přidejte CancellationToken parametr jako poslední parametr v metodě rozhraní grain. Zpřístupnit ji jako volitelnou s výchozí hodnotou pro lepší použitelnost:

public interface IProcessingGrain : IGrainWithGuidKey
{
    Task<string> ProcessDataAsync(string data, int chunks, CancellationToken cancellationToken = default);
}

Implementujte metodu zrnitosti

V implementaci úlohy grain pravidelně kontrolujte token zrušení během dlouhotrvajících operací.

public class ProcessingGrain : Grain, IProcessingGrain
{
    public async Task<string> ProcessDataAsync(string data, int chunks, CancellationToken cancellationToken = default)
    {
        // Check cancellation before starting work
        cancellationToken.ThrowIfCancellationRequested();

        var results = new List<string>();

        for (int i = 0; i < chunks; i++)
        {
            // Check cancellation before each chunk
            cancellationToken.ThrowIfCancellationRequested();

            // Process each chunk
            var chunkResult = await ProcessChunkAsync(data, i);
            results.Add(chunkResult);

            // Use cancellation token with async operations when possible
            await Task.Delay(100, cancellationToken);
        }

        return string.Join(", ", results);
    }

    private async Task<string> ProcessChunkAsync(string data, int chunkIndex)
    {
        // Simulate processing work
        await Task.Delay(50);
        return $"{data}_chunk_{chunkIndex}";
    }
}

3. Volání zrnitosti se zrušením

Vytvořte CancellationTokenSource a předejte jeho token metodě grain.

var grain = grainFactory.GetGrain<IProcessingGrain>(Guid.NewGuid());

using var cts = new CancellationTokenSource();

// Set a timeout for automatic cancellation
cts.CancelAfter(TimeSpan.FromSeconds(30));

try
{
    var result = await grain.ProcessDataAsync("sample data", 20, cts.Token);
    Console.WriteLine($"Result: {result}");
}
catch (OperationCanceledException)
{
    Console.WriteLine("Operation was canceled");
}

// Manual cancellation example
var grain2 = grainFactory.GetGrain<IProcessingGrain>(Guid.NewGuid());
using var cts2 = new CancellationTokenSource();

// Start a long-running task
var task = grain2.ProcessDataAsync("large dataset", 1000, cts2.Token);

// Cancel after 5 seconds
await Task.Delay(5000);
cts2.Cancel();

try
{
    await task;
}
catch (OperationCanceledException)
{
    Console.WriteLine("Long processing was canceled");
}

Streamování s využitím IAsyncEnumerable<T>

Zrušení je zvláště užitečné pro scénáře streamování, ve kterých možná budete chtít ukončit výčet v rané fázi. Orleans podporuje tokeny zrušení v asynchronních enumerovatelných zrnových metodách.

using System.Runtime.CompilerServices;

public interface IDataStreamGrain : IGrainWithGuidKey
{
    IAsyncEnumerable<DataPoint> StreamDataAsync(int count, CancellationToken cancellationToken = default);
}

public class DataStreamGrain : Grain, IDataStreamGrain
{
    public async IAsyncEnumerable<DataPoint> StreamDataAsync(
        int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int i = 0; i < count; i++)
        {
            // Check cancellation before each yield
            cancellationToken.ThrowIfCancellationRequested();
            // Generate or fetch data
            var dataPoint = await GenerateDataPointAsync(i, cancellationToken);
            yield return dataPoint;

            // Optional: add delay with cancellation support
            await Task.Delay(100, cancellationToken);
        }
    }

    private async Task<DataPoint> GenerateDataPointAsync(int index, CancellationToken cancellationToken)
    {
        // Simulate data generation
        await Task.Delay(10, cancellationToken);
        return new DataPoint { Index = index, Value = Random.Shared.NextDouble() };
    }
}

public record DataPoint(int Index, double Value);

Spotřebovávání datového proudu

Při využívání asynchronních výčtů máte dva přístupy pro předávání tokenů zrušení: přímé volání metody se zrušením a použití rozšiřující metody WithCancellation.

Přístup 1: Volání přímé metody se zrušením

Pokud asynchronní enumerovatelná metoda má [EnumeratorCancellation] parametr, předejte token přímo:

var grain = grainFactory.GetGrain<IDataStreamGrain>(Guid.NewGuid());

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(10)); // Auto-cancel after 10 seconds

try
{
    // The token is passed directly to the method and will be combined
    // with any token passed to GetAsyncEnumerator() internally
    await foreach (var dataPoint in grain.StreamDataAsync(1000, cts.Token))
    {
        Console.WriteLine($"Received: {dataPoint}");

        // Process the data point
        // Cancellation will stop the enumeration automatically
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Streaming was canceled");
}

Přístup 2: Použití WithCancellation metody rozšíření

V situacích, kdy máte existující instanci IAsyncEnumerable<T> nebo potřebujete přepsat token pro zrušení:

var grain = grainFactory.GetGrain<IDataStreamGrain>(Guid.NewGuid());
var asyncEnumerable = grain.StreamDataAsync(1000);

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(10));

try
{
    // WithCancellation passes the token to GetAsyncEnumerator()
    await foreach (var dataPoint in asyncEnumerable.WithCancellation(cts.Token))
    {
        Console.WriteLine($"Received: {dataPoint}");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Streaming was canceled");
}

Zpětná kompatibilita

Jednou z klíčových výhod podpory zrušení pomocí tokenu Orleans je zpětná kompatibilita. Rozhraní a implementace modulů můžete upravit tak, aby zahrnovaly nebo odebraly CancellationToken parametry bez narušení stávajících klientů nebo jiných modulů, které tyto metody volají.

  • Přidání CancellationToken: Do metody existujícího rozhraní grain můžete přidat parametr CancellationToken. Pokud starší klient, zkompilovaný proti předchozímu rozhraní, volá tuto metodu bez poskytnutí tokenu, metoda grain obdrží CancellationToken.None z Orleans runtime. Pro kompatibilitu zdroje jazyka C# se stávajícími implementacemi rozhraní doporučujeme definovat tento nový parametr jako volitelný v rozhraní (například CancellationToken cancellationToken = default). Nové volající můžou poskytnout konkrétní token.

  • Odebrání CancellationToken: Pokud odeberete parametr CancellationToken z metody "grain", volání, která dříve používala token, budou stále úspěšná. Prostředí runtime Orleans jednoduše ignoruje nadbytečný parametr tokenu, který volající odesílá, a metoda grain se spustí bez něj. Volající může stále sledovat zrušení svého tokenu a v případě zrušení tokenu může vypršet časový limit nebo vyvolat výjimku na straně volání, ale samotná metoda grain token neobdrží.

Tato flexibilita umožňuje přírůstkově přijmout zrušení ve vašich Orleans aplikacích bez vynucení koordinované aktualizace napříč všemi komponentami.

Konfigurace

Nakonfigurujte chování zrušení prostřednictvím SiloMessagingOptions (pro silosy) a ClientMessagingOptions (pro klienty):

// In your silo configuration
siloBuilder.Configure<SiloMessagingOptions>(options =>
{
    // Send cancellation signal when requests timeout (default: true)
    options.CancelRequestOnTimeout = true;

    // Wait for callee to acknowledge cancellation (default: false)
    // Setting this to true provides stronger cancellation guarantees but may impact performance
    options.WaitForCancellationAcknowledgement = false;
});

// In your client configuration
clientBuilder.Configure<ClientMessagingOptions>(options =>
{
    options.CancelRequestOnTimeout = true;
    options.WaitForCancellationAcknowledgement = false;
});

Vysvětlení možností konfigurace

  • CancelRequestOnTimeout: Když true, Orleans automaticky odešle signál zrušení do cílového zrna, pokud vyprší časový limit požadavku. To pomáhá upozornit dlouhotrvající operace, že volající už nečeká.

  • WaitForCancellationAcknowledgement: Když truevolání zrušení počká na to, aby cílové agregační interval potvrdil, že přijal signál zrušení. To poskytuje silnější záruky, ale může mít vliv na výkon ve scénářích s vysokou propustností.

Diagnostika a řešení potíží

Diagnostika kompilátoru

Orleans Generování kódu vyžaduje, aby každá metoda zrna měla povolený pouze jeden CancellationToken parametr. Pokud přidáte více než jednu, zobrazí se chyba sestavení s ID ORLEANS0109 diagnostiky:

Typ YourGrainInterface obsahuje metodu YourMethod , která má více parametrů CancellationToken. Podporuje se pouze jeden parametr CancellationToken.

nesprávná odpověď:

public interface IMyGrain : IGrainWithGuidKey
{
    // This will cause ORLEANS0109 error
    Task ProcessAsync(CancellationToken token1, CancellationToken token2);
}

správně:

public interface IMyGrain : IGrainWithGuidKey
{
    Task ProcessAsync(CancellationToken cancellationToken = default);
}

Monitorování aktivity zrušení

Orleans poskytuje metriky, které vám pomůžou monitorovat chování zrušení ve vaší aplikaci:

  • orleans-app-requests-canceled: Sleduje počet zrušených požadavků.
  • Monitorujte tuto metriku, abyste porozuměli vzorům zrušení a identifikovali potenciální problémy.
// Example: Custom logging for cancellation tracking
public async Task MonitoredOperationAsync(CancellationToken cancellationToken = default)
{
    var stopwatch = Stopwatch.StartNew();

    try
    {
        await DoWorkAsync(cancellationToken);
        logger.LogInformation("Operation completed in {ElapsedMs}ms", stopwatch.ElapsedMilliseconds);
    }
    catch (OperationCanceledException)
    {
        logger.LogInformation("Operation canceled after {ElapsedMs}ms", stopwatch.ElapsedMilliseconds);
        throw;
    }
}

Důležité informace o výkonu

Hromadné rušení

Orleans optimalizuje výkon dávkováním požadavků na zrušení, pokud dojde k mnoha zrušením současně. Tím se snižuje zatížení sítě a procesoru během scénářů s vysokou mírou rušení, například při vypnutí služby nebo při vypršení časového limitu mnoha operací najednou.

Kontext spuštění zpětného volání

Zpětná volání zaregistrovaná na CancellationToken v rámci zrna se spouští v plánovači zrna. To vám umožní bezpečně provádět operace v kontextu agregace z zpětných volání zrušení.

public async Task ExampleWithCancellationCallbackAsync(CancellationToken cancellationToken = default)
{
    // Register a callback that will run on the grain's scheduler
    cancellationToken.Register(() =>
    {
        // This runs on the grain's execution context
        // Safe to access grain state here
        logger.LogInformation("Operation was canceled for grain {GrainId}", this.GetPrimaryKey());
    });

    // Continue with work...
    await DoWorkAsync(cancellationToken);
}

Poznámky a omezení chování

Zrušení spolupráce

Zrušení ve Orleans je kooperativní, což znamená:

  • Metoda grain musí aktivně kontrolovat CancellationToken a reagovat na požadavky na zrušení.
  • Pouhé předání zrušeného tokenu automaticky nezastaví spuštěnou operaci.
  • Pokud chcete dosáhnout nejlepších výsledků, zkontrolujte zrušení v pravidelných intervalech během dlouhotrvající práce.

Integrace časového limitu

Pokud CancelRequestOnTimeout je tato možnost povolená (výchozí), Orleans automaticky odesílá signály zrušení pro žádosti o vypršení časového limitu:

// If this call times out, Orleans sends a cancellation signal to the grain
await grain.LongRunningOperationAsync(cancellationToken);

Zrušení mezioddělové komunikace

Zrušení funguje napříč hranicemi izolovaných jednotek v distribuovaných Orleans clusterech.

  • Volání klienta ke zrnu je možné zrušit bez ohledu na to, které silo hostí zrno.
  • Volání mezi zrny podporují zrušení i v případě, že se zrna nacházejí na různých silech.
  • Síťové oddíly můžou zpozdit doručení signálu pro zrušení.

Vzory zpracování chyb

Řádně zpracujte zrušení se správnými vzory zpracování chyb:

public async Task<ProcessingResult> ProcessWithFallbackAsync(
    string data,
    CancellationToken cancellationToken = default)
{
    try
    {
        return await ProcessPrimaryAsync(data, cancellationToken);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Expected cancellation from our token - clean up and re-throw
        await CleanupAsync();
        throw;
    }
    catch (OperationCanceledException)
    {
        // Cancellation from a different token (unexpected) - treat as error
        logger.LogWarning("Received unexpected cancellation");
        throw;
    }
    catch (Exception ex)
    {
        // Unexpected error - try fallback if not canceled
        if (cancellationToken.IsCancellationRequested)
        {
            throw new OperationCanceledException(cancellationToken);
        }

        logger.LogWarning(ex, "Primary processing failed, trying fallback");
        return await ProcessFallbackAsync(data, cancellationToken);
    }
}

Při zpracování OperationCanceledException vždy zkontrolujte pomocí klauzule when, jestli zrušení pochází z očekávaného tokenu. Tento model rozlišuje očekávané zrušení (od tokenu vaší operace) a neočekávané zrušení (z jiných zdrojů).

Chování IAsyncEnumerable<T>

Pro scénáře streamování s IAsyncEnumerable<T>:

  • Zrušení předběžného výčtu: Pokud je token zrušen před zahájením výčtu, nevyvolají se žádné položky a je okamžitě vyvolána OperationCanceledException.
  • Zrušení výčtu v jeho průběhu: Pokud je výčet zrušen během svého průběhu, datový proud se zastaví na dalším kontrolním bodu pro zrušení, což je obvykle při další yield nebo await operaci.
  • Šíření výjimek: Když dojde ke zrušení, výjimka se vyvolá příjemci.
  • Vyčištění prostředků: Metoda asynchronního enumerátoru DisposeAsync() je volána automaticky komponentou await foreach pro vyčištění prostředků.
  • Částečné výsledky: Položky poskytnuté před zrušením se zachovají a doručí příjemci.

Aspekty časování zrušení

Účinnost zrušení závisí na tom, jak často asynchronní iterátor kontroluje token zrušení:

public async IAsyncEnumerable<DataPoint> StreamDataAsync(
    int count,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    for (int i = 0; i < count; i++)
    {
        // Good: Check before each item
        cancellationToken.ThrowIfCancellationRequested();

        var data = await ProcessItemAsync(i);

        // Good: Check after long-running operations
        cancellationToken.ThrowIfCancellationRequested();

        yield return data;

        // Good: Use cancellation-aware operations
        await Task.Delay(100, cancellationToken);
    }
}

Starší verze: GrainCancellationToken a GrainCancellationTokenSource

Před přímou podporou System.Threading.CancellationToken v metodách pro zpracování obilí poskytuje Orleans zvláštní mechanismus za použití GrainCancellationToken a GrainCancellationTokenSource. Tento přístup je nyní považován za starší verzi a System.Threading.CancellationToken je doporučenou metodou implementace zrušení v Orleans zrnech.

Následující informace jsou k dispozici pro kontext starších systémů nebo pro konkrétní potřeby zpětné kompatibility. Informace o novém vývoji najdete v CancellationToken části Využití popsané výše v tomto článku.

Přehled GrainCancellationToken

GrainCancellationToken je obálka kolem standardního CancellationToken. Umožňuje kooperativní zrušení mezi vlákny, pracovními položkami fondu vláken nebo Task objekty a lze jej předat jako argument metody zrnitosti.

A GrainCancellationTokenSource poskytuje GrainCancellationToken prostřednictvím své vlastnosti Token a odešle zprávu o zrušení, když je zavolána její metoda GrainCancellationTokenSource.Cancel.

Použití GrainCancellationToken

K použití starší verze GrainCancellationToken:

  1. Vytvoření instance objektu GrainCancellationTokenSource Tento objekt spravuje a odesílá oznámení o zrušení jednotlivých tokenů zrušení.

    var tcs = new GrainCancellationTokenSource();
    
  2. Předejte token vrácený vlastností GrainCancellationTokenSource.Token do každé metody grain, která by měla reagovat na zrušení.

    var waitTask = grain.LongIoWork(tcs.Token, TimeSpan.FromSeconds(10));
    
  3. Operace zrušitelného zrna musí zpracovávat podkladovou CancellationToken vlastnost GrainCancellationToken stejně jako každý jiný kód v .NET.

    public async Task LongIoWork(GrainCancellationToken tc, TimeSpan delay)
    {
        // Periodically check if cancellation has been requested
        while (!tc.CancellationToken.IsCancellationRequested)
        {
            // Perform a portion of the work
            await IoOperation(tc.CancellationToken);
    
            // Example: tc.CancellationToken.ThrowIfCancellationRequested();
        }
        // Perform cleanup if necessary and then exit or throw OperationCanceledException
    }
    
  4. Zavolejte metodu GrainCancellationTokenSource.Cancel() , která zahájí zrušení. To signalizuje všechny GrainCancellationToken instance vytvořené z tohoto zdroje.

    // Request cancellation
    await tcs.Cancel();
    
  5. Zavolejte metodu GrainCancellationTokenSource.Dispose(), když dokončíte práci s objektem GrainCancellationTokenSource, abyste uvolnili jeho prostředky.

    tcs.Dispose();
    

Důležité aspekty pro GrainCancellationToken

  • Metoda GrainCancellationTokenSource.Cancel() vrátí Taskhodnotu . Pokud chcete zajistit, aby zrušení bylo robustní vůči přechodným selháním komunikace, budete možná muset opakovat volání zrušení.
  • Zpětná volání zaregistrovaná v podkladovém System.Threading.CancellationToken (přístupném prostřednictvím GrainCancellationToken.CancellationToken) podléhají zárukám provedení v jediném vláknu v rámci aktivace domény, ve které byly zaregistrovány. To znamená, že budou spuštěny na plánovači úloh úložiště grainu.
  • Každý GrainCancellationToken může být, pokud je to potřeba, předán několika voláním metody.