Menangani peristiwa eksternal dalam orkestrasi yang tahan lama

Peristiwa eksternal memungkinkan menjalankan orkestrasi untuk menerima sinyal dari sumber eksternal — seperti persetujuan manusia, panggilan balik webhook, atau sistem lainnya. Artikel ini memperlihatkan kepada Anda cara menunggu, mengirim, dan menangani peristiwa eksternal di orkestrasi tahan lama Anda.

Petunjuk / Saran

Ingin mengirim event ke orkestrasi? Langsung ke Kirim acara.

Fungsi orkestrator dapat menunggu dan mendengarkan peristiwa eksternal. Fitur Durable Functions ini sering berguna untuk menangani interaksi manusia atau pemicu eksternal lainnya.

Nota

Peristiwa eksternal adalah operasi asinkron satu arah. Mereka tidak cocok untuk situasi di mana klien yang mengirim peristiwa membutuhkan respons sinkron dari fungsi orkestrator.

Orkestrasi dapat menunggu dan mendengarkan peristiwa eksternal. Fitur ini sering berguna untuk menangani interaksi manusia atau pemicu eksternal lainnya.

Nota

Peristiwa eksternal adalah operasi asinkron satu arah. Mereka tidak cocok untuk situasi di mana klien yang mengirim peristiwa membutuhkan respons sinkron dari orkestrasi.

Penting

Saat ini, PowerShell Durable Task SDK tidak tersedia.

Tunggu peristiwa eksternal

API "wait-for-external-event" dari pengikatan pemicu orkestrasi memungkinkan fungsi orkestrator untuk menunggu dan mendengarkan peristiwa yang dikirimkan oleh klien eksternal secara asinkron. Fungsi orkestrator mendengarkan mendeklarasikan nama peristiwa dan bentuk data yang diharapkan untuk diterima.

API "wait-for-external-event" memungkinkan orkestrasi untuk menunggu dan mendengarkan peristiwa yang dikirimkan oleh klien eksternal secara asinkron. Orkestrasi pendengaran mendeklarasikan nama dari peristiwa tersebut dan bentuk data yang diharapkannya untuk diterima.

Model pekerja terisolasi

using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

public class BudgetApproval
{
    private readonly ILogger _logger;

    public BudgetApproval(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<BudgetApproval>();
    }

    [Function("BudgetApproval")]
    public async Task Run(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        bool approved = await context.WaitForExternalEventAsync<bool>("Approval");
        if (approved)
        {
            // approval granted - do the approved action
        }
        else
        {
            // approval denied - send a notification
        }
    }
}

Model dalam proses

[FunctionName("BudgetApproval")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    bool approved = await context.WaitForExternalEvent<bool>("Approval");
    if (approved)
    {
        // approval granted - do the approved action
    }
    else
    {
        // approval denied - send a notification
    }
}

Nota

Jika Anda menggunakan Durable Functions 1.x, gunakan DurableOrchestrationContext alih-alih IDurableOrchestrationContext. Silakan lihat artikel versi Durable Functions untuk detail yang lebih spesifik tentang versinya.

public class BudgetApproval : TaskOrchestrator<object?, bool>
{
    public override async Task<bool> RunAsync(TaskOrchestrationContext context, object? input)
    {
        bool approved = await context.WaitForExternalEvent<bool>("Approval");
        if (approved)
        {
            // approval granted - do the approved action
        }
        else
        {
            // approval denied - send a notification
        }
        return approved;
    }
}

Contoh sebelumnya mendengarkan satu peristiwa tertentu dan mengambil tindakan saat peristiwa diterima.

Anda dapat mendengarkan beberapa peristiwa secara bersamaan, seperti dalam contoh berikut, yang menunggu salah satu dari tiga kemungkinan pemberitahuan peristiwa.

Model pekerja terisolasi

