Apa itu Durable Functions?

Durable Functions adalah ekstensi dari Azure Functions yang memungkinkan Anda menulis fungsi stateful di lingkungan komputasi tanpa server. Ekstensi ini memungkinkan Anda menentukan alur kerja stateful dengan menulis fungsi orkestrator dan entitas stateful dengan menulis fungsi entitas menggunakan model pemrograman Azure Functions. Di belakang layar, ekstensi ini mengelola status, titik pemeriksaan, dan menghidupkan ulang untuk Anda, memungkinkan Anda untuk fokus pada logika bisnis Anda.

Bahasa yang didukung

Durable Functions dirancang untuk bekerja dengan semua bahasa pemrograman Azure Functions tetapi mungkin memiliki persyaratan minimum yang berbeda untuk setiap bahasa. Tabel berikut menunjukkan konfigurasi aplikasi minimum yang didukung:

Tumpukan bahasa komputer Versi Runtime Bahasa Umum Azure Functions Versi pekerja bahasa komputer Versi bundel minimum
.NET / C# / F# Functions 1.0+ Sedang dalam proses
Di luar proses
n/a
JavaScript/TypeScript (V3 prog. model) Functions 2.0+ Node 8+ Paket 2.x
JavaScript/TypeScript (V4 prog. model) Functions 4.25+ Node 18+ 3.15+ bundel
Python Functions 2.0+ Python=3.7 Paket 2.x
Python (V2 prog. model) Functions 4.0+ Python=3.7 3.15+ bundel
PowerShell Functions 3.0+ PowerShell 7+ Paket 2.x
Java Functions 4.0+ Java 8+ Paket 4.x

Penting

Artikel ini menggunakan tab untuk mendukung beberapa versi model pemrograman Node.js. Model v4 umumnya tersedia dan dirancang untuk memiliki pengalaman yang lebih fleksibel dan intuitif untuk pengembang JavaScript dan TypeScript. Untuk detail selengkapnya tentang cara kerja model v4, lihat panduan pengembang Azure Functions Node.js. Untuk mempelajari selengkapnya tentang perbedaan antara v3 dan v4, lihat panduan migrasi.

Penting

Artikel ini menggunakan tab untuk mendukung beberapa versi model pemrograman Python. Model v2 umumnya tersedia dan dirancang untuk menyediakan cara yang lebih sentris kode untuk menulis fungsi melalui dekorator. Untuk detail selengkapnya tentang cara kerja model v2, lihat panduan pengembang Azure Functions Python.

Seperti Azure Functions, ada templat untuk membantu Anda mengembangkan Durable Functions menggunakan Visual Studio, Visual Studio Code, dan portal Azure.

Pola aplikasi

Kasus penggunaan utama Durable Functions adalah menyederhanakan persyaratan koordinasi yang kompleks dan stateful dalam aplikasi tanpa server. Bagian berikut menjelaskan pola aplikasi umum yang dapat mengambil keuntungan dari Durable Functions:

Pola #1: Penautan fungsi

Dalam pola penautan fungsi, serangkai fungsi dijalankan dalam urutan tertentu. Dalam pola ini, output dari satu fungsi diterapkan ke input fungsi lain. Penggunaan antrean antara setiap fungsi memastikan bahwa sistem tetap tahan lama dan dapat diskalakan, meskipun ada aliran kontrol dari satu fungsi ke fungsi berikutnya.

A diagram of the function chaining pattern

Anda dapat menggunakan Durable Functions untuk menerapkan pola penautan fungsi secara ringkas seperti yang ditunjukkan dalam contoh berikut.

Dalam contoh ini, nilai F1, F2, F3, dan F4 adalah nama fungsi lain di dalam aplikasi fungsi yang sama. Anda dapat menerapkan alur kontrol dengan menggunakan konstruksi pengodean imperatif normal. Kode dijalankan dari atas ke bawah. Kode ini dapat menyertakan semantik alur kontrol bahasa komputer yang ada, seperti kondisional dan perulangan. Anda dapat menyertakan logika penanganan kesalahan dalam blok try/catch/finally.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Anda dapat menggunakan context parameter untuk menjalankan fungsi lain berdasarkan nama, parameter lulus, dan output fungsi hasil. Setiap kali kode memanggil await, kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses atau komputer virtual mendaur ulang di tengah-tengah eksekusi, instans fungsi akan melanjutkan dari panggilan await sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan out/fan in.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Anda dapat menggunakan context.df objek untuk menjalankan fungsi lain berdasarkan nama, parameter lulus, dan output fungsi hasil. Setiap kali kode memanggil yield, kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses atau komputer virtual mendaur ulang di tengah-tengah eksekusi, instans fungsi akan melanjutkan dari panggilan yield sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan out/fan in.

