Aracılığıyla paylaş


Dayanıklı İşlevler nedir?

Dayanıklı İşlevler, sunucusuz bir işlem ortamında durum bilgisi olan işlevler yazmanızı sağlayan bir Azure İşlevleri özelliğidir. Uzantı, Azure İşlevleri programlama modelini kullanarak orchestrator işlevleri yazarak durum bilgisi olan iş akışları ve varlık işlevleri yazarak durum bilgisi olan varlıklar tanımlamanızı sağlar. Arka planda uzantı sizin için durumu, denetim noktalarını ve yeniden başlatmaları yöneterek iş mantığınıza odaklanmanızı sağlar.

Desteklenen diller

Dayanıklı İşlevler tüm Azure İşlevleri programlama dilleri ile çalışacak şekilde tasarlanmıştır, ancak her dil için farklı minimum gereksinimleri olabilir. Aşağıdaki tabloda desteklenen en düşük uygulama yapılandırmaları gösterilmektedir:

Dil yığını Azure İşlevleri çalışma zamanı sürümleri Dil çalışanı sürümü En düşük paket sürümü
.NET / C# / F# İşlevler 1.0+ İşlemde
İşlem dışı
yok
JavaScript/TypeScript (v3 prog. model) İşlevler 2.0+ Düğüm 8+ 2.x paketleri
JavaScript/TypeScript (v4 programlama modeli) İşlevler 4.25+ Node.js 18+ 3,15 üzeri paketler
Python İşlevler 2.0+ Python 3.7+ 2.x paketleri
Python (v2 prog. modeli) İşlevler 4.0+ Python 3.7+ 3,15'den fazla paket
PowerShell İşlevler 3.0+ PowerShell 7+ 2.x paketleri
Java İşlevler 4.0+ Java 8+ 4.x paket

Önemli

Bu makalede, Node.js programlama modelinin birden çok sürümünü desteklemek için sekmeler kullanılır. Genel kullanıma sunulan v4 modeli, JavaScript ve TypeScript geliştiricileri için daha esnek ve sezgisel bir deneyime sahip olacak şekilde tasarlanmıştır. v4 modelinin nasıl çalıştığı hakkında daha fazla bilgi için Azure İşlevleri Node.js geliştirici kılavuzuna bakın. v3 ile v4 arasındaki farklar hakkında daha fazla bilgi edinmek için geçiş kılavuzuna bakın.

Önemli

Bu makalede, Python programlama modelinin birden çok sürümünü desteklemek için sekmeler kullanılır. v2 modeli genel olarak kullanılabilir ve dekoratörler aracılığıyla işlev yazmak için daha kod odaklı bir yol sağlamak üzere tasarlanmıştır. v2 modeli hakkında daha fazla bilgi için bkz. Azure İşlevleri Python geliştirici kılavuzu.

Azure İşlevleri gibi, Visual Studio, Visual Studio Code ve Azure portalını kullanarak Dayanıklı İşlevler geliştirmenize yardımcı olacak şablonlar vardır.

Uygulama desenleri

Dayanıklı İşlevler için birincil kullanım örneği sunucusuz uygulamalarda karmaşık, durum bilgisi olan koordinasyon gereksinimlerini basitleştirmektir. Aşağıdaki bölümlerde, Dayanıklı İşlevler'den yararlanabilecek tipik uygulama desenleri açıklanmaktadır.

Desen #1: İşlev zincirleme

İşlev zincirleme düzeninde, bir işlev dizisi belirli bir sırada yürütülür. Bu düzende, bir işlevin çıkışı başka bir işlevin girişine uygulanır. Her işlev arasında kuyrukların kullanılması, bir işlevden diğerine denetim akışı olsa bile sistemin dayanıklı ve ölçeklenebilir kalmasını sağlar.

İşlev zincirleme deseninin diyagramı.

aşağıdaki örnekte gösterildiği gibi işlev zincirleme desenini kısa bir şekilde uygulamak için Dayanıklı İşlevler kullanabilirsiniz.