[Function("Select")]
public async Task Run(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    Task<float> event1 = context.WaitForExternalEventAsync<float>("Event1");
    Task<bool> event2 = context.WaitForExternalEventAsync<bool>("Event2");
    Task<int> event3 = context.WaitForExternalEventAsync<int>("Event3");

    Task winner = await Task.WhenAny(event1, event2, event3);
    if (winner == event1)
    {
        // ...
    }
    else if (winner == event2)
    {
        // ...
    }
    else if (winner == event3)
    {
        // ...
    }
}

Model dalam proses

[FunctionName("Select")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var event1 = context.WaitForExternalEvent<float>("Event1");
    var event2 = context.WaitForExternalEvent<bool>("Event2");
    var event3 = context.WaitForExternalEvent<int>("Event3");

    var winner = await Task.WhenAny(event1, event2, event3);
    if (winner == event1)
    {
        // ...
    }
    else if (winner == event2)
    {
        // ...
    }
    else if (winner == event3)
    {
        // ...
    }
}

Nota

Menggunakan Durable Functions 1.x? Ganti dengan DurableOrchestrationContext alih-alih IDurableOrchestrationContext. Baca artikel tentang versi Durable Functions untuk mempelajari perbedaan versi lainnya.

public class SelectOrchestrator : TaskOrchestrator<object?, object?>
{
    public override async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
    {
        Task<float> event1 = context.WaitForExternalEvent<float>("Event1");
        Task<bool> event2 = context.WaitForExternalEvent<bool>("Event2");
        Task<int> event3 = context.WaitForExternalEvent<int>("Event3");

        Task winner = await Task.WhenAny(event1, event2, event3);
        if (winner == event1)
        {
            // ...
        }
        else if (winner == event2)
        {
            // ...
        }
        else if (winner == event3)
        {
            // ...
        }
        return null;
    }
}

Contoh sebelumnya mendengarkan salah satu dari beberapa peristiwa. Anda juga dapat menunggu semua peristiwa.

Model pekerja terisolasi

[Function("NewBuildingPermit")]
public async Task Run(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    string applicationId = context.GetInput<string>();

    Task gate1 = context.WaitForExternalEventAsync<object>("CityPlanningApproval");
    Task gate2 = context.WaitForExternalEventAsync<object>("FireDeptApproval");
    Task gate3 = context.WaitForExternalEventAsync<object>("BuildingDeptApproval");

    // all three departments must grant approval before a permit can be issued
    await Task.WhenAll(gate1, gate2, gate3);

    await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}

Model dalam proses

[FunctionName("NewBuildingPermit")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    string applicationId = context.GetInput<string>();

    var gate1 = context.WaitForExternalEvent("CityPlanningApproval");
    var gate2 = context.WaitForExternalEvent("FireDeptApproval");
    var gate3 = context.WaitForExternalEvent("BuildingDeptApproval");

    // all three departments must grant approval before a permit can be issued
    await Task.WhenAll(gate1, gate2, gate3);

    await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}

Nota

Jika Anda menjalankan Durable Functions 1.x, gunakan DurableOrchestrationContext alih-alih IDurableOrchestrationContext. Kunjungi versi Durable Functions untuk perincian lengkap perbedaan versi.

Dalam .NET, jika payload peristiwa tidak dapat dikonversi menjadi jenis yang diharapkan T, maka terjadi pengecualian.

public class NewBuildingPermit : TaskOrchestrator<string, object?>
{
    public override async Task<object?> RunAsync(TaskOrchestrationContext context, string applicationId)
    {
        Task<object?> gate1 = context.WaitForExternalEvent<object?>("CityPlanningApproval");
        Task<object?> gate2 = context.WaitForExternalEvent<object?>("FireDeptApproval");
        Task<object?> gate3 = context.WaitForExternalEvent<object?>("BuildingDeptApproval");

        // all three departments must grant approval before a permit can be issued
        await Task.WhenAll(gate1, gate2, gate3);

        await context.CallActivityAsync("IssueBuildingPermit", applicationId);
        return null;
    }
}

Dalam .NET, jika payload peristiwa tidak dapat dikonversi menjadi jenis yang diharapkan T, maka terjadi pengecualian.

API "wait-for-external-event" menunggu tanpa batas waktu untuk beberapa input. Anda dapat membongkar aplikasi fungsi dengan aman saat menunggu. Jika dan ketika peristiwa tiba untuk instans orkestrasi ini, instans terbangun secara otomatis dan segera memproses peristiwa.

Nota

Jika aplikasi fungsi Anda menggunakan Paket Konsumsi, tidak ada biaya yang dikenakan saat fungsi orkestrator menunggu tugas peristiwa eksternal, tidak peduli seberapa lama waktu tunggu tersebut.

Seperti halnya Fungsi Aktivitas, peristiwa eksternal memiliki jaminan pengiriman minimal satu kali. Ini berarti bahwa, dalam kondisi tertentu (seperti menghidupkan ulang, skaling, gangguan, dll.), aplikasi Anda mungkin menerima duplikat dari suatu peristiwa eksternal yang sama. Oleh karena itu, kami menyarankan agar peristiwa eksternal berisi semacam ID yang memungkinkannya diduplikasi secara manual di orkestrator.

API "wait-for-external-event" menunggu tanpa batas waktu untuk beberapa input. Anda dapat dengan aman menghentikan pekerja saat menunggu. Jika dan ketika suatu peristiwa terjadi untuk instans orkestrasi ini, instans tersebut diaktifkan secara otomatis dan segera memprosesnya.

Peristiwa eksternal memiliki jaminan pengiriman paling tidak sekali. Ini berarti bahwa, dalam kondisi tertentu (seperti menghidupkan ulang, skaling, gangguan, dll.), aplikasi Anda mungkin menerima duplikat dari suatu peristiwa eksternal yang sama. Oleh karena itu, kami menyarankan agar peristiwa eksternal berisi beberapa jenis ID yang memungkinkannya dihilangkan duplikasinya secara manual dalam orkestrasi.

Kirim acara

Anda dapat menggunakan API "raise-event" yang ditentukan oleh pengikatan klien orkestrasi untuk mengirim peristiwa eksternal ke orkestrasi. Anda juga dapat menggunakan API HTTP bawaan pemicu peristiwa untuk mengirim peristiwa eksternal ke orkestrasi.

Peristiwa yang dipicu mencakup instanceID, eventName, dan eventData sebagai parameter. Fungsi orkestrator menangani peristiwa ini menggunakan wait-for-external-event API. eventName harus cocok di kedua ujung proses pengiriman dan penerimaan agar event dapat diproses. Data peristiwa juga harus dapat diserialisasikan JSON.

Secara internal, mekanisme "raise-event" mengantrekan pesan yang diambil oleh fungsi orkestrator yang menunggu. Jika instans tidak menunggu nama peristiwa yang ditentukan, pesan peristiwa ditambahkan ke antrean di dalam memori. Jika instans orkestrasi kemudian mulai mendengarkan nama peristiwa tersebut instans tersebut akan memeriksa antrean untuk pesan peristiwa.

Nota

Jika tidak ada instans orkestrasi dengan ID instans yang ditentukan, pesan peristiwa akan dibuang.

Di bawah ini adalah contoh fungsi yang dipicu oleh antrean, yang mengirimkan sebuah peristiwa "Persetujuan" kepada instans fungsi orkestrator. ID instans orkestrasi berasal dari isi pesan antrean.

Anda dapat menggunakan API "raise-event" pada klien Durable Task untuk mengirim peristiwa eksternal ke orkestrasi.

Peristiwa yang dinaikkan mencakup ID instans, eventName, dan eventData sebagai parameter. Orkestrasi menangani peristiwa ini menggunakan API "wait-for-external-event" . eventName harus cocok di kedua sisi pengiriman dan penerimaan agar event dapat diproses. Data peristiwa juga harus dapat diserialisasikan JSON.

