Триггер Центра Интернета вещей Azure для Функций Azure

В этой статье объясняется, как работать с привязками Функций Azure для Центра Интернета вещей. Поддержка Центра Интернета вещей реализована на базе привязок Центров событий Azure.

Сведения об установке и настройке см. в обзорной статье.

Внимание

Хотя в следующих примерах кода используется API Центра событий, соответствующий синтаксис применим к функциям Центра Интернета вещей.

Используйте триггер функции для ответа на событие, отправленное в поток событий концентратора событий. Чтобы настроить триггер, необходимо иметь доступ на чтение к базовому концентратору событий. При активации функции в строковом виде вводится сообщение, передаваемое в функцию.

Решения по масштабированию Центров событий для планов потребления и уровня "Премиум" выполняются с помощью масштабирования на основе целевого объекта. Дополнительные сведения см. в разделе "Масштабирование на основе целевого объекта".

Сведения о том, как Функции Azure реагирует на события, отправленные в поток событий концентратора событий с помощью триггеров, см. в статье Интеграция Центров событий с бессерверными функциями в Azure.

Внимание

В этой статье используются вкладки для поддержки нескольких версий модели программирования Node.js. Модель версии 4 общедоступна и предназначена для более гибкого и интуитивно понятного интерфейса для разработчиков JavaScript и TypeScript. Дополнительные сведения о том, как работает модель версии 4, см. в руководстве разработчика по Функции Azure Node.js. Дополнительные сведения о различиях между версиями 3 и 4 см. в руководстве по миграции.

Функции Azure поддерживает две модели программирования для Python. Способ определения привязок зависит от выбранной модели программирования.

Модель программирования Python версии 2 позволяет определять привязки с помощью декораторов непосредственно в коде функции Python. Дополнительные сведения см. в руководстве разработчика Python.

Эта статья поддерживает обе модели программирования.

Пример

В следующем примере показана функция C#, которая активируется на основе центра событий, где входная строка сообщения записывается в журналы:

{
    private readonly ILogger<EventHubsFunction> _logger;

    public EventHubsFunction(ILogger<EventHubsFunction> logger)
    {
        _logger = logger;
    }

    [Function(nameof(EventHubFunction))]
    [FixedDelayRetry(5, "00:00:10")]
    [EventHubOutput("dest", Connection = "EventHubConnection")]
    public string EventHubFunction(
        [EventHubTrigger("src", Connection = "EventHubConnection")] string[] input,
        FunctionContext context)
    {
        _logger.LogInformation("First Event Hubs triggered message: {msg}", input[0]);

        var message = $"Output message created at {DateTime.Now}";
        return message;
    }

В следующем примере показана функция триггера TypeScript концентраторов событий. Функция считывает метаданные события и записывает сообщение в журнал.

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(message: unknown, context: InvocationContext): Promise<void> {
    context.log('Event hub function processed message:', message);
    context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
    context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
    context.log('Offset =', context.triggerMetadata.offset);
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: eventHubTrigger1,
});

Чтобы получить события в пакете, установите значение cardinalitymany, как показано в следующем примере.

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(messages: unknown[], context: InvocationContext): Promise<void> {
    context.log(`Event hub function processed ${messages.length} messages`);
    for (let i = 0; i < messages.length; i++) {
        context.log('Event hub message:', messages[i]);
        context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
        context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
        context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
    }
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: eventHubTrigger1,
});

В следующем примере показана функция JavaScript триггера Центров событий. Функция считывает метаданные события и записывает сообщение в журнал.

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: (message, context) => {
        context.log('Event hub function processed message:', message);
        context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
        context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
        context.log('Offset =', context.triggerMetadata.offset);
    },
});

Чтобы получить события в пакете, установите значение cardinalitymany, как показано в следующем примере.

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: (messages, context) => {
        context.log(`Event hub function processed ${messages.length} messages`);
        for (let i = 0; i < messages.length; i++) {
            context.log('Event hub message:', messages[i]);
            context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
            context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
            context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
        }
    },
});

Вот полный текст кода PowerShell:

param($eventHubMessages, $TriggerMetadata)

Write-Host "PowerShell eventhub trigger function called for message array: $eventHubMessages"

$eventHubMessages | ForEach-Object { Write-Host "Processed message: $_" }

В следующем примере показана привязка триггера Центров событий и функция Python, использующая привязку. Функция считывает метаданные события и записывает сообщение в журнал. Пример зависит от того, используется ли модель программирования Python версии 1 или версии 2.

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="EventHubTrigger1")
@app.event_hub_message_trigger(arg_name="myhub", 
                               event_hub_name="<EVENT_HUB_NAME>",
                               connection="<CONNECTION_SETTING>") 
def test_function(myhub: func.EventHubEvent):
    logging.info('Python EventHub trigger processed an event: %s',
                myhub.get_body().decode('utf-8'))

В следующем примере показана привязка триггера центров событий, которая записывает в журнал текст сообщений этого триггера.

@FunctionName("ehprocessor")
public void eventHubProcessor(
  @EventHubTrigger(name = "msg",
                  eventHubName = "myeventhubname",
                  connection = "myconnvarname") String message,
       final ExecutionContext context )
       {
          context.getLogger().info(message);
 }

В библиотеке среды выполнения функций Java используйте заметку EventHubTrigger для параметров, значения которых будут поступать из центра событий. Параметры с этими заметками запускают функцию, когда происходит событие. Эта заметка может использоваться с собственными типами Java, объектами POJO или значениями, допускающими значения NULL, используя Optional<T>.

В следующем примере показано широкое использование SystemProperties и других параметров привязки для дальнейшего анализа события, а также предоставление правильно сформированного пути BlobOutput, который является иерархическим по дате.

package com.example;
import java.util.Map;
import java.time.ZonedDateTime;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

/**
 * Azure Functions with Event Hub trigger.
 * and Blob Output using date in path along with message partition ID
 * and message sequence number from EventHub Trigger Properties
 */
public class EventHubReceiver {

    @FunctionName("EventHubReceiver")
    @StorageAccount("bloboutput")

    public void run(
            @EventHubTrigger(name = "message",
                eventHubName = "%eventhub%",
                consumerGroup = "%consumergroup%",
                connection = "eventhubconnection",
                cardinality = Cardinality.ONE)
            String message,

            final ExecutionContext context,

            @BindingName("Properties") Map<String, Object> properties,
            @BindingName("SystemProperties") Map<String, Object> systemProperties,
            @BindingName("PartitionContext") Map<String, Object> partitionContext,
            @BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,

            @BlobOutput(
                name = "outputItem",
                path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
                       "{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
            OutputBinding<String> outputItem) {

        var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
                                                             // indicator
        context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
        context.getLogger().info("Properties: " + properties);
        context.getLogger().info("System Properties: " + systemProperties);
        context.getLogger().info("partitionContext: " + partitionContext);
        context.getLogger().info("EnqueuedTimeUtc: " + et);

        outputItem.setValue(message);
    }
}

Атрибуты

Библиотеки C# в процессе и изолированном рабочем процессе используют атрибут для настройки триггера. Вместо этого скрипт C# использует файл конфигурации function.json, как описано в руководстве по скриптам C#.

Используйте EventHubTriggerAttribute для определения триггера в центре событий, который поддерживает следующие свойства.

Параметры Description
EventHubName Имя концентратора событий. Если имя концентратора событий указано также в строке подключения, такое значение переопределяет это свойство во время выполнения. Можно указывать ссылки в настройках приложения, например %eventHubName%
ConsumerGroup Необязательное свойство, которое используется для задания группы потребителей, используемой для подписки на события в концентраторе. Если аргумент опущен, используется группа потребителей $Default.
Соединение Имя параметра или коллекции параметров приложения, указывающих, как подключиться к центрам событий. Дополнительные сведения см. в разделе Подключения.

Декораторы

Применяется только к модели программирования Python версии 2.

Для функций Python версии 2, определенных с помощью декоратора, в следующих свойствах cosmos_db_trigger:

Свойство Description
arg_name Имя переменной, представляющей элемент события в коде функции.
event_hub_name Имя концентратора событий. Если имя концентратора событий указано также в строке подключения, такое значение переопределяет это свойство во время выполнения.
connection Имя параметра или коллекции параметров приложения, указывающих, как подключиться к центрам событий. См. раздел Подключения.

Сведения о функциях Python, определенных с помощью function.json, см. в разделе "Конфигурация ".

Заметки

В библиотеке среды выполнения функций Java используйте заметку EventHubTrigger, которая поддерживает следующие параметры:

Настройка

Применяется только к модели программирования Python версии 1.

В следующей таблице описываются свойства, которые можно задать для options объекта, переданного методу app.eventHub() .

Свойство Description
eventHubName Имя концентратора событий. Если имя концентратора событий указано также в строке подключения, такое значение переопределяет это свойство во время выполнения. Можно указывать ссылки в настройках приложения%eventHubName%
consumerGroup Необязательное свойство, которое используется для задания группы потребителей, используемой для подписки на события в концентраторе. Если аргумент опущен, используется группа потребителей $Default.
кратность Задайте значение many, чтобы включить пакетную обработку. Если этот параметр отсутствует или имеет значение one, функции передается одно сообщение.
Подключение Имя параметра или коллекции параметров приложения, указывающих, как подключиться к центрам событий. См. раздел Подключения.

В следующей таблице описываются свойства конфигурации триггера, задаваемые в файле function.json, который может отличаться в зависимости от версии.

Свойство в function.json Описание
type Должен иметь значениеeventHubTrigger. Это свойство задается автоматически при создании триггера на портале Azure.
direction Должен иметь значениеin. Это свойство задается автоматически при создании триггера на портале Azure.
name Имя переменной, представляющей элемент события в коде функции.
eventHubName Имя концентратора событий. Если имя концентратора событий указано также в строке подключения, такое значение переопределяет это свойство во время выполнения. Можно указывать ссылки в настройках приложения%eventHubName%
consumerGroup Необязательное свойство, которое используется для задания группы потребителей, используемой для подписки на события в концентраторе. Если аргумент опущен, используется группа потребителей $Default.
кратность Задайте значение many, чтобы включить пакетную обработку. Если этот параметр отсутствует или имеет значение one, функции передается одно сообщение.
Подключение Имя параметра или коллекции параметров приложения, указывающих, как подключиться к центрам событий. См. раздел Подключения.

Если разработка ведется на локальном компьютере, добавьте параметры приложения в файл local.settings.json в коллекции Values.

Использование

Дополнительные сведения о том, как триггеры центров событий и Центр Интернета вещей масштабируются, см. в статье "Использование событий с помощью Функции Azure".

Тип параметра, поддерживаемый выходной привязкой Центров событий, зависит от версии среды выполнения Функций, версии пакета расширения и используемой модальности C#.

Если требуется, чтобы функция обрабатывала одно событие, триггер Центров событий может привязаться к следующим типам:

Тип Описание
string Событие в виде строки. Используется, когда событие является простым текстом.
byte[] Байты события.
Сериализуемые в JSON типы Если событие содержит данные JSON, функции пытаются десериализировать данные JSON в обычный тип объекта CLR (POCO).
Azure.Messaging.EventHubs.EventData1 Объект события.
Если вы переносите все старые версии пакетов SDK центров событий, обратите внимание, что эта версия удаляет поддержку устаревшего Bodyтипа в пользу EventBody.

Если требуется, чтобы функция обрабатывала пакет событий, триггер Центров событий может привязаться к следующим типам:

Тип Описание
string[] Массив событий из пакета в виде строк. Каждая запись представляет одно событие.
EventData[]1 Массив событий из пакета в качестве экземпляров Azure.Messaging.EventHubs.EventData. Каждая запись представляет одно событие.
T[] где T является сериализуемый типJSON 1 Массив событий из пакета в качестве экземпляров пользовательского типа POCO. Каждая запись представляет одно событие.

1 Для использования этих типов необходимо ссылаться на Microsoft.Azure.Functions.Worker.Extensions.EventHubs 5.5.0 или более поздней версии , а также общие зависимости для привязок типов SDK.

Параметр может иметь один из следующих типов:

  • Все собственные типы Java, такие как int, String, byte[].
  • Значения, которые могут принимать значение NULL, являются необязательными.
  • Любой тип POJO.

Для получения дополнительных сведений см. справочник EventHubTrigger.

Метаданные события

Триггер Центров событий предоставляет несколько свойств метаданных. Свойства метаданных можно использовать как часть выражений привязки в других привязках или как параметры в коде. Эти свойства передаются из класса EventData.

Свойство Type Описание
PartitionContext PartitionContext Экземпляр PartitionContext.
EnqueuedTimeUtc DateTime Время попадания в очередь в формате UTC.
Offset string Смещение данных относительно потока разделов концентратора событий. Смещение — это маркер или идентификатор события в потоке Центров событий. Этот идентификатор уникален внутри раздела потока Центров событий.
PartitionKey string Раздел, в который следует отправлять данные события.
Properties IDictionary<String,Object> Свойства пользователя данных события.
SequenceNumber Int64 Регистрационный номер транзакции события в журнале.
SystemProperties IDictionary<String,Object> Свойства системы, включая данные события.

См. примеры кода, в которых используются эти свойства, в предыдущих разделах этой статьи.

Связи

Свойство connection является ссылкой на конфигурацию среды, содержащую имя параметра настройки приложения, содержащей строку подключения. Чтобы получить эту строку подключения, нажмите кнопку Сведения о подключении для пространства имен. Требуется строка подключения для пространства имен Центров событий, а не самого центра событий.

Для активации функции эта строка подключения должна обладать, по крайней мере, правами на чтение.

Эта строка подключения должна храниться в параметре приложения с именем, соответствующим значению, которое указано свойством connection конфигурации привязки.

Примечание.

Подключения на основе удостоверений не поддерживаются триггером центра Интернета вещей. Если необходимо везде использовать управляемые удостоверения, можно использовать маршрутизацию Центра Интернета вещей для отправки данных в управляемый центр событий. Таким образом, проверку подлинности маршрутизации исходящих подключений можно выполнить с помощью управляемого удостоверения, которое событие может считывать из этого центра событий с помощью управляемого удостоверения.

Свойства host.json

В файле host.json содержатся параметры, управляющие реакцией триггера Центров событий на события. Дополнительные сведения о доступных настройках см. в разделе Настройка host.json.

Следующие шаги