Catatan

Objek context dalam JavaScript mewakili seluruh konteks fungsi. Akses konteks Durable Functions menggunakan properti df pada konteks utama.

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result


main = df.Orchestrator.create(orchestrator_function)

Anda dapat menggunakan context objek untuk menjalankan fungsi lain berdasarkan nama, parameter lulus, dan output fungsi hasil. Setiap kali kode memanggil yield, kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses atau komputer virtual mendaur ulang di tengah-tengah eksekusi, instans fungsi akan melanjutkan dari panggilan yield sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan out/fan in.

Catatan

Objek context di Python mewakili konteks orkestrasi. Akses konteks utama Azure Functions menggunakan properti function_context pada konteks orkestrasi.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Anda dapat menggunakan Invoke-DurableActivity perintah untuk menjalankan fungsi lain berdasarkan nama, parameter lulus, dan output fungsi hasil. Setiap kali kode memanggil Invoke-DurableActivity tanpa sakelar NoWait, kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses atau komputer virtual mendaur ulang di tengah-tengah eksekusi, instans fungsi akan melanjutkan dari panggilan Invoke-DurableActivity sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan out/fan in.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Anda dapat menggunakan ctx objek untuk menjalankan fungsi lain berdasarkan nama, parameter lulus, dan output fungsi hasil. Output metode ini adalah Task<V> objek di mana V adalah jenis data yang dikembalikan oleh fungsi yang dipanggil. Setiap kali Anda memanggil Task<V>.await(), kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses tiba-tiba didaur ulang di tengah eksekusi, instans fungsi akan melanjutkan dari panggilan Task<V>.await() sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan out/fan in.

Pola #2: Fan out/fan in

Dalam pola fan out/fan in, Anda menjalankan beberapa fungsi secara paralel, kemudian tunggu semua fungsi selesai. Seringkali, beberapa pekerjaan agregasi dilakukan pada hasil yang dikembalikan dari fungsi.

A diagram of the fan out/fan pattern

Dengan fungsi normal, Anda dapat melakukan fan out dengan meminta fungsi mengirim sejumlah pesan ke antrean. Melakukan fan in jauh lebih rumit. Untuk melakukan fan in dalam fungsi normal, Anda tulis kode untuk melacak kapan fungsi yang dipicu antrean berakhir, lalu simpan output fungsi.

Ekstensi Durable Functions menangani pola ini dengan kode yang relatif sederhana:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

Pekerjaan fan-out didistribusikan ke beberapa instans F2 fungsi. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. Task.WhenAll dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, F2 output fungsi diagregat dari daftar tugas dinamis dan diteruskan ke fungsi F3.

Pemeriksaan otomatis yang terjadi pada await panggilan Task.WhenAll memastikan bahwa potensi crash atau boot ulang di tengah jalan tidak akan menghidupkan ulang tugas yang sudah selesai.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Pekerjaan fan-out didistribusikan ke beberapa instans F2 fungsi. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. context.df.Task.all API dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, F2 output fungsi diagregat dari daftar tugas dinamis dan diteruskan ke fungsi F3.

Pemeriksaan otomatis yang terjadi pada yield panggilan context.df.Task.all memastikan bahwa potensi crash atau boot ulang di tengah jalan tidak akan menghidupkan ulang tugas yang sudah selesai.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Pekerjaan fan-out didistribusikan ke beberapa instans F2 fungsi. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. context.task_all API dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, F2 output fungsi diagregat dari daftar tugas dinamis dan diteruskan ke fungsi F3.

Pemeriksaan otomatis yang terjadi pada yield panggilan context.task_all memastikan bahwa potensi crash atau boot ulang di tengah jalan tidak akan menghidupkan ulang tugas yang sudah selesai.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Pekerjaan fan-out didistribusikan ke beberapa instans F2 fungsi. Perhatikan penggunaan pengalih NoWait pada pemanggilan fungsi F2: pengalih ini memungkinkan orkestrator untuk melanjutkan pemanggilan F2 tanpa tujuan penyelesaian aktivitas. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. Perintah Wait-ActivityFunction dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, F2 output fungsi diagregat dari daftar tugas dinamis dan diteruskan ke fungsi F3.

Pemeriksaan otomatis yang terjadi pada Wait-ActivityFunction panggilan memastikan bahwa potensi crash atau boot ulang di tengah jalan tidak akan memulai ulang tugas yang sudah selesai.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Pekerjaan fan-out didistribusikan ke beberapa instans F2 fungsi. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. ctx.allOf(parallelTasks).await() dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, output fungsi F2 diagregat dari daftar tugas dinamis dan dikembalikan sebagai output fungsi orkestrator.

Titik pemeriksaan otomatis yang terjadi pada panggilan .await() di ctx.allOf(parallelTasks) memastikan bahwa daur ulang proses yang tidak diharapkan tidak akan menghidupkan ulang tugas yang sudah selesai.

Catatan

Meskipun jarang terjadi, bukan tidak mungkin bahwa crash bisa terjadi di jendela setelah fungsi aktivitas selesai, tetapi sebelum ia sempat disimpan ke dalam riwayat orkestrasi. Jika ini terjadi, fungsi aktivitas akan berjalan ulang dari awal setelah proses pulih.

Pola #3: API HTTP Asinkron

Pola API HTTP asinkron mengatasi masalah pengoordinasian status operasi jangka panjang dengan klien eksternal. Cara umum untuk menerapkan pola ini adalah dengan meminta titik akhir HTTP memicu tindakan jangka panjang tersebut. Kemudian, alihkan klien ke titik akhir status yang klien polling untuk mengetahui kapan operasi selesai.

A diagram of the HTTP API pattern

Fungsi Tahan Lama menyediakan dukungan bawaan untuk pola ini, menyederhanakan atau bahkan menghapus kode yang perlu Anda tulis untuk berinteraksi dengan eksekusi fungsi yang berjalan lama. Misalnya, sampel mulai cepat Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell, dan Java) menunjukkan perintah REST sederhana yang dapat Anda gunakan untuk memulai instans fungsi orkestrator baru. Setelah instans dimulai, ekstensi mengekspos API HTTP webhook yang mengkueri status fungsi orkestrator.

Contoh berikut menampilkan perintah REST yang memulai orkestrator dan mengkueri statusnya. Sekadar kejelasan, beberapa detail protokol dihilangkan dari contoh ini.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Karena runtime bahasa umum Durable Functions mengelola status untuk Anda, Anda tidak perlu menerapkan mekanisme penelusuran status Anda sendiri.

Ekstensi Durable Functions mengekspos API HTTP bawaan yang mengelola orkestrasi jangka panjang. Anda dapat menerapkan pola ini sendiri dengan menggunakan pemicu fungsi Anda sendiri (seperti HTTP, antrean, atau Azure Event Hubs) dan pengikatan klien yang tahan lama. Misalnya, Anda dapat menggunakan pesan antrean untuk memicu penghentian. Atau, Anda mungkin menggunakan pemicu HTTP yang dilindungi oleh kebijakan autentikasi Microsoft Entra alih-alih API HTTP bawaan yang menggunakan kunci yang dihasilkan untuk autentikasi.

Untuk informasi selengkapnya, lihat artikel Fitur HTTP, yang menjelaskan cara mengekspos proses asinkron jangka panjang melalui HTTP menggunakan ekstensi Durable Functions.

Pola #4: Monitor

Pola monitor mengacu pada proses berulang yang fleksibel dalam alur kerja. Contohnya adalah melakukan polling sampai kondisi tertentu terpenuhi. Anda dapat menggunakan pemicu timer reguler untuk menangani skenario dasar, seperti pekerjaan pembersihan berkala, tetapi intervalnya statik dan mengelola masa pakai instans menjadi kompleks. Anda dapat menggunakan Durable Functions untuk membuat interval pengulangan yang fleksibel, mengelola masa pakai tugas, dan membuat banyak proses monitor dari satu orkestrasi.

Contoh pola monitor adalah membalikkan skenario API HTTP asinkron sebelumnya. Alih-alih mengekspos titik akhir bagi klien eksternal untuk memantau operasi jangka panjang, monitor jangka panjang menggunakan titik akhir eksternal, lalu menunggu perubahan status.

A diagram of the monitor pattern

