分享方式:


使用 NServiceBus 和 Azure 服務匯流排建置訊息驅動的商務應用程式

NServiceBus 是 Particular Software 所提供的商務傳訊架構。 其是建置在Azure 服務匯流排之上,並可藉由將基礎結構關注點抽象化,協助開發人員專注於商務邏輯。 在本指南中,我們將建置解決方案,以在兩個服務之間交換訊息。 我們也會說明如何自動重試失敗的訊息,並檢閱在 Azure 中裝載這些服務的選項。

注意

本教學課程的程式碼可在 Particular Software Docs 網站上取得。

必要條件

範例假設您已建立 Azure 服務匯流排命名空間

重要

NServiceBus 至少需要標準層。 基本層無法運作。

下載並準備解決方案

  1. Particular Software Docs 網站下載程式碼。 解決方案 SendReceiveWithNservicebus.sln 包含三個專案:

    • Sender:傳送訊息的主控台應用程式
    • Receiver:接收來自傳送者的訊息並回覆的主控台應用程式
    • Shared:類別庫,其中包含傳送者和接收者之間共用的訊息合約

    下列圖表是由 ServiceInsight (來自 Particular Software 的視覺效果和偵錯工具) 產生的,其會顯示訊息流程:

    顯示順序圖表的影像

  2. 在慣用的程式碼編輯器 (例如,Visual Studio 2022) 中開啟 SendReceiveWithNservicebus.sln

  3. 在 Receiver 和 Sender 專案中開啟 appsettings.json,並將 AzureServiceBusConnectionString 設定為 Azure 服務匯流排命名空間的連接字串。

    • 您可以在 Azure 入口網站的 [服務匯流排命名空間]>[設定]>[共用存取原則]>[RootManageSharedAccessKey]>[主要連接字串] 中找到。
    • AzureServiceBusTransport 也有可接受命名空間和權杖認證的建構函式,其在生產環境中會更安全,不過為了本教學課程的目的,將會使用共用存取金鑰連接字串。

定義共用的訊息合約

Shared 類別庫是您定義用來傳送訊息的合約所在位置。 其包含 NServiceBus NuGet 套件的參考,其中包含您可以用來識別訊息的介面。 這些介面不是必要的,但其會提供來自 NServiceBus 的一些額外驗證,並允許程式碼自行記錄。

首先,將檢閱 Ping.cs 類別

public class Ping : NServiceBus.ICommand
{
    public int Round { get; set; }
}

Ping 類別會定義 Sender 傳送至 Receiver 的訊息。 其是實作 NServiceBus.ICommand (來自 NServiceBus 套件的介面) 的簡單 C# 類別。 此訊息是讀取器和 NServiceBus (一種命令) 的訊號,但是有其他方法可識別訊息而不需使用介面

Shared 專案中的另一個訊息類別是 Pong.cs

public class Pong : NServiceBus.IMessage
{
    public string Acknowledgement { get; set; }
}

Pong 也是簡單的 C# 物件,但這個物件實作 NServiceBus.IMessageIMessage 介面代表既不是命令也不是事件的一般訊息,而且常用於回覆。 在我們的範例中,其是 Receiver 會傳回給 Sender 的回覆,來指出已收到訊息。

PingPong 是您將使用的兩種訊息類型。 下一個步驟是將 Sender 設定為使用 Azure 服務匯流排,並傳送 Ping 訊息。

設定傳送者

Sender 是傳送 Ping 訊息的端點。 在這裡,您會將 Sender 設定為使用 Azure 服務匯流排作為傳輸機制,然後建構 Ping 執行個體並加以傳送。

Program.csMain 方法中,您會設定 Sender 端點:

var host = Host.CreateDefaultBuilder(args)
    // Configure a host for the endpoint
    .ConfigureLogging((context, logging) =>
    {
        logging.AddConfiguration(context.Configuration.GetSection("Logging"));

        logging.AddConsole();
    })
    .UseConsoleLifetime()
    .UseNServiceBus(context =>
    {
        // Configure the NServiceBus endpoint
        var endpointConfiguration = new EndpointConfiguration("Sender");

        var connectionString = context.Configuration.GetConnectionString("AzureServiceBusConnectionString");
        // If token credentials are to be used, the overload constructor for AzureServiceBusTransport would be used here
        var routing = endpointConfiguration.UseTransport(new AzureServiceBusTransport(connectionString));
        endpointConfiguration.UseSerialization<SystemJsonSerializer>();

        endpointConfiguration.AuditProcessedMessagesTo("audit");
        routing.RouteToEndpoint(typeof(Ping), "Receiver");

        endpointConfiguration.EnableInstallers();

        return endpointConfiguration;
    })
    .ConfigureServices(services => services.AddHostedService<SenderWorker>())
    .Build();

await host.RunAsync();

在這裡有許多要解壓縮的項目,因此我們將逐步檢閱。

設定端點的主機

裝載和記錄是使用標準 Microsoft 一般主機選項來設定的。 現在,端點已設定為當作主控台應用程式執行,但可以加以修改,只需最少的變更,即可在 Azure Functions 中執行,我們稍後將在本文討論。

