你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Functions 的 Azure 服务总线触发器

使用服务总线触发器响应来自服务总线队列或主题的消息。 从扩展版本 3.1.0 开始,可以在启用会话的队列或主题上触发。

有关设置和配置详细信息,请参阅概述

消耗计划和高级计划的服务总线缩放决策取决于基于目标的缩放。 有关详细信息,请参阅基于目标的缩放

重要

本文使用选项卡来支持多个版本的 Node.js 编程模型。 v4 模型已正式发布,旨在为 JavaScript 和 TypeScript 开发人员提供更为灵活和直观的体验。 有关 v4 模型工作原理的更多详细信息,请参阅 Azure Functions Node.js 开发人员指南。 要详细了解 v3 和 v4 之间的差异,请参阅迁移指南

Azure Functions 支持两种 Python 编程模型。 定义绑定的方式取决于选择的编程模型。

使用 Python v2 编程模型,可以直接在 Python 函数代码中使用修饰器定义绑定。 有关详细信息,请参阅 Python 开发人员指南

本文同时支持两个编程模型。

示例

可使用以下 C# 模式之一来创建 C# 函数:

  • 独立辅助角色模型:编译的 C# 函数,该函数在独立于运行时的工作进程中运行。 需要独立工作进程才能支持在 LTS 和非 LTS 版 .NET 和 .NET Framework 上运行的 C# 函数。 独立工作进程函数的扩展使用 Microsoft.Azure.Functions.Worker.Extensions.* 命名空间。
  • 进程内模型:编译的 C# 函数,该函数在与 Functions 运行时相同的进程中运行。 在此模型的变体中,可以使用 C# 脚本运行 Functions,该脚本主要用于 C# 门户编辑。 进程内函数的扩展使用 Microsoft.Azure.WebJobs.Extensions.* 命名空间。

此代码定义并初始化 ILogger

private readonly ILogger<ServiceBusReceivedMessageFunctions> _logger;

public ServiceBusReceivedMessageFunctions(ILogger<ServiceBusReceivedMessageFunctions> logger)
{
    _logger = logger;
}

此示例演示接收单个服务总线队列消息并将其写入日志的 C# 函数

[Function(nameof(ServiceBusReceivedMessageFunction))]
[ServiceBusOutput("outputQueue", Connection = "ServiceBusConnection")]
public string ServiceBusReceivedMessageFunction(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
{
    _logger.LogInformation("Message ID: {id}", message.MessageId);
    _logger.LogInformation("Message Body: {body}", message.Body);
    _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);

    var outputMessage = $"Output message created at {DateTime.Now}";
    return outputMessage;
}

此示例演示一个 C# 函数,该函数接收单个批处理中的多个服务总线队列消息,并将每个消息写入日志:

[Function(nameof(ServiceBusReceivedMessageBatchFunction))]
public void ServiceBusReceivedMessageBatchFunction(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnection", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
    foreach (ServiceBusReceivedMessage message in messages)
    {
        _logger.LogInformation("Message ID: {id}", message.MessageId);
        _logger.LogInformation("Message Body: {body}", message.Body);
        _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
    }
}

此示例演示一个 C# 函数,该函数接收多个服务总线队列消息,将其写入日志,然后将消息处置为已完成:

[Function(nameof(ServiceBusMessageActionsFunction))]
public async Task ServiceBusMessageActionsFunction(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnection", AutoCompleteMessages = false)]
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions)
{
    _logger.LogInformation("Message ID: {id}", message.MessageId);
    _logger.LogInformation("Message Body: {body}", message.Body);
    _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);

    // Complete the message
    await messageActions.CompleteMessageAsync(message);
}

以下 Java 函数使用 @ServiceBusQueueTrigger中的 @ServiceBusQueueTrigger 注释来说明服务总线队列触发器的配置。 此函数获取放置在队列上的消息,然后将其添加到日志。

@FunctionName("sbprocessor")
 public void serviceBusProcess(
    @ServiceBusQueueTrigger(name = "msg",
                             queueName = "myqueuename",
                             connection = "myconnvarname") String message,
   final ExecutionContext context
 ) {
     context.getLogger().info(message);
 }

将消息添加到服务总线主题时,也可触发 Java 函数。 以下示例使用 @ServiceBusTopicTrigger 注释来说明触发器配置。

@FunctionName("sbtopicprocessor")
    public void run(
        @ServiceBusTopicTrigger(
            name = "message",
            topicName = "mytopicname",
            subscriptionName = "mysubscription",
            connection = "ServiceBusConnection"
        ) String message,
        final ExecutionContext context
    ) {
        context.getLogger().info(message);
    }

以下示例显示了服务总线触发器 TypeScript 函数。 此函数将读取消息元数据并记录服务总线队列消息。

import { app, InvocationContext } from '@azure/functions';

export async function serviceBusQueueTrigger1(message: unknown, context: InvocationContext): Promise<void> {
    context.log('Service bus queue function processed message:', message);
    context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
    context.log('DeliveryCount =', context.triggerMetadata.deliveryCount);
    context.log('MessageId =', context.triggerMetadata.messageId);
}

app.serviceBusQueue('serviceBusQueueTrigger1', {
    connection: 'MyServiceBusConnection',
    queueName: 'testqueue',
    handler: serviceBusQueueTrigger1,
});

以下示例显示了服务总线触发器 JavaScript 函数。 此函数将读取消息元数据并记录服务总线队列消息。

const { app } = require('@azure/functions');

app.serviceBusQueue('serviceBusQueueTrigger1', {
    connection: 'MyServiceBusConnection',
    queueName: 'testqueue',
    handler: (message, context) => {
        context.log('Service bus queue function processed message:', message);
        context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
        context.log('DeliveryCount =', context.triggerMetadata.deliveryCount);
        context.log('MessageId =', context.triggerMetadata.messageId);
    },
});

以下示例演示了 function.json 文件中的服务总线触发器绑定以及使用该绑定的 PowerShell 函数

下面是 function.json 文件中的绑定数据:

{
  "bindings": [
    {
      "name": "mySbMsg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "topicName": "mytopic",
      "subscriptionName": "mysubscription",
      "connection": "AzureServiceBusConnectionString"
    }
  ]
}

下面是发送服务总线消息时运行的函数。

param([string] $mySbMsg, $TriggerMetadata)

Write-Host "PowerShell ServiceBus queue trigger function processed message: $mySbMsg"

下面的示例演示如何通过触发器读取服务总线队列消息。 该示例取决于使用的是 v1 还是 v2 Python 编程模型

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="ServiceBusQueueTrigger1")
@app.service_bus_queue_trigger(arg_name="msg", 
                               queue_name="<QUEUE_NAME>", 
                               connection="<CONNECTION_SETTING>")
def test_function(msg: func.ServiceBusMessage):
    logging.info('Python ServiceBus queue trigger processed message: %s',
                 msg.get_body().decode('utf-8'))

下面的示例演示了如何通过触发器读取服务总线队列主题。

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="ServiceBusTopicTrigger1")
@app.service_bus_topic_trigger(arg_name="message", 
                               topic_name="TOPIC_NAME", 
                               connection="CONNECTION_SETTING", 
                               subscription_name="SUBSCRIPTION_NAME")
def test_function(message: func.ServiceBusMessage):
    message_body = message.get_body().decode("utf-8")
    logging.info("Python ServiceBus topic trigger processed message.")
    logging.info("Message Body: " + message_body)

特性

进程内独立工作进程 C# 库都使用 ServiceBusTriggerAttribute 特性来定义函数触发器。 C# 脚本改用 function.json 配置文件,如 C# 脚本指南中所述。

下表说明了可使用此触发器特性设置的属性:

属性 说明
QueueName 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。
TopicName 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。
SubscriptionName 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。
Connection 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接
IsBatched 消息分批传送。 需要数组或集合类型。
IsSessionsEnabled 如果连接到true队列或订阅,则为 true。 否则为 false(默认值)。
AutoCompleteMessages 如果触发器应在成功调用后自动完成消息,则为 true。 如果不应,例如在代码中处理消息结算时,则为 false。 如果未显式设置,则行为将基于 host.json 中的 autoCompleteMessages 配置

在本地开发时,需要将应用程序设置添加到 Values 集合中的 local.settings.json 文件中。

修饰符

仅适用于 Python v2 编程模型。

对于使用修饰器定义的 Python v2 功能,支持 service_bus_queue_trigger 上的以下属性:

properties 说明
arg_name 变量的名称,表示函数代码中的队列或主题消息。
queue_name 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。
connection 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接

对于使用 function.json 定义的 Python 函数,请参阅“配置”部分。

批注

使用 ServiceBusQueueTrigger 注释可以创建在创建服务总线队列消息时要运行的函数。 可用的配置选项包括以下属性:

属性 说明
name 变量的名称,表示函数代码中的队列或主题消息。
queueName 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。
topicName 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。
subscriptionName 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。
连接 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接

使用 ServiceBusTopicTrigger 注释可以指定主题和订阅,以便以触发函数的数据为目标。

在本地开发时,请将应用程序设置添加到 Values 集合的 local.settings.json 文件

有关更多详细信息,请参阅触发器示例

配置

仅适用于 Python v1 编程模型

下表说明了可以在传递给方法“app.serviceBusQueue()”或“app.serviceBusTopic()”的对象“options”上设置的属性。

properties 说明
queueName 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。
topicName 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。
subscriptionName 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。
连接 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接
accessRights 连接字符串的访问权限。 可用值为 managelisten。 默认值是 manage,其指示 connection 具有“管理”权限。 如果使用不具有“管理”权限的连接字符串,请将 设置为“listen”。 否则,Functions 运行时可能会在尝试执行需要管理权限的操作时失败。 在 Azure Functions 版本 2.x 及更高版本中,此属性不可用,因为最新版本的服务总线 SDK 不支持管理操作。
isSessionsEnabled 如果连接到true队列或订阅,则为 true。 否则为 false(默认值)。
autoComplete 对于非 C# 函数,此项必须为 true,这意味着触发器应在处理后自动调用 complete,或者你会通过函数代码手动调用 complete。

设置为 true 时,触发器会在函数执行成功完成的情况下自动完成该消息,否则会放弃该消息。

函数中的异常会导致运行时在后台调用 abandonAsync。 如果未发生异常,则在后台调用 completeAsync。 此属性仅在 Azure Functions 2.x 和更高版本中可用。

在本地开发时,请将应用程序设置添加到 Values 集合的 local.settings.json 文件

下表解释了在 function.json 文件中设置的绑定配置属性。

“function.json”属性 说明
type 必须设置为 serviceBusTrigger。 在 Azure 门户中创建触发器时,会自动设置此属性。
direction 必须设置为“in”。 在 Azure 门户中创建触发器时,会自动设置此属性。
name 变量的名称,表示函数代码中的队列或主题消息。
queueName 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。
topicName 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。
subscriptionName 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。
连接 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接
accessRights 连接字符串的访问权限。 可用值为 managelisten。 默认值是 manage,其指示 connection 具有“管理”权限。 如果使用不具有“管理”权限的连接字符串,请将 设置为“listen”。 否则,Functions 运行时可能会在尝试执行需要管理权限的操作时失败。 在 Azure Functions 版本 2.x 及更高版本中,此属性不可用,因为最新版本的服务总线 SDK 不支持管理操作。
isSessionsEnabled 如果连接到true队列或订阅,则为 true。 否则为 false(默认值)。
autoComplete 对于非 C# 函数,此项必须为 true,这意味着触发器应在处理后自动调用 complete,或者你会通过函数代码手动调用 complete。

设置为 true 时,触发器会在函数执行成功完成的情况下自动完成该消息,否则会放弃该消息。

函数中的异常会导致运行时在后台调用 abandonAsync。 如果未发生异常,则在后台调用 completeAsync。 此属性仅在 Azure Functions 2.x 和更高版本中可用。

在本地开发时,请将应用程序设置添加到 Values 集合的 local.settings.json 文件

有关完整示例,请参阅示例部分

使用情况

所有 C# 形式和扩展版本都支持以下参数类型:

类型 说明
System.String 当消息为简单文本时使用。
byte[] 用于二进制数据消息。
Object 当消息包含 JSON 时,Functions 会尝试将 JSON 数据反序列化为已知的普通旧 CLR 对象类型。

特定于消息的参数类型包含其他消息元数据。 服务总线触发器支持的特定类型取决于 Functions 运行时版本、扩展包版本和所使用的 C# 形式。

如果希望函数处理单条消息,服务总线触发器可以绑定到以下类型:

类型 说明
string 字符串格式的消息。 当消息为简单文本时使用。
byte[] 消息的字节数。
JSON 可序列化类型 当事件包含 JSON 数据时,Functions 会尝试将 JSON 数据反序列化为普通的旧 CLR 对象 (POCO) 类型。
ServiceBusReceivedMessage1 消息对象。

绑定到ServiceBusReceivedMessage时,还可以选择包含 ServiceBusMessageActions1,2 类型的参数来执行消息解决操作。

如果希望函数处理一批消息,服务总线触发器可以绑定到以下类型:

类型 说明
T[],其中 T 是单消息类型之一 批处理中的事件数组。 每个条目表示一个事件。

绑定到ServiceBusReceivedMessage[]时,还可以选择包含 ServiceBusMessageActions1,2 类型的参数来执行消息解决操作。

1 要使用这些类型,需要引用 Microsoft.Azure.Functions.Worker.Extensions.ServiceBus 5.14.1 或更高版本以及 SDK 类型绑定的常见依赖项

2 使用时,将AutoCompleteMessages触发器属性的属性设置为 falseServiceBusMessageActions。 这可以防止运行时在成功调用函数后尝试完成消息。

未定义 Connection 属性时,Functions 会查找名为 AzureWebJobsServiceBus 的应用设置,这是服务总线连接字符串的默认名称。 还可以设置 Connection 属性来指定应用程序设置(其中包含要使用的服务总线连接字符串)的名称。

可以通过 ServiceBusQueueMessageServiceBusTopicMessage 参数获取传入的服务总线消息。

将队列或主题消息作为函数的第一个参数访问。 服务总线消息作为字符串或 JSON 对象传递到函数中。

可以通过在 function.json 文件的名称属性中配置的参数来使用服务总线实例。

队列消息可通过类型为 func.ServiceBusMessage 的参数提供给函数。 服务总线消息作为字符串或 JSON 对象传递到函数中。

有关完整示例,请参阅示例部分

连接

connection 属性是对环境配置的引用,它指定应用应该如何连接到服务总线。 它可能指定:

如果配置的值既是单个设置的完全匹配,也是其他设置的前缀匹配,则使用完全匹配。

连接字符串

若要获取连接字符串,请执行获取管理凭据中显示的步骤。 必须是服务总线命名空间的连接字符串,不限于特定的队列或主题。

此连接字符串应存储在应用程序设置中,其名称与绑定配置的 connection 属性指定的值匹配。

如果应用设置名称以“AzureWebJobs”开头,则只能指定该名称的余下部分。 例如,如果将 connection 设为“MyServiceBus”,Functions 运行时会查找名为“AzureWebJobsMyServiceBus”的应用设置。 如果将 connection 留空,函数运行时将使用名为“AzureWebJobsServiceBus”的应用设置中的默认服务总线连接字符串。

基于标识的连接

如果使用 5.x 或更高版本的扩展,可以让应用使用 Microsoft Entra 标识,而无需将连接字符串与机密一起使用。 为此,需要定义公共前缀下的设置,该前缀映射到触发器和绑定配置中的 connection 属性。

在此模式下,扩展需要以下属性:

属性 环境变量模板 说明 示例值
完全限定的命名空间 <CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace 完全限定的服务总线命名空间。 <service_bus_namespace>.servicebus.windows.net

可以设置其他属性来自定义连接。 请参阅基于标识的连接的通用属性

注意

使用 Azure 应用程序配置Key Vault 为托管标识连接提供设置时,设置名称应使用有效的键分隔符(例如 :/)替代 __,以确保正确解析名称。

例如 <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace

在 Azure Functions 服务中托管时,基于标识的连接将使用托管标识。 默认情况下使用系统分配的标识,但可以使用 credentialclientID 属性来指定用户分配的标识。 请注意,不支持为用户分配的标识配置资源 ID。 在其他上下文(如本地开发)中运行时,将改用开发人员标识,尽管可以进行自定义。 请参阅使用基于标识的连接进行本地开发

向标识授予权限

无论使用何种标识,都必须具有执行所需操作的权限。 对于大多数 Azure 服务,这意味着你需要使用内置角色或者提供这些权限的自定义角色在 Azure RBAC 中分配角色

重要

某些权限可能由并非所有上下文都需要的目标服务公开。 尽可能遵循最低权限原则,仅授予标识所需的权限。 例如,如果应用只需要从数据源进行读取即可,则使用仅具有读取权限的角色。 分配一个也具有该服务写入权限的角色并不恰当,因为对于读取操作来说,写入是多余的权限。 同样,你也希望确保角色分配的范围仅限于需要读取的资源。

你将需要创建一个角色分配,以便在运行时提供对主题和队列的访问权限。 所有者等管理角色还不够。 下表显示了在正常操作中使用服务总线扩展时建议使用的内置角色。 根据所编写的代码,应用程序可能需要具有其他权限。

绑定类型 内置角色示例
触发器1 Azure 服务总线数据接收方Azure 服务总线数据所有者
输出绑定 Azure 服务总线数据发送方

1 若要从服务总线主题触发,角色分配需要对服务总线订阅资源具有有效范围。 如果仅包含主题,将发生错误。 某些客户端(例如 Azure 门户)不会将服务总线订阅资源公开为角色分配的范围。 在这种情况下,可以改用 Azure CLI。 若要了解详细信息,请参阅适用于 Azure 服务总线的 Azure 内置角色

有害消息

无法在 Azure Functions 中控制或配置有害消息处理。 服务总线处理有害消息本身。

PeekLock 行为

Functions 运行时以 PeekLock 模式接收消息。

默认情况下,如果函数成功完成,则运行时会对此消息调用 Complete;如果函数失败,则调用 Abandon。 可以通过 host.json 中的 autoCompleteMessages 属性禁用自动完成。

默认情况下,如果函数成功完成,则运行时会对此消息调用 Complete;如果函数失败,则调用 Abandon。 可以通过 host.json 中的 autoCompleteMessages 属性或通过触发器特性上的一个属性来禁用自动完成。 如果你的函数代码处理消息结算,则应禁用自动完成。

如果函数的运行时间长于 PeekLock 超时时间,则只要该函数正在运行,就会自动续订锁定。 maxAutoRenewDurationhost.json 中可配置,它映射到 serviceBusProcessor.MaxAutoLockRenewalDuration。 此设置的默认值为 5 分钟。

消息元数据

使用特定于消息传送的类型,你可以轻松地检索作为对象属性的元数据。 这些属性取决于 Functions 运行时版本、扩展包版本和所使用的 C# 形式。

这些属性是 ServiceBusReceivedMessage 类的成员。

属性 类型​​ 说明
ApplicationProperties ApplicationProperties 由发送方设置的属性。
ContentType string 发送方和接收方用于实现应用程序特定逻辑的内容类型标识符。
CorrelationId string 相关 ID。
DeliveryCount Int32 传递次数。
EnqueuedTime DateTime 排队时间 (UTC)。
ScheduledEnqueueTimeUtc DateTime 计划的排队时间 (UTC)。
ExpiresAt DateTime 到期时间 (UTC)。
MessageId string 服务总线可用于标识重复消息的用户定义值(如果启用)。
ReplyTo string 对队列地址的回复。
Subject string 特定于应用程序的标签,可用于替代 Label 元数据属性。
To string 发送到地址。

后续步骤