Поделиться через


Триггер службы "Центры событий Azure" для Функций Azure

Здесь объясняется, как работать с триггерами Центров событий Azure для службы "Функции Azure". Функции Azure поддерживают привязки триггера и выходные привязки для Центров событий.

Сведения об установке и настройке см. в обзорной статье.

Используйте триггер функции для ответа на событие, отправленное в поток событий концентратора событий. Чтобы настроить триггер, необходимо иметь доступ на чтение к базовому концентратору событий. При активации функции в строковом виде вводится сообщение, передаваемое в функцию.

Решения по масштабированию Центров событий для планов потребления и уровня "Премиум" выполняются с помощью масштабирования на основе целевого объекта. Дополнительные сведения см. в разделе "Масштабирование на основе целевого объекта".

Сведения о том, как Функции Azure реагирует на события, отправленные в поток событий концентратора событий с помощью триггеров, см. в статье Интеграция Центров событий с бессерверными функциями в Azure.

Внимание

В этой статье используются вкладки для поддержки нескольких версий модели программирования Node.js. Модель версии 4 общедоступна и предназначена для более гибкого и интуитивно понятного интерфейса для разработчиков JavaScript и TypeScript. Дополнительные сведения о том, как работает модель версии 4, см. в руководстве разработчика по Функции Azure Node.js. Дополнительные сведения о различиях между версиями 3 и 4 см. в руководстве по миграции.

Функции Azure поддерживает две модели программирования для Python. Способ определения привязок зависит от выбранной модели программирования.

Модель программирования Python версии 2 позволяет определять привязки с помощью декораторов непосредственно в коде функции Python. Дополнительные сведения см. в руководстве разработчика Python.

Эта статья поддерживает обе модели программирования.

Пример

В следующем примере показана функция C#, которая активируется на основе центра событий, где входная строка сообщения записывается в журналы:

{
    private readonly ILogger<EventHubsFunction> _logger;

    public EventHubsFunction(ILogger<EventHubsFunction> logger)
    {
        _logger = logger;
    }

    [Function(nameof(EventHubFunction))]
    [FixedDelayRetry(5, "00:00:10")]
    [EventHubOutput("dest", Connection = "EventHubConnection")]
    public string EventHubFunction(
        [EventHubTrigger("src", Connection = "EventHubConnection")] string[] input,
        FunctionContext context)
    {
        _logger.LogInformation("First Event Hubs triggered message: {msg}", input[0]);

        var message = $"Output message created at {DateTime.Now}";
        return message;
    }

В следующем примере показана функция триггера TypeScript концентраторов событий. Функция считывает метаданные события и записывает сообщение в журнал.

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(message: unknown, context: InvocationContext): Promise<void> {
    context.log('Event hub function processed message:', message);
    context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
    context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
    context.log('Offset =', context.triggerMetadata.offset);
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: eventHubTrigger1,
});

Чтобы получить события в пакете, установите значение cardinality many, как показано в следующем примере.

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(messages: unknown[], context: InvocationContext): Promise<void> {
    context.log(`Event hub function processed ${messages.length} messages`);
    for (let i = 0; i < messages.length; i++) {
        context.log('Event hub message:', messages[i]);
        context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
        context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
        context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
    }
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: eventHubTrigger1,
});

В следующем примере показана функция JavaScript триггера Центров событий. Функция считывает метаданные события и записывает сообщение в журнал.

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: (message, context) => {
        context.log('Event hub function processed message:', message);
        context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
        context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
        context.log('Offset =', context.triggerMetadata.offset);
    },
});

Чтобы получить события в пакете, установите значение cardinality many, как показано в следующем примере.

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: (messages, context) => {
        context.log(`Event hub function processed ${messages.length} messages`);
        for (let i = 0; i < messages.length; i++) {
            context.log('Event hub message:', messages[i]);
            context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
            context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
            context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
        }
    },
});

Вот полный текст кода PowerShell:

param($eventHubMessages, $TriggerMetadata)

Write-Host "PowerShell eventhub trigger function called for message array: $eventHubMessages"

$eventHubMessages | ForEach-Object { Write-Host "Processed message: $_" }