Bu örnekte , , F1F2ve F3 değerleri F4aynı işlev uygulamasındaki diğer işlevlerin adlarıdır. Normal kesinlik temelli kodlama yapılarını kullanarak denetim akışı uygulayabilirsiniz. Kod yukarıdan aşağıya doğru yürütülür. Kod, koşullular ve döngüler gibi mevcut dil denetimi akışı semantiğini içerebilir. Bloklara try/catch/finally hata işleme mantığı ekleyebilirsiniz.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Diğer işlevleri ada göre çağırmak, parametreleri geçirmek ve fonksiyon çıktısını döndürebilmek için context parametresini kullanabilirsiniz. Kod her çağırıldığında await, Dayanıklı İşlevler çerçevesi mevcut işlev örneğinin ilerlemesini kaydedir. İşlem veya sanal makine yürütme sırasında geri dönüşüme geçerse, işlev örneği önceki await çağrıdan devam eder. Daha fazla bilgi için sonraki Desen #2: Fan-out/fan-in bölümüne bakın.

Bu örnekte , , F1F2ve F3 değerleri F4aynı işlev uygulamasındaki diğer işlevlerin adlarıdır. Normal kesinlik temelli kodlama yapılarını kullanarak denetim akışı uygulayabilirsiniz. Kod yukarıdan aşağıya doğru yürütülür. Kod, koşullular ve döngüler gibi mevcut dil denetimi akışı semantiğini içerebilir. Bloklara try/catch/finally hata işleme mantığı ekleyebilirsiniz.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Diğer işlevleri ada göre çağırmak, parametreleri geçirmek ve işlev çıkışı döndürmek için nesnesini kullanabilirsiniz context.df . Kod her çağırışındayield, Dayanıklı İşlevler çerçevesi geçerli işlev örneğinin ilerleme durumunu denetler. İşlem veya sanal makine yürütme sırasında geri dönüşüme geçerse, işlev örneği önceki yield çağrıdan devam eder. Daha fazla bilgi için Desen #2: Fan-out/fan-in başlıklı sonraki bölüme bakın.

Not

context JavaScript'teki nesnesi tüm işlev bağlamını temsil eder. Ana bağlamdaki df özelliğini kullanarak Dayanıklı İşlevler bağlamına erişin.

Bu örnekte , , F1F2ve F3 değerleri F4aynı işlev uygulamasındaki diğer işlevlerin adlarıdır. Normal kesinlik temelli kodlama yapılarını kullanarak denetim akışı uygulayabilirsiniz. Kod yukarıdan aşağıya doğru yürütülür. Kod, koşullular ve döngüler gibi mevcut dil denetimi akışı semantiğini içerebilir.

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result

main = df.Orchestrator.create(orchestrator_function)

Nesneyi context isim vererek diğer fonksiyonları çağırmak, parametreler geçmek ve fonksiyon çıktısını döndürmek için kullanabilirsiniz. Kod her çağırışındayield, Dayanıklı İşlevler çerçevesi geçerli işlev örneğinin ilerleme durumunu denetler. İşlem veya sanal makine yürütme sırasında geri dönüşüme geçerse, işlev örneği önceki yield çağrıdan devam eder. Daha fazla bilgi için sonraki bölüm olan Desen 2: Fan-out/fan-in'e bakın.

Not

context Python'daki nesnesi düzenleme bağlamını temsil eder. Orkestrasyon bağlamında function_context özelliğini kullanarak ana Azure İşlevleri bağlamına erişin.

Bu örnekte , , F1F2ve F3 değerleri F4aynı işlev uygulamasındaki diğer işlevlerin adlarıdır. Normal kesinlik temelli kodlama yapılarını kullanarak denetim akışı uygulayabilirsiniz. Kod yukarıdan aşağıya doğru yürütülür. Kod, koşullular ve döngüler gibi mevcut dil denetimi akışı semantiğini içerebilir.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Komutunu kullanarak Invoke-DurableActivity diğer işlevleri ada göre çağırabilir, parametreleri geçirebilir ve işlev çıktısı döndürebilirsiniz. Kod Invoke-DurableActivity her çağırdığında, NoWait anahtar olmadan, Dayanıklı İşlevler çerçevesi mevcut işlev örneğinin ilerleme durumunu kontrol noktaları oluşturur. İşlem veya sanal makine yürütme sırasında geri dönüşüme geçerse, işlev örneği önceki Invoke-DurableActivity çağrıdan devam eder. Daha fazla bilgi için sonraki Desen #2: Fan-out/fan-in bölümüne bakın.

