你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Azure Functions 的 Azure 服务总线触发器
使用服务总线触发器响应来自服务总线队列或主题的消息。 从扩展版本 3.1.0 开始,可以在启用会话的队列或主题上触发。
有关设置和配置详细信息,请参阅概述。
消耗计划和高级计划的服务总线缩放决策取决于基于目标的缩放。 有关详细信息,请参阅基于目标的缩放。
重要
本文使用选项卡来支持多个版本的 Node.js 编程模型。 v4 模型已正式发布,旨在为 JavaScript 和 TypeScript 开发人员提供更为灵活和直观的体验。 有关 v4 模型工作原理的更多详细信息,请参阅 Azure Functions Node.js 开发人员指南。 要详细了解 v3 和 v4 之间的差异,请参阅迁移指南。
Azure Functions 支持两种 Python 编程模型。 定义绑定的方式取决于选择的编程模型。
使用 Python v2 编程模型,可以直接在 Python 函数代码中使用修饰器定义绑定。 有关详细信息,请参阅 Python 开发人员指南。
本文同时支持两个编程模型。
示例
可使用以下 C# 模式之一来创建 C# 函数:
- 独立辅助角色模型:编译的 C# 函数,该函数在独立于运行时的工作进程中运行。 需要独立工作进程才能支持在 LTS 和非 LTS 版 .NET 和 .NET Framework 上运行的 C# 函数。 独立工作进程函数的扩展使用
Microsoft.Azure.Functions.Worker.Extensions.*
命名空间。 - 进程内模型:编译的 C# 函数,该函数在与 Functions 运行时相同的进程中运行。 在此模型的变体中,可以使用 C# 脚本运行 Functions,该脚本主要用于 C# 门户编辑。 进程内函数的扩展使用
Microsoft.Azure.WebJobs.Extensions.*
命名空间。
重要
对进程内模型的支持将于 2026 年 11 月 10 日结束。 为获得完全支持,强烈建议将应用迁移到独立工作模型。
此代码定义并初始化 ILogger
:
private readonly ILogger<ServiceBusReceivedMessageFunctions> _logger;
public ServiceBusReceivedMessageFunctions(ILogger<ServiceBusReceivedMessageFunctions> logger)
{
_logger = logger;
}
此示例演示接收单个服务总线队列消息并将其写入日志的 C# 函数:
[Function(nameof(ServiceBusReceivedMessageFunction))]
[ServiceBusOutput("outputQueue", Connection = "ServiceBusConnection")]
public string ServiceBusReceivedMessageFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
var outputMessage = $"Output message created at {DateTime.Now}";
return outputMessage;
}
此示例演示一个 C# 函数,该函数接收单个批处理中的多个服务总线队列消息,并将每个消息写入日志:
[Function(nameof(ServiceBusReceivedMessageBatchFunction))]
public void ServiceBusReceivedMessageBatchFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
foreach (ServiceBusReceivedMessage message in messages)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}
}
此示例演示一个 C# 函数,该函数接收多个服务总线队列消息,将其写入日志,然后将消息处置为已完成:
[Function(nameof(ServiceBusMessageActionsFunction))]
public async Task ServiceBusMessageActionsFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
// Complete the message
await messageActions.CompleteMessageAsync(message);
}
以下 Java 函数使用 @ServiceBusQueueTrigger
中的 @ServiceBusQueueTrigger
注释来说明服务总线队列触发器的配置。 此函数获取放置在队列上的消息,然后将其添加到日志。
@FunctionName("sbprocessor")
public void serviceBusProcess(
@ServiceBusQueueTrigger(name = "msg",
queueName = "myqueuename",
connection = "myconnvarname") String message,
final ExecutionContext context
) {
context.getLogger().info(message);
}
将消息添加到服务总线主题时,也可触发 Java 函数。 以下示例使用 @ServiceBusTopicTrigger
注释来说明触发器配置。
@FunctionName("sbtopicprocessor")
public void run(
@ServiceBusTopicTrigger(
name = "message",
topicName = "mytopicname",
subscriptionName = "mysubscription",
connection = "ServiceBusConnection"
) String message,
final ExecutionContext context
) {
context.getLogger().info(message);
}
以下示例显示了服务总线触发器 TypeScript 函数。 此函数将读取消息元数据并记录服务总线队列消息。
import { app, InvocationContext } from '@azure/functions';
export async function serviceBusQueueTrigger1(message: unknown, context: InvocationContext): Promise<void> {
context.log('Service bus queue function processed message:', message);
context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
context.log('DeliveryCount =', context.triggerMetadata.deliveryCount);
context.log('MessageId =', context.triggerMetadata.messageId);
}
app.serviceBusQueue('serviceBusQueueTrigger1', {
connection: 'MyServiceBusConnection',
queueName: 'testqueue',
handler: serviceBusQueueTrigger1,
});
以下示例显示了服务总线触发器 JavaScript 函数。 此函数将读取消息元数据并记录服务总线队列消息。
const { app } = require('@azure/functions');
app.serviceBusQueue('serviceBusQueueTrigger1', {
connection: 'MyServiceBusConnection',
queueName: 'testqueue',
handler: (message, context) => {
context.log('Service bus queue function processed message:', message);
context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
context.log('DeliveryCount =', context.triggerMetadata.deliveryCount);
context.log('MessageId =', context.triggerMetadata.messageId);
},
});
以下示例演示了 function.json 文件中的服务总线触发器绑定以及使用该绑定的 PowerShell 函数。
下面是 function.json 文件中的绑定数据:
{
"bindings": [
{
"name": "mySbMsg",
"type": "serviceBusTrigger",
"direction": "in",
"topicName": "mytopic",
"subscriptionName": "mysubscription",
"connection": "AzureServiceBusConnectionString"
}
]
}
下面是发送服务总线消息时运行的函数。
param([string] $mySbMsg, $TriggerMetadata)
Write-Host "PowerShell ServiceBus queue trigger function processed message: $mySbMsg"
下面的示例演示如何通过触发器读取服务总线队列消息。 该示例取决于使用的是 v1 还是 v2 Python 编程模型。
import logging
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="ServiceBusQueueTrigger1")
@app.service_bus_queue_trigger(arg_name="msg",
queue_name="<QUEUE_NAME>",
connection="<CONNECTION_SETTING>")
def test_function(msg: func.ServiceBusMessage):
logging.info('Python ServiceBus queue trigger processed message: %s',
msg.get_body().decode('utf-8'))
下面的示例演示了如何通过触发器读取服务总线队列主题。
import logging
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="ServiceBusTopicTrigger1")
@app.service_bus_topic_trigger(arg_name="message",
topic_name="TOPIC_NAME",
connection="CONNECTION_SETTING",
subscription_name="SUBSCRIPTION_NAME")
def test_function(message: func.ServiceBusMessage):
message_body = message.get_body().decode("utf-8")
logging.info("Python ServiceBus topic trigger processed message.")
logging.info("Message Body: " + message_body)
特性
进程内和独立工作进程 C# 库都使用 ServiceBusTriggerAttribute 特性来定义函数触发器。 C# 脚本改用 function.json 配置文件,如 C# 脚本指南中所述。
下表说明了可使用此触发器特性设置的属性:
属性 | 说明 |
---|---|
QueueName | 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
TopicName | 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。 |
SubscriptionName | 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。 |
Connection | 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
IsBatched | 消息分批传送。 需要数组或集合类型。 |
IsSessionsEnabled | 如果连接到true 队列或订阅,则为 true 。 否则为 false (默认值)。 |
AutoCompleteMessages | 如果触发器应在成功调用后自动完成消息,则为 true 。 如果不应,例如在代码中处理消息结算时,则为 false 。 如果未显式设置,则行为将基于 host.json 中的 autoCompleteMessages 配置。 |
在本地开发时,需要将应用程序设置添加到 Values
集合中的 local.settings.json 文件中。
修饰符
仅适用于 Python v2 编程模型。
对于使用修饰器定义的 Python v2 功能,支持 service_bus_queue_trigger
上的以下属性:
properties | 说明 |
---|---|
arg_name |
变量的名称,表示函数代码中的队列或主题消息。 |
queue_name |
要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
connection |
指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
对于使用 function.json 定义的 Python 函数,请参阅“配置”部分。
批注
使用 ServiceBusQueueTrigger
注释可以创建在创建服务总线队列消息时要运行的函数。 可用的配置选项包括以下属性:
属性 | 说明 |
---|---|
name | 变量的名称,表示函数代码中的队列或主题消息。 |
queueName | 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
topicName | 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。 |
subscriptionName | 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。 |
连接 | 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
使用 ServiceBusTopicTrigger
注释可以指定主题和订阅,以便以触发函数的数据为目标。
在本地开发时,请将应用程序设置添加到 Values
集合的 local.settings.json 文件。
有关更多详细信息,请参阅触发器示例。
配置
仅适用于 Python v1 编程模型。
下表说明了可以在传递给方法“app.serviceBusQueue()
”或“app.serviceBusTopic()
”的对象“options
”上设置的属性。
properties | 说明 |
---|---|
queueName | 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
topicName | 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。 |
subscriptionName | 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。 |
连接 | 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
accessRights | 连接字符串的访问权限。 可用值为 manage 和 listen 。 默认值是 manage ,其指示 connection 具有“管理”权限。 如果使用不具有“管理”权限的连接字符串,请将 设置为“listen”。 否则,Functions 运行时可能会在尝试执行需要管理权限的操作时失败。 在 Azure Functions 版本 2.x 及更高版本中,此属性不可用,因为最新版本的服务总线 SDK 不支持管理操作。 |
isSessionsEnabled | 如果连接到true 队列或订阅,则为 true 。 否则为 false (默认值)。 |
autoComplete | 对于非 C# 函数,此项必须为 true ,这意味着触发器应在处理后自动调用 complete,或者你会通过函数代码手动调用 complete。设置为 true 时,触发器会在函数执行成功完成的情况下自动完成该消息,否则会放弃该消息。函数中的异常会导致运行时在后台调用 abandonAsync 。 如果未发生异常,则在后台调用 completeAsync 。 此属性仅在 Azure Functions 2.x 和更高版本中可用。 |
在本地开发时,请将应用程序设置添加到 Values
集合的 local.settings.json 文件。
下表解释了在 function.json 文件中设置的绑定配置属性。
“function.json”属性 | 说明 |
---|---|
type | 必须设置为 serviceBusTrigger 。 在 Azure 门户中创建触发器时,会自动设置此属性。 |
direction | 必须设置为“in”。 在 Azure 门户中创建触发器时,会自动设置此属性。 |
name | 变量的名称,表示函数代码中的队列或主题消息。 |
queueName | 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
topicName | 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。 |
subscriptionName | 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。 |
连接 | 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
accessRights | 连接字符串的访问权限。 可用值为 manage 和 listen 。 默认值是 manage ,其指示 connection 具有“管理”权限。 如果使用不具有“管理”权限的连接字符串,请将 设置为“listen”。 否则,Functions 运行时可能会在尝试执行需要管理权限的操作时失败。 在 Azure Functions 版本 2.x 及更高版本中,此属性不可用,因为最新版本的服务总线 SDK 不支持管理操作。 |
isSessionsEnabled | 如果连接到true 队列或订阅,则为 true 。 否则为 false (默认值)。 |
autoComplete | 对于非 C# 函数,此项必须为 true ,这意味着触发器应在处理后自动调用 complete,或者你会通过函数代码手动调用 complete。设置为 true 时,触发器会在函数执行成功完成的情况下自动完成该消息,否则会放弃该消息。函数中的异常会导致运行时在后台调用 abandonAsync 。 如果未发生异常,则在后台调用 completeAsync 。 此属性仅在 Azure Functions 2.x 和更高版本中可用。 |
在本地开发时,请将应用程序设置添加到 Values
集合的 local.settings.json 文件。
有关完整示例,请参阅示例部分。
使用情况
所有 C# 形式和扩展版本都支持以下参数类型:
类型 | 说明 |
---|---|
System.String | 当消息为简单文本时使用。 |
byte[] | 用于二进制数据消息。 |
Object | 当消息包含 JSON 时,Functions 会尝试将 JSON 数据反序列化为已知的普通旧 CLR 对象类型。 |
特定于消息的参数类型包含其他消息元数据。 服务总线触发器支持的特定类型取决于 Functions 运行时版本、扩展包版本和所使用的 C# 形式。
如果希望函数处理单条消息,服务总线触发器可以绑定到以下类型:
类型 | 说明 |
---|---|
string |
字符串格式的消息。 当消息为简单文本时使用。 |
byte[] |
消息的字节数。 |
JSON 可序列化类型 | 当事件包含 JSON 数据时,Functions 会尝试将 JSON 数据反序列化为普通的旧 CLR 对象 (POCO) 类型。 |
ServiceBusReceivedMessage1 | 消息对象。 绑定到 ServiceBusReceivedMessage 时,还可以选择包含 ServiceBusMessageActions1,2 类型的参数来执行消息解决操作。 |
如果希望函数处理一批消息,服务总线触发器可以绑定到以下类型:
类型 | 说明 |
---|---|
T[] ,其中 T 是单消息类型之一 |
批处理中的事件数组。 每个条目表示一个事件。 绑定到 ServiceBusReceivedMessage[] 时,还可以选择包含 ServiceBusMessageActions1,2 类型的参数来执行消息解决操作。 |
1 要使用这些类型,需要引用 Microsoft.Azure.Functions.Worker.Extensions.ServiceBus 5.14.1 或更高版本以及 SDK 类型绑定的常见依赖项。
2 使用时,将AutoCompleteMessages
触发器属性的属性设置为 false
ServiceBusMessageActions
。 这可以防止运行时在成功调用函数后尝试完成消息。
未定义 Connection
属性时,Functions 会查找名为 AzureWebJobsServiceBus
的应用设置,这是服务总线连接字符串的默认名称。 还可以设置 Connection
属性来指定应用程序设置(其中包含要使用的服务总线连接字符串)的名称。
可以通过 ServiceBusQueueMessage
或 ServiceBusTopicMessage
参数获取传入的服务总线消息。
可以通过在 function.json 文件的名称属性中配置的参数来使用服务总线实例。
队列消息可通过类型为 func.ServiceBusMessage
的参数提供给函数。 服务总线消息作为字符串或 JSON 对象传递到函数中。
有关完整示例,请参阅示例部分。
连接
connection
属性是对环境配置的引用,它指定应用应该如何连接到服务总线。 它可能指定:
如果配置的值既是单个设置的完全匹配,也是其他设置的前缀匹配,则使用完全匹配。
连接字符串
若要获取连接字符串,请执行获取管理凭据中显示的步骤。 必须是服务总线命名空间的连接字符串,不限于特定的队列或主题。
此连接字符串应存储在应用程序设置中,其名称与绑定配置的 connection
属性指定的值匹配。
如果应用设置名称以“AzureWebJobs”开头,则只能指定该名称的余下部分。 例如,如果将 connection
设为“MyServiceBus”,Functions 运行时会查找名为“AzureWebJobsMyServiceBus”的应用设置。 如果将 connection
留空,函数运行时将使用名为“AzureWebJobsServiceBus”的应用设置中的默认服务总线连接字符串。
基于标识的连接
如果使用 5.x 或更高版本的扩展,可以让应用使用 Microsoft Entra 标识,而无需将连接字符串与机密一起使用。 为此,需要定义公共前缀下的设置,该前缀映射到触发器和绑定配置中的 connection
属性。
在此模式下,扩展需要以下属性:
属性 | 环境变量模板 | 说明 | 示例值 |
---|---|---|---|
完全限定的命名空间 | <CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace |
完全限定的服务总线命名空间。 | <service_bus_namespace>.servicebus.windows.net |
可以设置其他属性来自定义连接。 请参阅基于标识的连接的通用属性。
注意
使用 Azure 应用程序配置或 Key Vault 为托管标识连接提供设置时,设置名称应使用有效的键分隔符(例如 :
或 /
)替代 __
,以确保正确解析名称。
例如 <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace
。
在 Azure Functions 服务中托管时,基于标识的连接将使用托管标识。 默认情况下使用系统分配的标识,但可以使用 credential
和 clientID
属性来指定用户分配的标识。 请注意,不支持为用户分配的标识配置资源 ID。 在其他上下文(如本地开发)中运行时,将改用开发人员标识,尽管可以进行自定义。 请参阅使用基于标识的连接进行本地开发。
向标识授予权限
无论使用何种标识,都必须具有执行所需操作的权限。 对于大多数 Azure 服务,这意味着你需要使用内置角色或者提供这些权限的自定义角色在 Azure RBAC 中分配角色。
重要
某些权限可能由并非所有上下文都需要的目标服务公开。 尽可能遵循最低权限原则,仅授予标识所需的权限。 例如,如果应用只需要从数据源进行读取即可,则使用仅具有读取权限的角色。 分配一个也具有该服务写入权限的角色并不恰当,因为对于读取操作来说,写入是多余的权限。 同样,你也希望确保角色分配的范围仅限于需要读取的资源。
你将需要创建一个角色分配,以便在运行时提供对主题和队列的访问权限。 所有者等管理角色还不够。 下表显示了在正常操作中使用服务总线扩展时建议使用的内置角色。 根据所编写的代码,应用程序可能需要具有其他权限。
绑定类型 | 内置角色示例 |
---|---|
触发器1 | Azure 服务总线数据接收方、Azure 服务总线数据所有者 |
输出绑定 | Azure 服务总线数据发送方 |
1 若要从服务总线主题触发,角色分配需要对服务总线订阅资源具有有效范围。 如果仅包含主题,将发生错误。 某些客户端(例如 Azure 门户)不会将服务总线订阅资源公开为角色分配的范围。 在这种情况下,可以改用 Azure CLI。 若要了解详细信息,请参阅适用于 Azure 服务总线的 Azure 内置角色。
有害消息
无法在 Azure Functions 中控制或配置有害消息处理。 服务总线处理有害消息本身。
PeekLock 行为
Functions 运行时以 PeekLock 模式接收消息。
默认情况下,如果函数成功完成,则运行时会对此消息调用 Complete
;如果函数失败,则调用 Abandon
。 可以通过 host.json
中的 autoCompleteMessages
属性禁用自动完成。
默认情况下,如果函数成功完成,则运行时会对此消息调用 Complete
;如果函数失败,则调用 Abandon
。 可以通过 host.json
中的 autoCompleteMessages
属性或通过触发器特性上的一个属性来禁用自动完成。 如果你的函数代码处理消息结算,则应禁用自动完成。
如果函数的运行时间长于 PeekLock
超时时间,则只要该函数正在运行,就会自动续订锁定。 maxAutoRenewDuration
在 host.json 中可配置,它映射到 serviceBusProcessor.MaxAutoLockRenewalDuration。 此设置的默认值为 5 分钟。
消息元数据
使用特定于消息传送的类型,你可以轻松地检索作为对象属性的元数据。 这些属性取决于 Functions 运行时版本、扩展包版本和所使用的 C# 形式。
这些属性是 ServiceBusReceivedMessage 类的成员。
属性 | 类型 | 说明 |
---|---|---|
ApplicationProperties |
ApplicationProperties |
由发送方设置的属性。 |
ContentType |
string |
发送方和接收方用于实现应用程序特定逻辑的内容类型标识符。 |
CorrelationId |
string |
相关 ID。 |
DeliveryCount |
Int32 |
传递次数。 |
EnqueuedTime |
DateTime |
排队时间 (UTC)。 |
ScheduledEnqueueTimeUtc |
DateTime |
计划的排队时间 (UTC)。 |
ExpiresAt |
DateTime |
到期时间 (UTC)。 |
MessageId |
string |
服务总线可用于标识重复消息的用户定义值(如果启用)。 |
ReplyTo |
string |
对队列地址的回复。 |
Subject |
string |
特定于应用程序的标签,可用于替代 Label 元数据属性。 |
To |
string |
发送到地址。 |