設定 NServiceBus 端點

接下來,您會告訴主機搭配 .UseNServiceBus(…) 擴充方法使用 NServiceBus。 此方法會採用回呼函式,傳回將在主機執行時啟動的端點。

在端點設定中,您會針對傳輸指定 AzureServiceBus,並提供來自 appsettings.json 的連接字串。 接下來,您將設定路由,以便將類型為 Ping 的訊息傳送至名為 "Receiver" 的端點。 其可讓 NServiceBus 將訊息分派至目的地的訊息自動化,而不需要接收者的位址。

當端點啟動時,對 EnableInstallers 的呼叫會在 Azure 服務匯流排命名空間中設定拓撲,並視需要建立必要的佇列。 在生產環境中,作業指令碼是建立拓撲的另一個選項。

設定背景服務來傳送訊息

傳送者的最後一個片段是 SenderWorker,這是設定為每秒傳送一則 Ping 訊息的背景服務。

public class SenderWorker : BackgroundService
{
    private readonly IMessageSession messageSession;
    private readonly ILogger<SenderWorker> logger;

    public SenderWorker(IMessageSession messageSession, ILogger<SenderWorker> logger)
    {
        this.messageSession = messageSession;
        this.logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            var round = 0;
            while (!stoppingToken.IsCancellationRequested)
            {
                await messageSession.Send(new Ping { Round = round++ });;

                logger.LogInformation($"Message #{round}");

                await Task.Delay(1_000, stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // graceful shutdown
        }
    }
}

ExecuteAsync 中使用的 IMessageSession 會插入至 SenderWorker,並允許我們在訊息處理常式外部使用 NServiceBus 傳送訊息。 您在 Sender 中設定的路由會指定 Ping 訊息的目的地。 其會保留系統拓撲 (哪些訊息會路由傳送至哪些位址) 作為商務程式碼的個別關注點。

Sender 應用程式也包含 PongHandler。 在討論完 Receiver 之後,您將回到這裡,接著討論。

設定接收者

Receiver 是接聽 Ping 訊息、記錄何時收到訊息,以及回覆傳送者的端點。 在本節中,我們將快速檢閱與 Sender 類似的端點設定,然後將我們的注意力轉向訊息處理常式。

如同傳送者,使用 Microsoft 一般主機將接收者設定為主控台應用程式。 其會使用相同的記錄和端點設定 (Azure 服務匯流排作為訊息傳輸),但名稱不同,以便與傳送者區別:

var endpointConfiguration = new EndpointConfiguration("Receiver");

由於此端點只會回覆其建立者,且不會啟動新的交談,因此不需要路由設定。 其也如同寄件者一樣不需要背景工作角色,因為其只會在收到訊息時回覆。

Ping 訊息處理常式

Receiver 專案包含名為 PingHandler 的「訊息處理常式」

public class PingHandler : NServiceBus.IHandleMessages<Ping>
{
    private readonly ILogger<PingHandler> logger;

    public PingHandler(ILogger<PingHandler> logger)
    {
        this.logger = logger;
    }

    public async Task Handle(Ping message, IMessageHandlerContext context)
    {
        logger.LogInformation($"Processing Ping message #{message.Round}");

        // throw new Exception("BOOM");

        var reply = new Pong { Acknowledgement = $"Ping #{message.Round} processed at {DateTimeOffset.UtcNow:s}" };

        await context.Reply(reply);
    }
}

現在讓我們忽略已加註的程式碼;稍後當討論從失敗中復原時,我們會回到這裡。

類別會實作 IHandleMessages<Ping>,其定義一種方法:Handle。 此介面會告訴 NServiceBus,當端點收到類型為 Ping 的訊息時,其應該由這個處理常式中的 Handle 方法處理。 Handle 方法會採用訊息本身作為參數,以及採用 IMessageHandlerContext,其允許進一步的傳訊作業,例如回覆、傳送命令或發佈事件。

我們的 PingHandler 很簡單:收到 Ping 訊息時,請記錄訊息詳細資料,並使用新的 Pong 訊息回覆寄件者,後續在寄件者的 PongHandler 中處理。

注意

在 Sender 的設定中,您已指定 Ping 訊息應該路由傳送至 Receiver。 此外,NServiceBus 會將中繼資料新增至訊息,指出訊息的來源。 這就是為什麼您不需要針對 Pong 回覆訊息指定任何路由資料;其會自動路由傳回至其來源:Sender。

在適當地設定 Sender 和 Receiver 後,您現在可以執行解決方案。

執行解決方案

若要啟動解決方案,您必須同時執行 Sender 和 Receiver。 如果您是使用 Visual Studio Code,請啟動 [全部偵錯] 設定。 如果您是使用 Visual Studio,請將解決方案設定為同時啟動 Sender 和 Receiver 專案:

  1. 在 [方案總管] 中,以滑鼠右鍵按一下解決方案
  2. 選取 [設定啟始專案...]
  3. 選取 [多個啟始專案]
  4. 針對 Sender 和 Receiver,在下拉式清單中選取 [啟動]

啟動解決方案。 兩個主控台應用程式即會出現,一個用於 Sender,另一個用於 Receiver。

在 Sender 中,請注意,每秒會分派一則 Ping 訊息,這要歸功於 SenderWorker 背景工作。 Receiver 會顯示每則所接收 Ping 訊息的詳細資料,而 Sender 會在回覆中記錄每則所接收 Pong 訊息的詳細資料。

既然一切正常運作,讓我們將其打破。

復原能力實際操作

錯誤是軟體系統中必然發生的事實。 程式碼失敗是不可避免的,而且其可能會由於各種原因而失敗,例如網路故障、資料庫鎖定、協力廠商 API 中的變更,以及一般的編碼錯誤。

NServiceBus 具有強大的復原能力特性來處理失敗。 當訊息處理常式失敗時,系統會根據預先定義的原則自動重試訊息。 有兩種類型的重試原則:立即重試和延遲重試。 描述其運作方式的最佳方式就是查看其實際操作。 讓我們將重試原則新增至 Receiver 端點:

  1. 開啟 Sender 專案中的 Program.cs
  2. .EnableInstallers 行之後新增下列程式碼:
endpointConfiguration.SendFailedMessagesTo("error");
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(
    immediate =>
    {
        immediate.NumberOfRetries(3);
    });
recoverability.Delayed(
    delayed =>
    {
        delayed.NumberOfRetries(2);
        delayed.TimeIncrease(TimeSpan.FromSeconds(5));
    });

在討論此原則的運作方式之前,讓我們看看其實際操作。 在測試復原能力原則之前,您需要模擬錯誤。 開啟 Receiver 專案中的 PingHandler 程式碼,並註銷這一行:

throw new Exception("BOOM");

現在,當 Receiver 處理 Ping 訊息時,其會失敗。 再次啟動解決方案,讓我們看看 Receiver 中發生什麼情況。

使用較不可靠的 PingHandler,所有訊息都會失敗。 您可以看到重試原則開始對這些訊息發揮作用。 第一次訊息失敗時,系統會立即重試,最多三次:

此影像顯示重試訊息最多 3 次的立即重試原則

當然,其會繼續失敗,因此當三次立即重試用光時,延遲重試原則會開始發揮作用,且訊息會延遲 5 秒:

此影像顯示延遲的重試原則,該原則會在嘗試另一回合的立即重試之前,以遞增 5 秒的方式延遲訊息

經過這 5 秒之後,訊息會再次重試三次 (也就是立即重試原則再次發揮作用)。 這些重試也會失敗,而且 NServiceBus 會再次延遲訊息,這次則延遲 10 秒,然後再試一次。

如果在執行完整重試原則之後 PingHandler 仍然無法成功,訊息會放在名為 error 的「集中式」錯誤佇列中,如呼叫 SendFailedMessagesTo 時所定義的。

顯示失敗訊息的影像

集中式錯誤佇列的概念與 Azure 服務匯流排中的無效信件機制不同,其中每個處理佇列都有無效信件佇列。 使用 NServiceBus,Azure 服務匯流排中的無效信件佇列會作為真正的有害訊息佇列,而最終出現在集中式錯誤佇列中的訊息稍後可視需要重新處理。

重試原則有助於解決數種類型的錯誤,這些錯誤本質上通常是暫時性或半暫時性。 也就是說,如果訊息只是在短暫延遲後重新處理,這些錯誤是暫時性且通常會消失不見。 範例包括網路故障、資料庫鎖定,以及協力廠商 API 中斷。

一旦訊息位於錯誤佇列中,您就可以在選擇的工具中檢查訊息的詳細資料,然後決定要使用該工具做什麼。 例如,使用 ServicePulse (Particular Software 提供的監視工具),我們可以檢視訊息詳細資料和失敗原因:

顯示來自特定軟體之 ServicePulse 的影像

檢查詳細資料之後,您可以將訊息傳回其原始佇列進行處理。 您也可以先編輯訊息,再這麼做。 如果錯誤佇列中有多則訊息,因為相同原因而失敗,它們全都可以採取批次的形式傳回至其原始目的地。

接下來,是時候找出在 Azure 中部署解決方案的位置。

在 Azure 中裝載服務的位置

在此範例中,Sender 和 Receiver 端點會設定為當作主控台應用程式執行。 它們也可以裝載於各種 Azure 服務中,包括 Azure Functions、Azure App Service、Azure 容器執行個體、Azure Kubernetes Service 和 Azure VM。 例如,以下是 Sender 端點如何設定為當作 Azure 函式執行:

[assembly: NServiceBusTriggerFunction("Sender")]
public class Program
{
    public static async Task Main()
    {
        var host = new HostBuilder()
            .ConfigureFunctionsWorkerDefaults()
            .UseNServiceBus(configuration =>
            {
                configuration.Routing().RouteToEndpoint(typeof(Ping), "Receiver");
            })
            .Build();

        await host.RunAsync();
    }
}

如需搭配 Azure Functions 使用 NServiceBus 的詳細資訊,請參閱 NServiceBus 文件中的 Azure Functions 與 Azure 服務匯流排

下一步

如需搭配 Azure 服務使用 NServiceBus 的詳細資訊,請參閱下列文章: