你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Functions 的 Azure IoT 中心触发器

本文介绍如何使用 IoT 中心的 Azure Functions 绑定。 IoT 中心支持基于 Azure 事件中心绑定

有关设置和配置详细信息,请参阅概述

重要

虽然下述代码示例使用事件中心 API,但给定的语法适用于 IoT 中心函数。

使用函数触发器来响应发送到事件中心事件流的事件。 若要设置触发器,必须具有基础事件中心的读取访问权限。 触发函数时,传递给函数的消息充当字符串类型。

面向消耗计划和高级计划的事件中心缩放决策是通过基于目标的缩放完成的。 有关详细信息,请参阅基于目标的缩放

有关 Azure Functions 如何使用触发器响应发送到事件中心事件流的事件的信息,请参阅将事件中心与 Azure 上的无服务器函数集成

重要

本文使用选项卡来支持多个版本的 Node.js 编程模型。 v4 模型已正式发布,旨在为 JavaScript 和 TypeScript 开发人员提供更为灵活和直观的体验。 有关 v4 模型工作原理的更多详细信息,请参阅 Azure Functions Node.js 开发人员指南。 要详细了解 v3 和 v4 之间的差异,请参阅迁移指南

Azure Functions 支持两种 Python 编程模型。 定义绑定的方式取决于选择的编程模型。

使用 Python v2 编程模型,可以直接在 Python 函数代码中使用修饰器定义绑定。 有关详细信息,请参阅 Python 开发人员指南

本文同时支持两个编程模型。

示例

以下示例演示了一个 C# 函数,该函数基于事件中心而触发,其中输入消息字符串会写入到日志中:

{
    private readonly ILogger<EventHubsFunction> _logger;

    public EventHubsFunction(ILogger<EventHubsFunction> logger)
    {
        _logger = logger;
    }

    [Function(nameof(EventHubFunction))]
    [FixedDelayRetry(5, "00:00:10")]
    [EventHubOutput("dest", Connection = "EventHubConnection")]
    public string EventHubFunction(
        [EventHubTrigger("src", Connection = "EventHubConnection")] string[] input,
        FunctionContext context)
    {
        _logger.LogInformation("First Event Hubs triggered message: {msg}", input[0]);

        var message = $"Output message created at {DateTime.Now}";
        return message;
    }

以下示例展示了事件中心触发器 - TypeScript 函数。 此函数将读取事件元数据并记录消息。

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(message: unknown, context: InvocationContext): Promise<void> {
    context.log('Event hub function processed message:', message);
    context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
    context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
    context.log('Offset =', context.triggerMetadata.offset);
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: eventHubTrigger1,
});

若要批量接收事件,请将 cardinality 设置为 many,如以下示例所示。

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(messages: unknown[], context: InvocationContext): Promise<void> {
    context.log(`Event hub function processed ${messages.length} messages`);
    for (let i = 0; i < messages.length; i++) {
        context.log('Event hub message:', messages[i]);
        context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
        context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
        context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
    }
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: eventHubTrigger1,
});

以下示例展示了事件中心触发器 - JavaScript 函数。 此函数将读取事件元数据并记录消息。

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: (message, context) => {
        context.log('Event hub function processed message:', message);
        context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
        context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
        context.log('Offset =', context.triggerMetadata.offset);
    },
});

若要批量接收事件,请将 cardinality 设置为 many,如以下示例所示。

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: (messages, context) => {
        context.log(`Event hub function processed ${messages.length} messages`);
        for (let i = 0; i < messages.length; i++) {
            context.log('Event hub message:', messages[i]);
            context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
            context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
            context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
        }
    },
});

下面是 PowerShell 代码:

param($eventHubMessages, $TriggerMetadata)

Write-Host "PowerShell eventhub trigger function called for message array: $eventHubMessages"

$eventHubMessages | ForEach-Object { Write-Host "Processed message: $_" }

以下示例演示了一个事件中心触发器绑定以及使用该绑定的 Python 函数。 此函数将读取事件元数据并记录消息。 该示例取决于是使用 v1 还是 v2 Python 编程模型

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="EventHubTrigger1")
@app.event_hub_message_trigger(arg_name="myhub", 
                               event_hub_name="<EVENT_HUB_NAME>",
                               connection="<CONNECTION_SETTING>") 
def test_function(myhub: func.EventHubEvent):
    logging.info('Python EventHub trigger processed an event: %s',
                myhub.get_body().decode('utf-8'))

以下示例演示了一个记录事件中心触发器消息正文的事件中心触发器绑定。

@FunctionName("ehprocessor")
public void eventHubProcessor(
  @EventHubTrigger(name = "msg",
                  eventHubName = "myeventhubname",
                  connection = "myconnvarname") String message,
       final ExecutionContext context )
       {
          context.getLogger().info(message);
 }

Java 函数运行时库中,对其值来自事件中心的参数使用 EventHubTrigger 注释。 带有这些注释的参数会导致函数在事件到达时运行。 可以将此注释与本机 Java 类型、POJO 或使用了 Optional<T> 的可为 null 的值一起使用。

以下示例演示了广泛使用 SystemProperties 和其他绑定选项以进一步自检事件,并提供格式标准的 BlobOutput 路径(日期分层)。

package com.example;
import java.util.Map;
import java.time.ZonedDateTime;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

/**
 * Azure Functions with Event Hub trigger.
 * and Blob Output using date in path along with message partition ID
 * and message sequence number from EventHub Trigger Properties
 */
public class EventHubReceiver {

    @FunctionName("EventHubReceiver")
    @StorageAccount("bloboutput")

    public void run(
            @EventHubTrigger(name = "message",
                eventHubName = "%eventhub%",
                consumerGroup = "%consumergroup%",
                connection = "eventhubconnection",
                cardinality = Cardinality.ONE)
            String message,

            final ExecutionContext context,

            @BindingName("Properties") Map<String, Object> properties,
            @BindingName("SystemProperties") Map<String, Object> systemProperties,
            @BindingName("PartitionContext") Map<String, Object> partitionContext,
            @BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,

            @BlobOutput(
                name = "outputItem",
                path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
                       "{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
            OutputBinding<String> outputItem) {

        var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
                                                             // indicator
        context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
        context.getLogger().info("Properties: " + properties);
        context.getLogger().info("System Properties: " + systemProperties);
        context.getLogger().info("partitionContext: " + partitionContext);
        context.getLogger().info("EnqueuedTimeUtc: " + et);

        outputItem.setValue(message);
    }
}

特性

进程内独立工作进程 C# 库都使用特性来配置触发器。 C# 脚本改用 function.json 配置文件,如 C# 脚本指南中所述。

使用 EventHubTriggerAttribute 在事件中心上定义触发器,EventHubTriggerAttribute 支持以下属性。

参数 说明
EventHubName 事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。 可以在应用设置中引用,例如 %eventHubName%
ConsumerGroup 一个可选属性,用于设置使用者组,该组用于订阅事件中心中的事件。 省略时,将使用 $Default 使用者组。
Connection 指定如何连接到事件中心的应用设置或设置集合的名称。 若要了解详细信息,请参阅连接

修饰符

仅适用于 Python v2 编程模型。

对于使用修饰器定义的 Python v2 功能,支持 event_hub_message_trigger 上的以下属性:

properties 说明
arg_name 在函数代码中表示事件项的变量的名称。
event_hub_name 事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。
connection 指定如何连接到事件中心的应用设置或设置集合的名称。 请参阅连接

对于使用 function.json 定义的 Python 函数,请参阅“配置”部分。

批注

Java 函数运行时库中,使用支持以下设置的 EventHubTrigger 注释:

配置

仅适用于 Python v1 编程模型

下表说明了可以在传递给 app.eventHub() 方法的 options 对象上设置的属性。

properties 说明
eventHubName 事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。 可以通过应用设置 %eventHubName% 进行引用
consumerGroup 一个可选属性,用于设置使用者组,该组用于订阅事件中心中的事件。 如果将其省略,则会使用 $Default 使用者组。
基数 设为 many 以启用批处理。 如果省略或设为 one,将向函数传递一条消息。
连接 指定如何连接到事件中心的应用设置或设置集合的名称。 请参阅连接

下表说明了在 function.json 文件中设置的触发器配置属性,这些属性因运行时版本而异。

function.json 属性 说明
type 必须设置为 eventHubTrigger。 在 Azure 门户中创建触发器时,会自动设置此属性。
direction 必须设置为 in。 在 Azure 门户中创建触发器时,会自动设置此属性。
name 在函数代码中表示事件项的变量的名称。
eventHubName 事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。 可以通过应用设置 %eventHubName% 进行引用
consumerGroup 一个可选属性,用于设置使用者组,该组用于订阅事件中心中的事件。 如果将其省略,则会使用 $Default 使用者组。
基数 设为 many 以启用批处理。 如果省略或设为 one,将向函数传递一条消息。
连接 指定如何连接到事件中心的应用设置或设置集合的名称。 请参阅连接

在本地开发时,需要将应用程序设置添加到 Values 集合中的 local.settings.json 文件中。

使用情况

要详细了解事件中心触发器和 IoT 中心触发器的缩放方式,请参阅通过 Azure Functions 使用事件

事件中心输出绑定支持的参数类型取决于所用的 Functions 运行时版本、扩展包版本以及 C# 模态。

如果你希望函数处理单个事件,可将事件中心触发器绑定到以下类型:

类型 说明
string 字符串形式的事件。 当事件为简单文本时使用。
byte[] 事件的字节数。
JSON 可序列化类型 当事件包含 JSON 数据时,Functions 会尝试将 JSON 数据反序列化为普通的旧 CLR 对象 (POCO) 类型。
Azure.Messaging.EventHubs.EventData1 事件对象。
如果要从事件中心 SDK 的任何旧版本迁移,请注意,此版本将放弃对旧 Body 类型的支持,转而支持 EventBody

如果你希望函数处理批量事件,可将事件中心触发器绑定到以下类型:

类型 说明
string[] 批处理中的事件数组,以字符串形式表示。 每个条目表示一个事件。
EventData[] 1 批处理中的事件数组,以 Azure.Messaging.EventHubs.EventData 的实例形式表示。 每个条目表示一个事件。
T[],其中 T 是 JSON 可序列化类型1 批处理中的事件数组,以自定义 POCO 类型的实例形式表示。 每个条目表示一个事件。

1 若要使用这些类型,需要引用 Microsoft.Azure.Functions.Worker.Extensions.EventHubs 5.5.0 或更高版本以及 SDK 类型绑定的常见依赖项

参数类型可以为以下类型之一:

  • 任意的本机 Java 类型,例如 int、String、byte[]。
  • 使用可选的可为 null 的值。
  • 任意 POJO 类型。

若要了解详细信息,请参阅 EventHubTrigger 参考。

事件元数据

事件中心触发器提供了几个元数据属性。 元数据属性可在其他绑定中用作绑定表达式的一部分,或者用作代码中的参数。 属性来自 EventData 类。

属性 类型​​ 说明
PartitionContext PartitionContext PartitionContext 实例。
EnqueuedTimeUtc DateTime 排队时间 (UTC)。
Offset string 数据相对于事件中心分区流的偏移量。 偏移量是事件中心流中的事件的标记或标识符。 该标识符在事件中心流的分区中是惟一的。
PartitionKey string 事件数据应该发送到的分区。
Properties IDictionary<String,Object> 事件数据的用户属性。
SequenceNumber Int64 事件的逻辑序列号。
SystemProperties IDictionary<String,Object> 系统属性,包括事件数据。

请参阅在本文的前面部分使用这些属性的代码示例

连接

connection 属性是对包含应用程序设置(其中包含连接字符串)名称的环境配置的引用。 可以通过选择命名空间对应的“连接信息”按钮来获取此连接字符串。 连接字符串必须用于事件中心命名空间,而不是事件中心本身。

该连接字符串必须至少拥有“读取”权限才可激活函数。

此连接字符串应存储在应用程序设置中,其名称与绑定配置的 connection 属性指定的值匹配。

注意

IoT 中心触发器不支持基于标识的连接。 如果需要端到端使用托管标识,可以改用 IoT 中心路由将数据发送到你控制的事件中心。 这样,便可以使用托管标识对出站路由进行身份验证,并可以使用托管标识从该事件中心读取事件。

host.json 属性

host.json 文件包含控制事件中心触发器行为的设置。 有关可用设置的详细信息,请参阅 host.json 设置部分。

后续步骤