共用方式為


Azure 服務匯流排適用于 .NET 的用戶端程式庫 - 7.13.1 版

Azure 服務匯流排可讓您建置利用非同步傳訊模式的應用程式,其使用高度可靠的服務來代理產生者和取用者之間的訊息。 Azure 服務匯流排提供用戶端與伺服器之間的彈性代理傳訊,以及結構化先進先出 (FIFO) 傳訊,以及具有複雜路由的發佈/訂閱功能。 如果您想要深入瞭解Azure 服務匯流排,建議您檢閱:什麼是Azure 服務匯流排?

使用用戶端程式庫進行Azure 服務匯流排以:

  • 傳輸商務資料:利用傳訊進行長期的資訊交換,例如銷售或採購單、日誌或庫存移動。

  • 分離應用程式:改善應用程式和服務的可靠性與延展性,減輕 的傳送者和接收者需要同時上線。

  • 控制訊息的處理方式:針對使用佇列的訊息支援傳統的競爭取用者,或使用主題和訂用帳戶允許每個取用者自己的訊息實例。

  • 實作複雜的工作流程:訊息會話支援需要訊息排序或訊息延遲的案例。

| 原始程式碼套件 (NuGet) | API 參考檔 | 產品檔 | 移轉指南 | 疑難排解指南

開始使用

Prerequisites

  • Microsoft Azure 訂用帳戶:若要使用 Azure 服務,包括Azure 服務匯流排,您需要訂用帳戶。 如果您沒有現有的 Azure 帳戶,您可以在 建立帳戶時註冊免費試用或使用 MSDN 訂閱者權益。

  • 服務匯流排命名空間:若要與Azure 服務匯流排互動,您也必須有可用的命名空間。 如果您不熟悉建立 Azure 資源,建議您遵循使用Azure 入口網站 建立服務匯流排命名空間的逐步指南。 您也可以在該處找到使用 Azure CLI、Azure PowerShell或 Azure Resource Manager (ARM) 範本來建立服務匯流排實體的詳細指示。

  • C# 8.0:Azure 服務匯流排用戶端程式庫會使用 C# 8.0 中引進的新功能。 若要利用 C# 8.0 語法,建議您使用 .NET Core SDK 3.0 或更新 版本與 語言版本latest 進行編譯。

    想要充分利用 C# 8.0 語法的 Visual Studio 使用者必須使用 Visual Studio 2019 或更新版本。 Visual Studio 2019 (包括免費的 Community 版) 可以在這裡下載。 Visual Studio 2017 的使用者可以利用 C# 8 語法,方法是使用 Microsoft.Net.Compilers NuGet 套件 和設定語言版本,不過編輯體驗可能不理想。

    您仍然可以搭配舊版 C# 語言使用程式庫,但必須手動管理非同步可列舉和非同步可處置的成員,而不是受益于新的語法。 您仍然可以以 .NET Core SDK 支援的任何架構版本為目標,包括舊版的 .NET Core 或 .NET Framework。 如需詳細資訊,請參閱: 如何指定目標架構

    重要注意事項: 若要在不修改的情況下建置或執行 範例範例 ,必須使用 C# 8.0。 如果您決定針對其他語言版本進行調整,您仍然可以執行範例。

若要在 Azure 中快速建立所需的服務匯流排資源,並接收其連接字串,您可以按一下下列方式來部署我們的範例範本:

部署至 Azure

安裝套件

使用NuGet安裝適用于 .NET 的 Azure 服務匯流排 用戶端程式庫:

dotnet add package Azure.Messaging.ServiceBus

驗證用戶端

若要讓服務匯流排用戶端程式庫與佇列或主題互動,您必須瞭解如何與其連線和授權。 最簡單的方法是使用連接字串,這會在建立服務匯流排命名空間時自動建立。 如果您不熟悉 Azure 中的共用存取原則,建議您遵循逐步指南來 取得服務匯流排連接字串

一旦您有連接字串,就可以使用它來驗證用戶端。

// Create a ServiceBusClient that will authenticate using a connection string
string connectionString = "<connection_string>";
await using var client = new ServiceBusClient(connectionString);

若要查看如何使用 Azure.Identity 進行驗證,請檢視此 範例

若要查看如何起始與自訂端點的連線,請檢視此 範例

ASP.NET Core

若要在 ASP.NET Core應用程式中插入 ServiceBusClient 為相依性,請安裝適用于 ASP.NET Core 套件的 Azure 用戶端程式庫整合。

dotnet add package Microsoft.Extensions.Azure

然後在 方法中 Startup.ConfigureServices 註冊用戶端:

public void ConfigureServices(IServiceCollection services)
{
    services.AddAzureClients(builder =>
    {
        builder.AddServiceBusClient(Configuration.GetConnectionString("ServiceBus"));
    });
  
    services.AddControllers();
}

若要使用上述程式碼,請將此新增至您的設定:

{
  "ConnectionStrings": {
    "ServiceBus": "<connection_string>"
  }
}

如需詳細資訊,請參閱 使用適用于 .NET 的 Azure SDK 進行相依性插入

重要概念

初始化 ServiceBusClient 之後,您就可以與服務匯流排命名空間內的主要資源類型互動,其中多個可以存在,以及發生實際訊息傳輸時,命名空間通常會作為應用程式容器:

  • 佇列:允許傳送和接收訊息。 通常用於點對點通訊。

  • 主題:與佇列相反,主題更適合發佈/訂閱案例。 主題可以傳送至 ,但需要一個訂用帳戶,其中可以平行多個,以取用來源。

  • 用帳戶:從主題取用的機制。 每個訂用帳戶都是獨立的,並接收傳送至主題的每個訊息複本。 規則和篩選可用來量身打造特定訂用帳戶所接收的訊息。

如需這些資源的詳細資訊,請參閱什麼是Azure 服務匯流排?

若要與這些資源互動,您應該熟悉下列 SDK 概念:

  • 服務匯流排用戶端是開發人員與服務匯流排用戶端程式庫互動的主要介面。 它會作為閘道,所有與程式庫的互動都會從中發生。

  • 服務匯流排傳送者的範圍設定為特定佇列或主題,並使用服務匯流排用戶端建立。 傳送者可讓您將訊息傳送至佇列或主題。 它也允許排程可在指定日期傳遞訊息。

  • 服務匯流排接收者的範圍設定為特定佇列或訂用帳戶,並使用服務匯流排用戶端建立。 接收者可讓您從佇列或訂用帳戶接收訊息。 它也允許在接收訊息之後加以解決。 有四種方式可解決訊息:

    • 完成 - 使訊息從佇列或主題中刪除。
    • 放棄 - 釋放接收者對訊息的鎖定,允許其他接收者接收訊息。
    • 延遲 - 以正常方式延遲接收訊息。 若要接收延遲的訊息,必須保留訊息的序號。
    • DeadLetter - 將訊息移至寄不出的信件佇列。 這可防止再次收到訊息。 若要從寄不出的信件佇列接收訊息,需要限定為寄不出的信件佇列的接收者。
  • 服務匯流排會話接收者的範圍設定為啟用特定會話的佇列或訂用帳戶,並使用服務匯流排用戶端建立。 會話接收者幾乎與標準接收者相同,差異在於會話管理作業會公開,這只適用于啟用會話的實體。 這些作業包括取得和設定會話狀態,以及更新會話鎖定。

  • 服務匯流排處理器的範圍設定為特定佇列或訂用帳戶,並使用服務匯流排用戶端建立。 ServiceBusProcessor可以視為一組接收者周圍的抽象概念。 它會使用回呼模型,允許在收到訊息時以及發生例外狀況時指定程式碼。 它提供自動完成已處理的訊息、自動訊息鎖定更新,以及同時執行使用者指定的事件處理常式。 由於其功能集,因此應該是用來撰寫從服務匯流排實體接收之應用程式的移至工具。 針對較複雜的案例,建議使用 ServiceBusReceiver,在此案例中,處理器無法提供直接使用 ServiceBusReceiver 時可預期的精細控制。

  • 服務匯流排會話處理器的範圍設定為啟用特定會話的佇列或訂用帳戶,並使用服務匯流排用戶端建立。 會話處理器幾乎與標準處理器相同,差異在於會話管理作業會公開,這只適用于啟用會話的實體。

如需更多概念和更深入的討論,請參閱: 服務匯流排進階功能

用戶端存留期

ServiceBusClient、傳送者、接收者和處理器可以安全地快取並使用作為應用程式的存留期,這是定期傳送或接收訊息時的最佳做法。 它們負責有效率地管理網路、CPU 和記憶體使用量,以在閒置期間保持低使用量。

這些類型是可處置的,而且需要呼叫 DisposeAsyncCloseAsync ,以確保已正確清除網路資源和其他 Unmanaged 物件。 請務必注意,當處置實例時 ServiceBusClient ,它會自動關閉並清除使用它建立的任何傳送者、接收者和處理器。

執行緒安全

我們保證所有用戶端實例方法都是安全線程,且彼此獨立 (指導方針) 。 這可確保重複使用用戶端實例的建議一律是安全的,即使是跨執行緒也一樣。

其他概念

用戶端選項 | 診斷 | 嘲笑

範例

傳送和接收訊息

訊息傳送是使用 ServiceBusSender 來執行。 接收是使用 ServiceBusReceiver 來執行。

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a message that we can send. UTF-8 encoding is used when providing a string.
ServiceBusMessage message = new ServiceBusMessage("Hello world!");

// send the message
await sender.SendMessageAsync(message);

// create a receiver that we can use to receive the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// get the message body as a string
string body = receivedMessage.Body.ToString();
Console.WriteLine(body);

傳送訊息批次

有兩種方式可以一次傳送數個訊息。 執行此作業的第一種方式是使用安全批次處理。 使用安全批次處理,您可以建立 ServiceBusMessageBatch 物件,這可讓您嘗試使用 TryAdd 方法一次將訊息新增至批次。 如果訊息無法容納在批次中, TryAdd 將會傳回 false。

// add the messages that we plan to send to a local queue
Queue<ServiceBusMessage> messages = new Queue<ServiceBusMessage>();
messages.Enqueue(new ServiceBusMessage("First message"));
messages.Enqueue(new ServiceBusMessage("Second message"));
messages.Enqueue(new ServiceBusMessage("Third message"));

// create a message batch that we can send
// total number of messages to be sent to the Service Bus queue
int messageCount = messages.Count;

// while all messages are not sent to the Service Bus queue
while (messages.Count > 0)
{
    // start a new batch
    using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

    // add the first message to the batch
    if (messageBatch.TryAddMessage(messages.Peek()))
    {
        // dequeue the message from the .NET queue once the message is added to the batch
        messages.Dequeue();
    }
    else
    {
        // if the first message can't fit, then it is too large for the batch
        throw new Exception($"Message {messageCount - messages.Count} is too large and cannot be sent.");
    }

    // add as many messages as possible to the current batch
    while (messages.Count > 0 && messageBatch.TryAddMessage(messages.Peek()))
    {
        // dequeue the message from the .NET queue as it has been added to the batch
        messages.Dequeue();
    }

    // now, send the batch
    await sender.SendMessagesAsync(messageBatch);

    // if there are any remaining messages in the .NET queue, the while loop repeats
}

第二種方式會 SendMessagesAsync 使用接受 IEnumerable 的多 ServiceBusMessage 載。 使用此方法時,我們會嘗試將所有提供的訊息放入要傳送至服務的單一訊息批次中。 如果訊息太大而無法容納在單一批次中,作業將會擲回例外狀況。

IList<ServiceBusMessage> messages = new List<ServiceBusMessage>();
messages.Add(new ServiceBusMessage("First"));
messages.Add(new ServiceBusMessage("Second"));
// send the messages
await sender.SendMessagesAsync(messages);

接收一批訊息

// create a receiver that we can use to receive the messages
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
// a batch of messages (maximum of 2 in this case) are received
IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await receiver.ReceiveMessagesAsync(maxMessages: 2);

// go through each of the messages received
foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
{
    // get the message body as a string
    string body = receivedMessage.Body.ToString();
}

完成訊息

為了從佇列或訂用帳戶中移除訊息,我們可以呼叫 CompleteAsync 方法。

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage("Hello world!");

// send the message
await sender.SendMessageAsync(message);

// create a receiver that we can use to receive and settle the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// complete the message, thereby deleting it from the service
await receiver.CompleteMessageAsync(receivedMessage);

放棄訊息

放棄訊息會釋放接收者的鎖定,這可讓此或其他接收者接收訊息。

ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// abandon the message, thereby releasing the lock and allowing it to be received again by this or other receivers
await receiver.AbandonMessageAsync(receivedMessage);

延遲訊息

延遲訊息會防止使用 或 ReceiveMessagesAsync 方法再次 ReceiveMessageAsync 收到訊息。 相反地,有個別的方法, ReceiveDeferredMessageAsync 以及 ReceiveDeferredMessagesAsync 接收延遲的訊息。

ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// defer the message, thereby preventing the message from being received again without using
// the received deferred message API.
await receiver.DeferMessageAsync(receivedMessage);

// receive the deferred message by specifying the service set sequence number of the original
// received message
ServiceBusReceivedMessage deferredMessage = await receiver.ReceiveDeferredMessageAsync(receivedMessage.SequenceNumber);

寄不出的信件訊息

寄不出的信件訊息類似于延遲,主要差異在於訊息在收到特定次數之後,服務會自動寄不出的信件。 應用程式可以根據需求選擇手動寄不出的信件訊息。 當訊息寄不出的信件時,實際上會移至原始佇列的子佇列。 請注意, ServiceBusReceiver 會用來接收來自寄不出的信件子佇列的訊息,不論主要佇列是否已啟用會話。

ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// Dead-letter the message, thereby preventing the message from being received again without receiving from the dead letter queue.
// We can optionally pass a dead letter reason and dead letter description to further describe the reason for dead-lettering the message.
await receiver.DeadLetterMessageAsync(receivedMessage, "sample reason", "sample description");

// receive the dead lettered message with receiver scoped to the dead letter queue.
ServiceBusReceiver dlqReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
{
    SubQueue = SubQueue.DeadLetter
});
ServiceBusReceivedMessage dlqMessage = await dlqReceiver.ReceiveMessageAsync();

// The reason and the description that we specified when dead-lettering the message will be available in the received dead letter message.
string reason = dlqMessage.DeadLetterReason;
string description = dlqMessage.DeadLetterErrorDescription;

如需詳細資訊,請參閱 ServiceBus 寄不出的信件佇列概觀

使用處理器

ServiceBusProcessor您可以將 視為一組接收者周圍的抽象概念。 它會使用回呼模型,允許在收到訊息時以及發生例外狀況時指定程式碼。 它提供自動完成已處理的訊息、自動訊息鎖定更新,以及同時執行使用者指定的事件處理常式。 由於其功能集,因此應該是用來撰寫從服務匯流排實體接收之應用程式的移至工具。 針對處理器無法直接使用 ServiceBusReceiver 時預期的精細控制項,建議使用 ServiceBusReceiver 進行更複雜的案例。

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a set of messages that we can send
ServiceBusMessage[] messages = new ServiceBusMessage[]
{
    new ServiceBusMessage("First"),
    new ServiceBusMessage("Second")
};

// send the message batch
await sender.SendMessagesAsync(messages);

// create the options to use for configuring the processor
var options = new ServiceBusProcessorOptions
{
    // By default or when AutoCompleteMessages is set to true, the processor will complete the message after executing the message handler
    // Set AutoCompleteMessages to false to [settle messages](/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock) on your own.
    // In both cases, if the message handler throws an exception without settling the message, the processor will abandon the message.
    AutoCompleteMessages = false,

    // I can also allow for multi-threading
    MaxConcurrentCalls = 2
};

// create a processor that we can use to process the messages
await using ServiceBusProcessor processor = client.CreateProcessor(queueName, options);

// configure the message and error handler to use
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine(body);

    // we can evaluate application logic and use that to determine how to settle the message.
    await args.CompleteMessageAsync(args.Message);
}

Task ErrorHandler(ProcessErrorEventArgs args)
{
    // the error source tells me at what point in the processing an error occurred
    Console.WriteLine(args.ErrorSource);
    // the fully qualified namespace is available
    Console.WriteLine(args.FullyQualifiedNamespace);
    // as well as the entity path
    Console.WriteLine(args.EntityPath);
    Console.WriteLine(args.Exception.ToString());
    return Task.CompletedTask;
}

// start processing
await processor.StartProcessingAsync();

// since the processing happens in the background, we add a Console.ReadKey to allow the processing to continue until a key is pressed.
Console.ReadKey();

使用 Azure.Identity 進行驗證

Azure 身分識別程式庫提供簡單的 Azure Active Directory 驗證支援。

// Create a ServiceBusClient that will authenticate through Active Directory
string fullyQualifiedNamespace = "yournamespace.servicebus.windows.net";
await using var client = new ServiceBusClient(fullyQualifiedNamespace, new DefaultAzureCredential());

使用會話

會話 提供群組相關訊息的機制。 若要使用會話,您必須使用已啟用會話的實體。

疑難排解

請參閱 服務匯流排疑難排解指南

下一步

除了討論的簡介案例之外,Azure 服務匯流排用戶端程式庫提供其他案例的支援,以協助利用Azure 服務匯流排服務的完整功能集。 為了協助探索其中一些案例,服務匯流排用戶端程式庫提供範例專案,作為常見案例的圖例。 如需詳細資訊,請參閱 讀我檔案範例

參與

此專案歡迎參與和提供建議。 大部分的參與都要求您同意「參與者授權合約 (CLA)」,宣告您有權且確實授與我們使用投稿的權利。 如需詳細資料,請前往 https://cla.microsoft.com

當您提交提取要求時,CLA Bot 會自動判斷您是否需要提供 CLA,並適當地裝飾 PR (例如標籤、註解)。 請遵循 bot 提供的指示。 您只需要使用我們的 CLA 在所有存放庫上執行此動作一次。

此專案採用 Microsoft Open Source Code of Conduct (Microsoft 開放原始碼管理辦法)。 如需詳細資訊,請參閱管理辦法常見問題集,如有任何其他問題或意見請連絡 opencode@microsoft.com

如需詳細資訊,請參閱我們的 參與指南

曝光數