Dalam beberapa baris kode, Anda dapat menggunakan Durable Functions untuk membuat sejumlah monitor yang mengamati titik akhir arbitrer. Monitor dapat mengakhiri eksekusi ketika kondisi terpenuhi, atau fungsi lain dapat menggunakan klien orkestrasi tahan lama untuk mengakhiri monitor. Anda dapat mengubah waitinterval monitor berdasarkan kondisi tertentu (misalnya, backoff eksponensial.)

Kode berikut akan menerapkan monitor dasar:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Saat permintaan diterima, instans orkestrasi baru dibuat untuk ID pekerjaan tersebut. Instans melakukan polling status hingga kondisi terpenuhi atau hingga batas waktu berakhir. Timer tahan lama mengontrol interval polling. Kemudian, lebih banyak pekerjaan dapat dilakukan, atau orkestrasi dapat berakhir.

Pola #5: Interaksi manusia

Banyak proses otomatis yang melibatkan semacam interaksi manusia. Melibatkan manusia dalam proses otomatis itu rumit karena orang tidak selalu ada dan responsif seperti layanan cloud. Proses otomatis barangkali memungkinkan interaksi ini dengan menggunakan batas waktu dan logika kompensasi.

Proses persetujuan adalah contoh proses bisnis yang melibatkan interaksi manusia. Persetujuan dari manajer mungkin diperlukan untuk laporan pengeluaran yang melebihi jumlah dolar tertentu. Jika manajer tidak menyetujui laporan pengeluaran dalam waktu 72 jam (mungkin manajer pergi berlibur), proses eskalasi dimulai untuk mendapatkan persetujuan dari orang lain (mungkin manajer manajer).

A diagram of the human interaction pattern

Anda dapat menerapkan pola dalam contoh ini dengan menggunakan fungsi orkestrator. Orkestrator menggunakan timer tahan lama untuk meminta persetujuan. Orkestrator melakukan eskalasi jika batas waktu habis. Orkestrator menunggu kejadian eksternal, seperti pemberitahuan yang dibuat dari interaksi manusia.

Contoh-contoh ini membuat suatu proses persetujuan untuk menunjukkan pola interaksi manusia:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Untuk membuat timer tahan lama, hubungi context.CreateTimer. Pemberitahuan diterima oleh context.WaitForExternalEvent. Kemudian, Task.WhenAny dipanggil untuk memutuskan apakah akan melakukan eskalasi (batas waktu sudah habis) atau memproses persetujuan (persetujuan sudah diterima sebelum waktu habis).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Untuk membuat timer tahan lama, hubungi context.df.createTimer. Pemberitahuan diterima oleh context.df.waitForExternalEvent. Kemudian, context.df.Task.any dipanggil untuk memutuskan apakah akan melakukan eskalasi (batas waktu sudah habis) atau memproses persetujuan (persetujuan sudah diterima sebelum waktu habis).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Untuk membuat timer tahan lama, hubungi context.create_timer. Pemberitahuan diterima oleh context.wait_for_external_event. Kemudian, context.task_any dipanggil untuk memutuskan apakah akan melakukan eskalasi (batas waktu sudah habis) atau memproses persetujuan (persetujuan sudah diterima sebelum waktu habis).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Untuk membuat timer tahan lama, hubungi Start-DurableTimer. Pemberitahuan diterima oleh Start-DurableExternalEventListener. Kemudian, Wait-DurableTask dipanggil untuk memutuskan apakah akan melakukan eskalasi (batas waktu sudah habis) atau memproses persetujuan (persetujuan sudah diterima sebelum waktu habis).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

Pemanggilan metode ctx.waitForExternalEvent(...).await() menjeda orkestrasi hingga menerima peristiwa bernama ApprovalEvent, yang memiliki payload boolean. Jika peristiwa diterima, fungsi aktivitas dipanggil untuk memproses hasil persetujuan. Namun, jika tidak ada peristiwa yang diterima sebelum timeout (72 jam) kedaluwarsa, TaskCanceledException dimunculkan dan fungsi aktivitas Escalate dipanggil.

Catatan

Tidak ada biaya untuk waktu yang dihabiskan menunggu peristiwa eksternal saat berjalan dalam paket Konsumsi.

Klien eksternal dapat mengirimkan pemberitahuan kejadian ke fungsi orkestrator yang menunggu dengan menggunakan API HTTP bawaan:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Kejadian juga dapat diberitakan menggunakan klien orkestrasi tahan lama dari fungsi lain di aplikasi fungsi yang sama:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Pola #6: Agregator (entitas stateful)