В следующем примере показана привязка триггера Центров событий и функция Python, использующая привязку. Функция считывает метаданные события и записывает сообщение в журнал. Пример зависит от того, используется ли модель программирования Python версии 1 или версии 2.

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="EventHubTrigger1")
@app.event_hub_message_trigger(arg_name="myhub", 
                               event_hub_name="<EVENT_HUB_NAME>",
                               connection="<CONNECTION_SETTING>") 
def test_function(myhub: func.EventHubEvent):
    logging.info('Python EventHub trigger processed an event: %s',
                myhub.get_body().decode('utf-8'))

В следующем примере показана привязка триггера центров событий, которая записывает в журнал текст сообщений этого триггера.

@FunctionName("ehprocessor")
public void eventHubProcessor(
  @EventHubTrigger(name = "msg",
                  eventHubName = "myeventhubname",
                  connection = "myconnvarname") String message,
       final ExecutionContext context )
       {
          context.getLogger().info(message);
 }

В библиотеке среды выполнения функций Java используйте заметку EventHubTrigger для параметров, значения которых будут поступать из центра событий. Параметры с этими заметками запускают функцию, когда происходит событие. Эта заметка может использоваться с собственными типами Java, объектами POJO или значениями, допускающими значения NULL, используя Optional<T>.

В следующем примере показано широкое использование SystemProperties и других параметров привязки для дальнейшего анализа события, а также предоставление правильно сформированного пути BlobOutput, который является иерархическим по дате.

package com.example;
import java.util.Map;
import java.time.ZonedDateTime;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

/**
 * Azure Functions with Event Hub trigger.
 * and Blob Output using date in path along with message partition ID
 * and message sequence number from EventHub Trigger Properties
 */
public class EventHubReceiver {

    @FunctionName("EventHubReceiver")
    @StorageAccount("bloboutput")

