你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于 .NET 的 Azure WebJobs 服务总线客户端库 - 版本 5.13.3
此扩展提供从 Azure 函数访问Azure 服务总线的功能。
入门
安装包
使用 NuGet 安装服务总线扩展:
dotnet add package Microsoft.Azure.WebJobs.Extensions.ServiceBus
先决条件
Azure 订阅:若要使用 Azure 服务(包括Azure 服务总线),需要订阅。 如果没有现有的 Azure 帐户,可以在创建帐户时注册免费试用版或使用 Visual Studio 订阅权益。
服务总线命名空间:若要与Azure 服务总线交互,还需要有一个可用的命名空间。 如果不熟悉如何创建 Azure 资源,建议遵循使用 Azure 门户创建服务总线命名空间的分步指南。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建服务总线实体的详细说明。
若要在 Azure 中快速创建所需的服务总线资源并接收其连接字符串,可以通过单击以下操作部署示例模板:
对客户端进行身份验证
若要使服务总线客户端库与队列或主题交互,需要了解如何与其连接和授权。 执行此操作的最简单方法是使用连接字符串,该连接字符串是在创建服务总线命名空间时自动创建的。 如果不熟悉 Azure 中的共享访问策略,建议按照分步指南获取服务总线连接字符串。
Connection
和 ServiceBusTriggerAttribute
的 ServiceBusAttribute
属性用于指定存储连接字符串的配置属性。 如果未指定,则属性AzureWebJobsServiceBus
应包含连接字符串。
对于本地开发,请使用 local.settings.json
文件存储连接字符串:
{
"Values": {
"<connection_name>": "Endpoint=sb://<service_bus_namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<access key>"
}
}
部署后,使用应用程序设置设置连接字符串。
基于标识的身份验证
如果环境已启用 托管标识 ,则可以使用它对服务总线扩展进行身份验证。 在执行此操作之前,需要确保已按照Azure Functions开发人员指南中所述配置权限。
若要使用基于标识的身份验证,请 <connection_name>__fullyQualifiedNamespace
提供配置设置。
{
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"<connection_name>__fullyQualifiedNamespace": "<service_bus_namespace>.servicebus.windows.net"
}
}
或者,对于已部署的应用,请在 应用程序设置中设置相同的设置:
<connection_name>__fullyQualifiedNamespace=<service_bus_namespace>.servicebus.windows.net
有关配置基于标识的连接的详细信息,请参阅 此处。
关键概念
服务总线触发器
服务总线触发器允许在将消息发送到服务总线队列或主题时执行函数。
请按照Azure 服务总线触发器教程详细了解服务总线触发器。
服务总线输出绑定
服务总线输出绑定允许函数发送服务总线消息。
请按照输出绑定Azure 服务总线详细了解服务总线绑定。
示例
发送单个消息
可以通过将 属性应用于 ServiceBus
函数返回值,将单个消息发送到队列或主题。 返回值可以是 、 byte[]
或 ServiceBusMessage
类型string
。
[FunctionName("BindingToReturnValue")]
[return: ServiceBus("<queue_or_topic_name>", Connection = "<connection_name>")]
public static string BindToReturnValue([TimerTrigger("0 */5 * * * *")] TimerInfo myTimer)
{
// This value would get stored in Service Bus message body.
// The string would be UTF8 encoded.
return $"C# Timer trigger function executed at: {DateTime.Now}";
}
还可以使用 out
、 byte[]
或 ServiceBusMessage
类型的string
参数。
[FunctionName("BindingToOutputParameter")]
public static void Run(
[TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
[ServiceBus("<queue_or_topic_name>", Connection = "<connection_name>")] out ServiceBusMessage message)
{
message = new ServiceBusMessage($"C# Timer trigger function executed at: {DateTime.Now}");
}
发送多条消息
若要从单个 Azure 函数调用发送多个消息, ServiceBus
可以将 属性应用于 IAsyncCollector<string>
或 IAsyncCollector<ServiceBusReceivedMessage>
参数。
[FunctionName("BindingToCollector")]
public static async Task Run(
[TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
[ServiceBus("<queue_or_topic_name>", Connection = "<connection_name>")] IAsyncCollector<ServiceBusMessage> collector)
{
// IAsyncCollector allows sending multiple messages in a single function invocation
await collector.AddAsync(new ServiceBusMessage(new BinaryData($"Message 1 added at: {DateTime.Now}")));
await collector.AddAsync(new ServiceBusMessage(new BinaryData($"Message 2 added at: {DateTime.Now}")));
}
使用绑定到强类型模型
若要将强类型模型类与 ServiceBus 绑定一起使用,请将 ServiceBus
属性应用于模型参数。 这样做将尝试将 反序列化 ServiceBusMessage.Body
为强类型模型。
[FunctionName("TriggerSingleModel")]
public static void Run(
[ServiceBusTrigger("<queue_name>", Connection = "<connection_name>")] Dog dog,
ILogger logger)
{
logger.LogInformation($"Who's a good dog? {dog.Name} is!");
}
使用 ServiceBusSender 发送多条消息
还可以直接绑定到 , ServiceBusSender
以最大程度地控制消息发送。
[FunctionName("BindingToSender")]
public static async Task Run(
[TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
[ServiceBus("<queue_or_topic_name>", Connection = "<connection_name>")] ServiceBusSender sender)
{
await sender.SendMessagesAsync(new[]
{
new ServiceBusMessage(new BinaryData($"Message 1 added at: {DateTime.Now}")),
new ServiceBusMessage(new BinaryData($"Message 2 added at: {DateTime.Now}"))
});
}
按消息触发器
若要在每次将消息发送到服务总线队列或订阅时运行函数,请将 ServiceBusTrigger
属性应用于 string
、 byte[]
或 ServiceBusReceivedMessage
参数。
[FunctionName("TriggerSingle")]
public static void Run(
[ServiceBusTrigger("<queue_name>", Connection = "<connection_name>")] string messageBodyAsString,
ILogger logger)
{
logger.LogInformation($"C# function triggered to process a message: {messageBodyAsString}");
}
批处理触发器
若要为一批接收的消息运行函数,请将 ServiceBusTrigger
属性应用于 string[]
、 或 ServiceBusReceivedMessage[]
参数。
[FunctionName("TriggerBatch")]
public static void Run(
[ServiceBusTrigger("<queue_name>", Connection = "<connection_name>")] ServiceBusReceivedMessage[] messages,
ILogger logger)
{
foreach (ServiceBusReceivedMessage message in messages)
{
logger.LogInformation($"C# function triggered to process a message: {message.Body}");
logger.LogInformation($"EnqueuedTime={message.EnqueuedTime}");
}
}
消息结算
可以使用 将消息配置为在 ServiceBusOptions
函数执行后自动完成。 如果想要对消息解决进行更多控制,可以使用每条消息触发器和批处理触发器绑定到 MessageActions
。
[FunctionName("BindingToMessageActions")]
public static async Task Run(
[ServiceBusTrigger("<queue_name>", Connection = "<connection_name>")]
ServiceBusReceivedMessage[] messages,
ServiceBusMessageActions messageActions)
{
foreach (ServiceBusReceivedMessage message in messages)
{
if (message.MessageId == "1")
{
await messageActions.DeadLetterMessageAsync(message);
}
else
{
await messageActions.CompleteMessageAsync(message);
}
}
}
会话触发器
若要从启用会话的队列或主题接收消息,可以在 属性上ServiceBusTrigger
设置 IsSessionsEnabled
属性。 使用会话时,除了特定于会话的功能外,还可以绑定到 SessionMessageActions
以访问消息解决方法。
[FunctionName("BindingToSessionMessageActions")]
public static async Task Run(
[ServiceBusTrigger("<queue_name>", Connection = "<connection_name>", IsSessionsEnabled = true)]
ServiceBusReceivedMessage[] messages,
ServiceBusSessionMessageActions sessionActions)
{
foreach (ServiceBusReceivedMessage message in messages)
{
if (message.MessageId == "1")
{
await sessionActions.DeadLetterMessageAsync(message);
}
else
{
await sessionActions.CompleteMessageAsync(message);
}
}
// We can also perform session-specific operations using the actions, such as setting state that is specific to this session.
await sessionActions.SetSessionStateAsync(new BinaryData("<session state>"));
}
绑定到 ReceiveActions
可以从函数调用中接收其他消息。 如果需要根据通过绑定参数传递到函数的初始消息的某些特征,更好地控制在函数调用中要处理的消息数,这可能很有用。 你收到的任何其他消息将受到传递到函数的初始消息的相同 AutoCompleteMessages
和 MaxAutoLockRenewalDuration
配置的约束。 还可以查看消息。 速览消息不受 和 MaxAutoLockRenewalDuration
配置的约束,AutoCompleteMessages
因为这些消息未锁定,因此无法完成。
[FunctionName("BindingToReceiveActions")]
public static async Task Run(
[ServiceBusTrigger("<queue_name>", Connection = "<connection_name>", IsSessionsEnabled = true)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions,
ServiceBusReceiveActions receiveActions)
{
if (message.MessageId == "1")
{
await messageActions.DeadLetterMessageAsync(message);
}
else
{
await messageActions.CompleteMessageAsync(message);
// attempt to receive additional messages in this session
var receivedMessages = await receiveActions.ReceiveMessagesAsync(maxMessages: 10);
// you can also use the receive actions to peek messages
var peekedMessages = await receiveActions.PeekMessagesAsync(maxMessages: 10);
}
}
绑定到 ServiceBusClient
有时你可能想要绑定到触发器正在使用的同一 ServiceBusClient
个 。 如果需要根据收到的消息动态创建发件人,这非常有用。
[FunctionName("BindingToClient")]
public static async Task Run(
[ServiceBus("<queue_or_topic_name>", Connection = "<connection_name>")]
ServiceBusReceivedMessage message,
ServiceBusClient client)
{
ServiceBusSender sender = client.CreateSender(message.To);
await sender.SendMessageAsync(new ServiceBusMessage(message));
}
疑难解答
如果函数触发了未经处理的异常,并且你尚未解决该消息,则扩展将尝试放弃该消息,以便它立即可供再次接收。
有关更多故障排除指南,请参阅监视Azure Functions。
后续步骤
阅读Azure Functions或创建 Azure 函数指南简介。
贡献
有关构建、测试和参与此库的详细信息,请参阅我们的 CONTRIBUTING.md 。
本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 cla.microsoft.com。
此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。