Bu örnekte , , F1F2ve F3 değerleri F4aynı işlev uygulamasındaki diğer işlevlerin adlarıdır. Normal kesinlik temelli kodlama yapılarını kullanarak denetim akışı uygulayabilirsiniz. Kod yukarıdan aşağıya doğru yürütülür. Kod, koşullular ve döngüler gibi mevcut dil denetimi akışı semantiğini içerebilir.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Diğer fonksiyonları adlarına göre çağırmak, parametreleri geçirmek ve fonksiyon çıktısını döndürmek için ctx nesnesini kullanabilirsiniz. Bu yöntemlerin çıkışı, çağrılan işlev tarafından döndürülen veri türü olan bir Task<V> nesnedir V . her çağırdığınızdaTask<V>.await(), Dayanıklı İşlevler çerçevesi geçerli işlev örneğinin ilerleme durumunu denetler. İşlem yürütme sırasında beklenmedik şekilde yarıda kesilirse, işlev örneği bir önceki Task<V>.await() çağrıdan devam eder. Daha fazla bilgi için bir sonraki bölüm olan Desen #2: "Fan-out/fan-in" bölümüne bakın.

Desen #2: Fan çıkışı/fan-in

Fan-out/fan-in deseninde, birden çok işlevi paralel olarak yürütür ve ardından tüm işlevlerin tamamlanmasını beklersiniz. Genellikle, işlevlerden döndürülen sonuçlar üzerinde bazı toplama işleri yapılır.

Fan-out fan-in deseninin diyagramı.

Normal işlevlerle, işlevin kuyruğa birden çok ileti göndererek dağıtım yapmasını sağlayabilirsiniz. Geri dönmek çok daha zor. Normal bir işlevde, kuyrukla tetiklenen işlevlerin ne zaman sona ereceğini izlemek için kod yazar ve ardından işlev çıkışlarını depolarsınız.

Dayanıklı İşlevler uzantısı bu düzeni nispeten basit bir kodla işler:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

İşlevin F2 fan-out çalışması birden çok örneğe dağıtılır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. Task.WhenAll çağrılan tüm işlevlerin bitmesini beklemek için çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve işleve F3 geçirilir.

await Task.WhenAll çağrısındaki bu otomatik denetim noktası oluşturma, olası bir yol ortası çökmesinin veya yeniden başlatmanın, zaten tamamlanmış bir görevi yeniden başlatma ihtiyacını ortadan kaldırır.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Fan-out çalışması işlevin F2 birden çok örneğine dağıtılır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. context.df.Task.all Çağrılan tüm işlevlerin tamamlanmasını beklemek için API çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve işleve F3 geçirilir.

yield context.df.Task.all çağrısında gerçekleşen otomatik denetim noktası oluşturma, olası bir ara kilitlenme veya yeniden başlatma durumunda, zaten tamamlanmış bir görevin yeniden başlatılmasını gerektirmemesini sağlar.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Fan-out çalışması işlevin F2 birden çok örneğine dağıtılır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. context.task_all Çağrılan tüm işlevlerin tamamlanmasını beklemek için API çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve işleve F3 geçirilir.

yield context.task_all çağrıda gerçekleşen otomatik denetim noktası, olası bir ara kilitlenmenin veya yeniden başlatmanın zaten tamamlanmış bir görevin yeniden başlatılmasını gerektirmemesini sağlar.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Fan-out çalışması, F2 işlevinin birden çok örneğine dağıtılır. İşlev çağrısında NoWaitF2 anahtarın kullanımına dikkat edin: Bu anahtar, düzenleyicinin etkinlik tamamlanmasını beklemeden çağırmaya F2 devam etmesine olanak tanır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. Komut Wait-ActivityFunction , çağrılan tüm işlevlerin tamamlanmasını beklemek için çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve işleve F3 geçirilir.

Wait-ActivityFunction çağrısında gerçekleşen otomatik denetim noktası oluşturma, olası bir süreç ortası kilitlenmesi veya yeniden başlatmanın zaten tamamlanmış bir görevin yeniden başlatılmasını gerektirmemesini sağlar.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Fan-out çalışması işlevin F2 birden çok örneğine dağıtılır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. ctx.allOf(parallelTasks).await() çağrılan tüm işlevlerin bitmesini beklemek için çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve orchestrator işlevinin çıkışı olarak döndürülür.

.await() çağrısının ctx.allOf(parallelTasks) üzerinde gerçekleştiği otomatik denetim noktası, beklenmeyen bir süreç geri dönüşümü, zaten tamamlanmış görevlerin yeniden başlatılmasını gerektirmez.

Not

Nadir durumlarda, bir etkinlik işlevi tamamlandıktan sonra ancak tamamlanması düzenleme geçmişine kaydedilmeden önce pencerede bir kilitlenme oluşabilir. Böyle bir durumda, işlem kurtarıldıktan sonra etkinlik fonksiyonu baştan yeniden çalıştırılır.

Tasarım Deseni #3: Eşzamansız HTTP API'ler

Eşzamansız HTTP API deseni, uzun süreli işlemlerin durumunu dış istemcilerle koordine etme sorununu giderir. Bu düzeni uygulamanın yaygın bir yolu, HTTP uç noktasının uzun süredir çalışan işlemi tetiklemesidir. Ardından, istemciye işlemin ne zaman tamamlandığını öğrenmesi için yoklama yapacağı bir durum uç noktasına yönlendirin.

HTTP API desenini gösteren diyagram.

Dayanıklı İşlevler, uzun süre çalışan işlev yürütmeleriyle etkileşime geçmek için yazmanız gereken kodu basitleştirerek ve hatta kaldırarak bu düzen için yerleşik destek sağlar. Örneğin, Dayanıklı İşlevler hızlı başlangıç örnekleri (C#, JavaScript, TypeScript, Python, PowerShell ve Java), yeni orchestrator işlev örneklerini başlatmak için kullanabileceğiniz basit bir REST komutu gösterir. Bir örnek başlatıldıktan sonra uzantı, orchestrator işlev durumunu sorgulayan web kancası HTTP API'lerini kullanıma sunar.

Aşağıdaki örnekte, bir düzenleyici başlatan ve durumunu sorgulayan REST komutları gösterilmektedir. Netlik sağlamak için, bazı protokol ayrıntıları örnekten atlanır.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Dayanıklı İşlevler çalışma zamanı sizin için durumu yönettiğinden, kendi durum izleme mekanizmanızı uygulamanız gerekmez.

Dayanıklı İşlevler uzantısı, uzun süre çalışan düzenlemeleri yöneten yerleşik HTTP API'lerini kullanıma sunar. Alternatif olarak, kendi işlev tetikleyicilerinizi (HTTP, kuyruk veya Azure Event Hubs gibi) ve dayanıklı istemci bağlamasını kullanarak bu düzeni kendiniz uygulayabilirsiniz. Örneğin, sonlandırmayı tetikleme amacıyla bir kuyruk iletisi kullanabilirsiniz. Alternatif olarak, kimlik doğrulaması için oluşturulan anahtarı kullanan yerleşik HTTP API'leri yerine Microsoft Entra kimlik doğrulama ilkesi tarafından korunan bir HTTP tetikleyicisi de kullanabilirsiniz.

Daha fazla bilgi için, Dayanıklı İşlevler uzantısını kullanarak HTTP üzerinden zaman uyumsuz, uzun süre çalışan işlemleri nasıl kullanıma açabileceğinizi açıklayan HTTP özellikleri makalesine bakın.

Desen #4: İzleyici

İzleyici deseni, bir iş akışındaki esnek, yinelenen bir işleme başvurur. Belirli koşullar karşılanıncaya kadar yoklama işlemi örnek olarak verilmiştir. Düzenli bir temizleme işi gibi temel senaryoları ele almak için normal bir zamanlayıcı tetikleyicisi kullanabilirsiniz, ancak aralığı statiktir ve örnek yaşam sürelerini yönetmek karmaşık hale gelir. Esnek yineleme aralıkları oluşturmak, görev yaşam sürelerini yönetmek ve tek bir düzenlemeden birden çok monitör işlemi oluşturmak için Dayanıklı İşlevler kullanabilirsiniz.

İzleyici desenine bir örnek, önceki asenkron HTTP API senaryosunu tersine çevirmektir. Dış bir istemci için uzun süre çalışan bir işlemi izleyebilmek amacıyla bir uç noktayı dışa açmak yerine, uzun süre çalışan bir izleyici bir dış uç noktayı kullanır ve ardından bir durum değişikliği bekler.

monitör desenini gösteren diyagram.

Birkaç kod satırı içinde, rastgele uç noktaları gözlemleyen birden çok izleyici oluşturmak için Dayanıklı İşlevler kullanabilirsiniz. Bir koşul karşılandığında izleyiciler yürütmeyi sonlandırabilir veya başka bir işlev izleyicileri sonlandırmak için dayanıklı düzenleme istemcisini kullanabilir. Belirli bir koşula göre (örneğin, üstel geciktirme) bir monitörün wait aralığını değiştirebilirsiniz.

Aşağıdaki kod temel bir izleyici uygular:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Bir istek alındığında, bu iş kimliği için yeni bir orkestrasyon örneği oluşturulur. Örnek, ya bir koşul karşılanana kadar ya da zaman aşımı süresi dolana kadar durumu kontrol eder. Dayanıklı zamanlayıcı yoklama aralığını denetler. Daha sonra daha fazla iş gerçekleştirilebilir veya düzenleme sona erebilir.

Desen #5: İnsan etkileşimi

Birçok otomatik işlem bir tür insan etkileşimi içerir. İnsanlar bulut hizmetleri kadar yüksek oranda kullanılabilir ve hızlı yanıt vermediğinden insanları otomatik bir sürece dahil etmek zordur. Otomatikleştirilmiş bir işlem, zaman aşımları ve telafi mantığını kullanarak bu etkileşime izin verebilir.

Onay süreci, insan etkileşimi içeren bir iş süreci örneğidir. Belirli bir dolar tutarını aşan bir gider raporu için yöneticiden onay gerekebilir. Yönetici 72 saat içinde gider raporunu onaylamazsa (yönetici tatile çıkmış olabilir), başka birinden (belki de yöneticinin yöneticisinden) onay almak için bir yükseltme işlemi başlatılır.

İnsan etkileşimi deseninin diyagramı.

Bu örnekteki deseni bir orchestrator işlevi kullanarak uygulayabilirsiniz. Düzenleyici, onay istemek için dayanıklı bir zamanlayıcı kullanır. Zaman aşımı oluşursa düzenleyici yükseltir. Düzenleyici, bir insan etkileşimi tarafından oluşturulan bildirim gibi bir dış olay bekler.

Bu örnekler, insan etkileşimi düzenini göstermek için bir onay süreci oluşturur:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Kalıcı zamanlayıcıyı oluşturmak için context.CreateTimer çalıştırın. Bildirim context.WaitForExternalEvent tarafından alınır. Ardından, Task.WhenAny yükseltme (önce zaman aşımı gerçekleşir) veya onayı işleme (onay zaman aşımından önce alınır) karar vermek için çağrılır.

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Dayanıklı zamanlayıcıyı oluşturmak için context.df.createTimer işlevini çağırın. Bildirim context.df.waitForExternalEvent tarafından alınır. Ardından, context.df.Task.any yükseltme (önce zaman aşımı gerçekleşir) veya onayı işleme (onay zaman aşımından önce alınır) karar vermek için çağrılır.

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Dayanıklı zamanlayıcı oluşturmak için context.create_timer öğesini çağırın. Bildirim context.wait_for_external_event tarafından alınır. Ardından, context.task_any yükseltme (önce zaman aşımı gerçekleşir) veya onayı işleme (onay zaman aşımından önce alınır) karar vermek için çağrılır.

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Dayanıklı zamanlayıcıyı oluşturmak için Start-DurableTimer fonksiyonunu çağırın. Bildirim Start-DurableExternalEventListener tarafından alınır. Ardından, Wait-DurableTask yükseltme (önce zaman aşımı olur) veya onay işleme (onay zaman aşımından önce alınır) karar vermek için çağrılır.

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

ctx.waitForExternalEvent(...).await() yöntem çağrısı, boolean yüküne sahip ApprovalEvent adlı bir olay alana kadar düzenlemeyi duraklatır. Etkinlik alındığında, onay sonucunu işlemek için bir aktivite fonksiyonu çağrılır. Ancak, timeout (72 saat) süresi dolmadan herhangi bir olay alınmazsa, bir TaskCanceledException tetiklenir ve Escalate etkinlik işlevi çağrılır.

Not

Tüketim planında çalışırken dış olayları beklerken harcanan süre için ücret alınmaz.

Dış istemci, yerleşik HTTP API'lerini kullanarak olay bildirimini bekleyen bir düzenleyici işlevine teslim edebilir:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Bir olay, aynı işlev uygulamasındaki başka bir işlevden dayanıklı orkestrasyon istemcisi kullanılarak da tetiklenebilir.

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Taslak #6: Toplayıcı (durumlu varlıklar)

Altıncı düzen, olay verilerini belirli bir süre boyunca tek ve adreslenebilir bir varlığa toplamaya yöneliktir. Bu düzende, toplanan veriler birden çok kaynaktan gelebilir, toplu olarak teslim edilebilir veya uzun sürelere dağılmış olabilir. Toplayıcının olay verileri geldikçe eylem gerçekleştirmesi ve dış istemcilerin toplanan verileri sorgulaması gerekebilir.

Bir toplayıcıyı gösteren diyagram.

Bu düzeni normal ve durum bilgisi olmayan işlevlerle uygulamaya çalışmayla ilgili karmaşık olan şey, eşzamanlılık denetiminin büyük bir zorluk haline gelmesidir. Aynı anda birden çok iş parçacığının aynı verileri değiştirmesi konusunda endişelenmeniz değil, toplayıcının aynı anda yalnızca tek bir sanal makinede çalıştığından emin olma konusunda da endişelenmeniz gerekir.

Bu düzeni tek bir işlev olarak kolayca uygulamak için Dayanıklı varlıkları kullanabilirsiniz.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Dayanıklı varlıklar .NET'te sınıf olarak da modellenebilir. bu model, işlem listesi sabitse ve büyük olursa yararlı olabilir. Aşağıdaki örnek, .NET sınıflarını ve yöntemlerini kullanan varlığın Counter eşdeğer bir uygulamasıdır.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Not

Dayanıklı varlıklar şu anda PowerShell'de desteklenmemektedir.

Not

Dayanıklı varlıklar şu anda Java'da desteklenmemektedir.

İstemciler, varlık istemci bağlamasını kullanarak bir varlık işlevi için işlemleri (sinyal olarak da bilinir) sıralayabilir.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Not

Dinamik olarak oluşturulan proxy'ler, varlıkların türü güvenli bir şekilde sinyalle işaretlenmesi için .NET'te de kullanılabilir. Ayrıca istemciler, sinyal göndermeye ek olarak, düzenleme istemci bağlamasında tür açısından güvenli yöntemleri kullanarak varlık işlevinin durumunu da sorgulayabilir.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Varlık işlevleri C#, JavaScript ve Python için Dayanıklı İşlevler 2.0 ve üzeri sürümleriyle kullanılabilir.

Teknoloji

Arka planda Dayanıklı İşlevler uzantısı, GitHub'da kodda iş akışları oluşturmak için kullanılan bir açık kaynak kitaplığı olan Dayanıklı Görev Çerçevesi'nin üzerine kurulmuştur. Azure İşlevleri, Azure Webjobs'un sunucusuz evrimi gibi, Dayanıklı İşlevler dayanıklı görev çerçevesinin sunucusuz evrimidir. Microsoft ve diğer kuruluşlar, görev açısından kritik süreçleri otomatikleştirmek için Dayanıklı Görev Çerçevesi'ni kapsamlı bir şekilde kullanır. Sunucusuz Azure İşlevleri ortamı için doğal bir uyum sağlar.

Kod kısıtlamaları

Güvenilir ve uzun süre çalışan yürütme garantileri sağlamak için orchestrator işlevlerinin izlenmesi gereken bir dizi kodlama kuralı vardır. Daha fazla bilgi için Orchestrator işlev kodu kısıtlamaları'na bkz.

Faturalandırma

Dayanıklı İşlevler, Azure İşlevleri ile aynı şekilde faturalandırılır. Daha fazla bilgi için Azure İşlevleri fiyatlandırma sayfasına bakın. Orchestrator işlevlerini Azure İşlevleri Tüketim planında yürütürken dikkat etmeniz gereken bazı faturalama davranışları vardır. Bu davranışlar hakkında daha fazla bilgi için Dayanıklı İşlevler faturalama makalesine bakın.

Doğrudan içeri atla

Bu dile özgü hızlı başlangıç öğreticilerinden birini tamamlayarak 10 dakikadan kısa bir zaman içinde Dayanıklı İşlevler kullanmaya başlayabilirsiniz:

Bu hızlı başlangıç kılavuzlarında, yerel olarak bir Merhaba dünya kalıcı işlevi oluşturup test edin. Ardından işlev kodunu Azure’da yayımlayacaksınız. Oluşturduğunuz işlev, diğer işlevlere yapılan çağrıları düzenler ve bir araya getirir.

Yayınlar

Dayanıklı İşlevler, Microsoft Research ile işbirliği içinde geliştirilmiştir. Sonuç olarak, Dayanıklı İşlevler ekibi aktif olarak araştırma çalışmaları ve yapıtlar üretir; bunlar şunlardır:

Video tanıtımı

Aşağıdaki videoda Dayanıklı İşlevler avantajları vurgulanmaktadır:

Diğer düzenleme seçenekleri

Dayanıklı İşlevler, Azure İşlevleri için gelişmiş bir uzantıdır ve tüm uygulamalar için uygun olmayabilir. Diğer Azure düzenleme teknolojileriyle karşılaştırma için bkz. Azure İşlevleri ve Azure Logic Apps karşılaştırması.