    public void run(
            @EventHubTrigger(name = "message",
                eventHubName = "%eventhub%",
                consumerGroup = "%consumergroup%",
                connection = "eventhubconnection",
                cardinality = Cardinality.ONE)
            String message,

            final ExecutionContext context,

            @BindingName("Properties") Map<String, Object> properties,
            @BindingName("SystemProperties") Map<String, Object> systemProperties,
            @BindingName("PartitionContext") Map<String, Object> partitionContext,
            @BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,

            @BlobOutput(
                name = "outputItem",
                path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
                       "{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
            OutputBinding<String> outputItem) {

        var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
                                                             // indicator
        context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
        context.getLogger().info("Properties: " + properties);
        context.getLogger().info("System Properties: " + systemProperties);
        context.getLogger().info("partitionContext: " + partitionContext);
        context.getLogger().info("EnqueuedTimeUtc: " + et);

        outputItem.setValue(message);
    }
}

Атрибуты

Библиотеки C# в процессе и изолированном рабочем процессе используют атрибут для настройки триггера. Вместо этого скрипт C# использует файл конфигурации function.json, как описано в руководстве по скриптам C#.

Используйте EventHubTriggerAttribute для определения триггера в центре событий, который поддерживает следующие свойства.

Параметры Description
EventHubName Имя концентратора событий. Если имя концентратора событий указано также в строке подключения, такое значение переопределяет это свойство во время выполнения. Можно указывать ссылки в настройках приложения, например %eventHubName%
ConsumerGroup Необязательное свойство, которое используется для задания группы потребителей, используемой для подписки на события в концентраторе. Если аргумент опущен, используется группа потребителей $Default.
Соединение Имя параметра или коллекции параметров приложения, указывающих, как подключиться к центрам событий. Дополнительные сведения см. в разделе Подключения.

Декораторы

Применяется только к модели программирования Python версии 2.

Для функций Python версии 2, определенных с помощью декоратора, в следующих свойствах event_hub_message_trigger:

Свойство Description
arg_name Имя переменной, представляющей элемент события в коде функции.
event_hub_name Имя концентратора событий. Если имя концентратора событий указано также в строке подключения, такое значение переопределяет это свойство во время выполнения.
connection Имя параметра или коллекции параметров приложения, указывающих, как подключиться к центрам событий. См. раздел Подключения.

Сведения о функциях Python, определенных с помощью function.json, см. в разделе "Конфигурация ".

Заметки

В библиотеке среды выполнения функций Java используйте заметку EventHubTrigger, которая поддерживает следующие параметры:

Настройка

Применяется только к модели программирования Python версии 1.

В следующей таблице описываются свойства, которые можно задать для options объекта, переданного методу app.eventHub() .

Свойство Description
eventHubName Имя концентратора событий. Если имя концентратора событий указано также в строке подключения, такое значение переопределяет это свойство во время выполнения. Ссылаться на него можно с помощью параметров приложения %eventHubName%.
consumerGroup Необязательное свойство, которое используется для задания группы потребителей, используемой для подписки на события в концентраторе. Если аргумент опущен, используется группа потребителей $Default.
кратность Задайте значение many, чтобы включить пакетную обработку. Если этот параметр отсутствует или имеет значение one, функции передается одно сообщение.
Подключение Имя параметра или коллекции параметров приложения, указывающих, как подключиться к центрам событий. См. раздел Подключения.

В следующей таблице описываются свойства конфигурации триггера, задаваемые в файле function.json, который может отличаться в зависимости от версии.

Свойство в function.json Описание
type Должен иметь значениеeventHubTrigger. Это свойство задается автоматически при создании триггера на портале Azure.
direction Должен иметь значениеin. Это свойство задается автоматически при создании триггера на портале Azure.
name Имя переменной, представляющей элемент события в коде функции.
eventHubName Имя концентратора событий. Если имя концентратора событий указано также в строке подключения, такое значение переопределяет это свойство во время выполнения. Ссылаться на него можно с помощью параметров приложения %eventHubName%.
consumerGroup Необязательное свойство, которое используется для задания группы потребителей, используемой для подписки на события в концентраторе. Если аргумент опущен, используется группа потребителей $Default.
кратность Задайте значение many, чтобы включить пакетную обработку. Если этот параметр отсутствует или имеет значение one, функции передается одно сообщение.
Подключение Имя параметра или коллекции параметров приложения, указывающих, как подключиться к центрам событий. См. раздел Подключения.

Если разработка ведется на локальном компьютере, добавьте параметры приложения в файл local.settings.json в коллекции Values.

Использование

Дополнительные сведения о том, как триггеры центров событий и Центр Интернета вещей масштабируются, см. в статье "Использование событий с помощью Функции Azure".

Тип параметра, поддерживаемый выходной привязкой Центров событий, зависит от версии среды выполнения Функций, версии пакета расширения и используемой модальности C#.

Если требуется, чтобы функция обрабатывала одно событие, триггер Центров событий может привязаться к следующим типам:

Тип Описание
string Событие в виде строки. Используется, когда событие является простым текстом.
byte[] Байты события.
Сериализуемые в JSON типы Если событие содержит данные JSON, функции пытаются десериализировать данные JSON в обычный тип объекта CLR (POCO).
Azure.Messaging.EventHubs.EventData1 Объект события.
Если вы переносите все старые версии пакетов SDK центров событий, обратите внимание, что эта версия удаляет поддержку устаревшего Body типа в пользу EventBody.

Если требуется, чтобы функция обрабатывала пакет событий, триггер Центров событий может привязаться к следующим типам:

Тип Описание
string[] Массив событий из пакета в виде строк. Каждая запись представляет одно событие.
EventData[] 1 Массив событий из пакета в качестве экземпляров Azure.Messaging.EventHubs.EventData. Каждая запись представляет одно событие.
T[] где T является сериализуемый типJSON 1 Массив событий из пакета в качестве экземпляров пользовательского типа POCO. Каждая запись представляет одно событие.

1 Для использования этих типов необходимо ссылаться на Microsoft.Azure.Functions.Worker.Extensions.EventHubs 5.5.0 или более поздней версии , а также общие зависимости для привязок типов SDK.

Параметр может иметь один из следующих типов:

  • Все собственные типы Java, такие как int, String, byte[].
  • Значения, которые могут принимать значение NULL, являются необязательными.
  • Любой тип POJO.

Для получения дополнительных сведений см. справочник EventHubTrigger.

Метаданные события

Триггер Центров событий предоставляет несколько свойств метаданных. Свойства метаданных можно использовать как часть выражений привязки в других привязках или как параметры в коде. Эти свойства передаются из класса EventData.

Свойство Type Описание
PartitionContext PartitionContext Экземпляр PartitionContext.
EnqueuedTimeUtc DateTime Время попадания в очередь в формате UTC.
Offset string Смещение данных относительно потока разделов концентратора событий. Смещение — это маркер или идентификатор события в потоке Центров событий. Этот идентификатор уникален внутри раздела потока Центров событий.
PartitionKey string Раздел, в который следует отправлять данные события.
Properties IDictionary<String,Object> Свойства пользователя данных события.
SequenceNumber Int64 Регистрационный номер транзакции события в журнале.
SystemProperties IDictionary<String,Object> Свойства системы, включая данные события.

См. примеры кода, в которых используются эти свойства, в предыдущих разделах этой статьи.

Связи

Свойство connection является ссылкой на конфигурацию среды, которая указывает, как приложение должно подключаться к центрам событий. В данном свойстве может быть указано:

Если настроенное значение одновременно точно соответствует одному параметру и является префиксом для других параметров, то используется точное совпадение.

Connection string

Получите эту строку подключения, нажав кнопку Сведения о подключении для пространства имен, а не сам центр событий. Требуется строка подключения для пространства имен Центров событий, а не самого центра событий.

При использовании для триггеров строка подключения должна иметь по крайней мере разрешения на чтение для активации функции. При использовании для выходных привязок строка подключения должна иметь разрешения на отправку для отправки сообщений в поток событий.

Эта строка подключения должна храниться в параметре приложения с именем, соответствующим значению, которое указано свойством connection конфигурации привязки.

Подключения на основе удостоверений

Если вы используете расширение версии 5.x или выше, вместо использования строка подключения с секретом, вы можете использовать удостоверение Microsoft Entra. Для этого необходимо определить параметры под общим префиксом, который соответствует свойству connection в конфигурации триггера и привязки.

В этом режиме для расширения требуются следующие свойства:

Свойство Шаблон переменной среды Description Пример значения
Пространство полных имен <CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace Пространство полных имен центров событий. myeventhubns.servicebus.windows.net

Для настройки подключения можно задать дополнительные свойства. См. раздел Общие свойства подключений на основе удостоверений.

Примечание.

При использовании Конфигурации приложений Azure или Key Vault для предоставления параметров подключений Управляемого удостоверения имена параметров должны использовать допустимый разделитель ключей, например : или /, вместо __, чтобы обеспечить правильное разрешение имен.

Например, <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace.

При размещении в службе "Функции Azure" для подключений на основе удостоверений используется управляемое удостоверение. По умолчанию используется назначаемое системой удостоверение, однако вы можете указать назначаемое пользователем удостоверение с помощью свойств credential и clientID. Обратите внимание, что настройка назначаемого пользователем удостоверения с идентификатором ресурса не поддерживается. При выполнении в других контекстах, например при локальной разработке, вместо этого используется удостоверение разработчика, хотя это можно настроить. См. раздел Локальная разработка с использованием подключений на основе удостоверений.

Предоставление разрешения удостоверению

Любое используемое удостоверение должно иметь разрешения на выполнение предполагаемых действий. Для большинства служб Azure это означает, что необходимо назначить роль в Azure RBAC, используя встроенные или настраиваемые роли, которые предоставляют эти разрешения.

Внимание

Иногда целевая служба может предоставлять разрешения, которые не являются обязательными для всех контекстов. Там, где это возможно, придерживайтесь принципа минимальных привилегий, предоставляя удостоверению лишь самые необходимые привилегии. Например, если приложению требуется только возможность чтения из источника данных, используйте роль, которая имеет разрешение только на чтение. Было бы неуместным назначить роль, которая также разрешает запись в эту службу, так как это разрешение не требуется для операции чтения. Соответственно необходимо еще проверить, что область действия назначенной роли ограничена только теми ресурсами, которые необходимо прочитать.

Вам потребуется создать назначение ролей, которое предоставляет доступ к центру событий во время выполнения. Область назначения роли может быть для пространства имен Центров событий или самого концентратора событий. Ролей управления, таких как Владелец, недостаточно. В следующей таблице показаны встроенные роли, которые рекомендуется использовать вместе с расширением центра событий при обычной работе. Приложению могут потребоваться дополнительные разрешения в зависимости от написанного кода.

Тип привязки Примеры встроенных ролей
Триггер Получатель данных Центров событий Azure, Владелец данных Центров событий Azure
Выходные привязки Отправитель данных Центров событий Azure

Параметры файла host.json

В файле host.json содержатся параметры, управляющие реакцией триггера Центров событий на событие. Дополнительные сведения о доступных настройках см. в разделе Настройка host.json.

Следующие шаги