Pola keenam adalah tentang pengumpulan data kejadian selama periode waktu tertentu ke dalam satu entitas yang dapat ditangani. Dalam pola ini, data yang dikumpulkan mungkin berasal dari beberapa sumber, mungkin dikirimkan dalam batch, atau mungkin tersebar dalam jangka waktu yang lama. Agregator mungkin perlu mengambil tindakan pada data peristiwa saat tiba, dan klien eksternal mungkin perlu mengkueri data agregat.

Aggregator diagram

Hal rumit ketika mencoba menerapkan pola ini dengan fungsi stateless normal adalah kontrol untuk konkurensi menjadi tantangan besar. Selain memikirkan tentang banyaknya alur yang mengubah data yang sama pada waktu bersamaan, Anda juga perlu memastikan bahwa agregator hanya berjalan di satu komputer virtual pada satu waktu.

Anda dapat menggunakan Entitas Tahan Lama untuk menerapkan pola ini sebagai satu fungsi dengan mudah.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Entitas Tahan Lama juga dapat diubah sebagai kelas di .NET. Model ini dapat berguna jika daftar operasi sudah diperbaiki dan menjadi besar. Contoh berikut adalah penerapan setara Counter entitas menggunakan kelas dan metode .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Catatan

Entitas tahan lama saat ini tidak didukung di PowerShell.

Catatan

Entitas tahan lama saat ini tidak didukung di Java.

Klien dapat melakukan pengantrean operasi untuk (atau "pemberian sinyal") fungsi entitas menggunakan pengikatan klien entitas.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Catatan

Proksi yang dibuat secara dinamis juga tersedia di .NET untuk memberi sinyal ke entitas dengan cara yang aman jenis. Dan selain memberi sinyal, klien juga dapat mengkueri status fungsi entitas menggunakan metode aman jenis pada pengikatan klien orkestrasi.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Catatan

Entitas tahan lama saat ini tidak didukung di PowerShell.

Catatan

Entitas tahan lama saat ini tidak didukung di Java.

Fungsi entitas tersedia dalam Durable Functions 2.0 ke atas untuk C#, JavaScript, dan Python.

Teknologi

Di belakang layar, ekstensi Durable Functions dibangun di atas Durable Task Framework, pustaka sumber terbuka di GitHub yang digunakan untuk menyusun alur kerja dalam kode. Seperti Azure Functions yang merupakan hasil evolusi tanpa server dari Azure WebJobs, Durable Functions adalah hasil evolusi tanpa server dari Durable Task Framework. Microsoft dan organisasi lain menggunakan Durable Task Framework secara ekstensif untuk mengotomatisasi proses misi penting. Ini sangat cocok untuk lingkungan tanpa server Azure Functions.

Batasan kode

Untuk memberikan jaminan eksekusi yang andal dan tahan lama, fungsi orkestrator memiliki seperangkat aturan pengodean yang harus diikuti. Untuk informasi selengkapnya, lihat artikel Batasan kode fungsi orkestrator.

Billing

Durable Functions memiliki penagihan yang sama dengan Azure Functions. Untuk informasi selengkapnya, lihat Harga Azure Functions. Saat menjalankan fungsi orkestrator dalam Rencana konsumsi Azure Functions, ada beberapa perilaku penagihan yang perlu diperhatikan. Untuk informasi selengkapnya tentang perilaku ini, lihat artikel Penagihan Durable Functions.

Lompat ke

Anda dapat memulai Durable Functions dalam waktu kurang dari 10 menit dengan menyelesaikan salah satu tutorial mulai cepat bahasa komputer tertentu ini:

Dalam mulai cepat ini, Anda secara lokal membuat dan menguji fungsi tahan lama "halo dunia". Kemudian Anda terbitkan kode fungsi ke Azure. Fungsi yang Anda buat ini mengatur dan menyatukan panggilan ke fungsi lain.

Publikasi

Durable Functions dikembangkan bekerja sama dengan Microsoft Research. Akibatnya, tim Durable Functions secara aktif menghasilkan makalah penelitian dan artefak; Ini termasuk:

Pelajari lebih lanjut

Video berikut menyorot manfaat Durable Functions:

Karena Durable Functions adalah ekstensi lanjutan untuk Azure Functions, ekstensi tersebut tidak cocok untuk semua aplikasi. Untuk perbandingan dengan teknologi orkestrasi Azure lainnya, lihat Membandingkan Azure Functions dan Azure Logic Apps.

Langkah berikutnya