Secara internal, mekanisme "raise-event" mengantrekan pesan yang kemudian diambil oleh orkestrasi yang menunggu. Jika instans tidak menunggu nama peristiwa yang ditentukan, pesan peristiwa ditambahkan ke antrean dalam memori. Jika instans orkestrasi nanti mulai mendengarkan nama peristiwa tersebut, instans tersebut akan memeriksa antrean untuk pesan peristiwa.

Nota

Jika tidak ada instans orkestrasi dengan atribut instanceID yang ditentukan, pesan peristiwa akan dibuang.

Di bawah ini adalah contoh yang mengirim peristiwa "Persetujuan" ke instans orkestrasi.

Model pekerja terisolasi

using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask.Client;

public class ApprovalQueueProcessor
{
    [Function("ApprovalQueueProcessor")]
    public async Task Run(
        [QueueTrigger("approval-queue")] string instanceId,
        [DurableClient] DurableTaskClient client)
    {
        await client.RaiseEventAsync(instanceId, "Approval", true);
    }
}

Model dalam proses

[FunctionName("ApprovalQueueProcessor")]
public static async Task Run(
    [QueueTrigger("approval-queue")] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    await client.RaiseEventAsync(instanceId, "Approval", true);
}

Nota

Untuk Durable Functions 1.x, gunakan atribut OrchestrationClient dan jenis parameter DurableOrchestrationClient sebagai gantinya. Periksa artikel versi Durable Functions untuk semua perubahan khusus versi.

await client.RaiseEventAsync(instanceId, "Approval", true);

Secara internal, API "raise-event" mengantrekan pesan yang diambil oleh orkestrasi yang menunggu. Jika instans tidak menunggu nama peristiwa yang ditentukan, pesan peristiwa ditambahkan ke buffer dalam memori. Jika instans orkestrasi nanti mulai mendengarkan nama peristiwa tersebut, instans orkestrasi akan memeriksa buffer untuk pesan peristiwa dan memicu tugas yang menunggunya.

Nota

Jika tidak ada instans orkestrasi dengan ID instans yang ditentukan, pesan peristiwa akan dibuang.

HTTP

Berikut ini adalah contoh permintaan HTTP yang memicu peristiwa pada instans orkestrasi Approval.

POST /runtime/webhooks/durabletask/instances/MyInstanceId/raiseEvent/Approval&code=XXX
Content-Type: application/json

"true"

Dalam hal ini, ID instans dikodekan secara permanen sebagai MyInstanceId.

Praktik terbaik

Ingatlah praktik terbaik berikut saat bekerja dengan peristiwa eksternal:

Menggunakan nama peristiwa unik untuk deduplikasi

Peristiwa eksternal memiliki jaminan pengiriman paling tidak sekali. Dalam kondisi langka tertentu (yang dapat terjadi selama mulai ulang, penskalaan, atau crash), aplikasi Anda mungkin menerima duplikat peristiwa eksternal yang sama. Kami menyarankan agar peristiwa eksternal berisi ID unik yang memungkinkannya dideduplikasi secara manual di orkestrator.

Nota

Penyedia penyimpanan MSSQL menggunakan peristiwa eksternal dan memperbarui status orkestrator secara transaksional, sehingga tidak ada risiko peristiwa duplikat dengan backend tersebut, tidak seperti penyedia Azure Storage. Namun, masih disarankan agar peristiwa eksternal memiliki nama unik sehingga kode portabel di seluruh backend.

Gunakan batas waktu untuk menghindari waktu tunggu yang tidak terbatas

wait-for-external-event API menunggu tanpa batas waktu secara default. Dalam sebagian besar skenario dunia nyata — seperti persetujuan manusia - Anda harus membandingkan timer tahan lama dengan peristiwa eksternal sehingga proses orkestrasi Anda dapat mengambil tindakan (meningkatkan prioritas, menolak, mencoba kembali) jika peristiwa tidak tiba dalam batas waktu.

Untuk panduan lengkap dengan sampel kode, lihat Interaksi manusia dan batas waktu.

Langkah berikutnya