Поделиться через


Триггер хранилища очередей Azure для Функций Azure

Триггер хранилища очередей запускает функцию при добавлении сообщений в хранилище очередей Azure.

Решения по масштабированию хранилища очередей Azure для планов потребления и уровня "Премиум" выполняются с помощью масштабирования на основе целевого объекта. Дополнительные сведения см. в статье о масштабировании на основе целевого объекта.

Внимание

В этой статье используются вкладки для поддержки нескольких версий модели программирования Node.js. Модель версии 4 общедоступна и предназначена для более гибкого и интуитивно понятного интерфейса для разработчиков JavaScript и TypeScript. Дополнительные сведения о том, как работает модель версии 4, см. в руководстве разработчика по Функции Azure Node.js. Дополнительные сведения о различиях между версиями 3 и 4 см. в руководстве по миграции.

Функции Azure поддерживает две модели программирования для Python. Способ определения привязок зависит от выбранной модели программирования.

Модель программирования Python версии 2 позволяет определять привязки с помощью декораторов непосредственно в коде функции Python. Дополнительные сведения см. в руководстве разработчика Python.

Эта статья поддерживает обе модели программирования.

Пример

Используйте триггер очереди для запуска функции при получении нового элемента очереди. Сообщение очереди предоставляется в качестве входных данных функции.

Функцию C# можно создать с помощью одного из следующих режимов C#:

  • Изолированная рабочая модель: скомпилированная функция C#, которая выполняется в рабочем процессе, изолированном от среды выполнения. Изолированный рабочий процесс необходим для поддержки функций C#, работающих в LTS и не LTS-версиях .NET и платформа .NET Framework. Расширения для изолированных рабочих процессов используют Microsoft.Azure.Functions.Worker.Extensions.* пространства имен.
  • Модель внутрипроцессного процесса: скомпилированная функция C#, которая выполняется в том же процессе, что и среда выполнения Функций. В варианте этой модели функции можно запускать с помощью скриптов C#, которая поддерживается главным образом для редактирования портала C#. Расширения для функций в процессе используют Microsoft.Azure.WebJobs.Extensions.* пространства имен.

В следующем примере показана функция C#, которая выполняет опрос очереди input-queue, а затем записывает несколько сообщений в очередь вывода при каждой обработке элемента очереди.

[Function(nameof(QueueFunction))]
[QueueOutput("output-queue")]
public string[] Run([QueueTrigger("input-queue")] Album myQueueItem, FunctionContext context)
{
    // Use a string array to return more than one message.
    string[] messages = {
        $"Album name = {myQueueItem.Name}",
        $"Album songs = {myQueueItem.Songs.ToString()}"};

    _logger.LogInformation("{msg1},{msg2}", messages[0], messages[1]);

    // Queue Output messages
    return messages;
}

В следующем примере Java показаны триггерные функцию очереди хранилища, которая регистрирует активированное сообщение, помещенное в очередь myqueuename.

@FunctionName("queueprocessor")
public void run(
    @QueueTrigger(name = "msg",
                queueName = "myqueuename",
                connection = "myconnvarname") String message,
    final ExecutionContext context
) {
    context.getLogger().info(message);
}

В следующем примере показана функция триггера TypeScript очереди. Эта функция выполняет опрос очереди myqueue-items, а затем делает запись в журнал при каждой обработке элемента очереди.

import { app, InvocationContext } from '@azure/functions';

export async function storageQueueTrigger1(queueItem: unknown, context: InvocationContext): Promise<void> {
    context.log('Storage queue function processed work item:', queueItem);
    context.log('expirationTime =', context.triggerMetadata.expirationTime);
    context.log('insertionTime =', context.triggerMetadata.insertionTime);
    context.log('nextVisibleTime =', context.triggerMetadata.nextVisibleTime);
    context.log('id =', context.triggerMetadata.id);
    context.log('popReceipt =', context.triggerMetadata.popReceipt);
    context.log('dequeueCount =', context.triggerMetadata.dequeueCount);
}

app.storageQueue('storageQueueTrigger1', {
    queueName: 'myqueue-items',
    connection: 'MyStorageConnectionAppSetting',
    handler: storageQueueTrigger1,
});

В разделе об использовании объясняется queueItem. В разделе Метаданные сообщений показаны все остальные переменные.

В следующем примере показана функция триггера JavaScript очереди. Эта функция выполняет опрос очереди myqueue-items, а затем делает запись в журнал при каждой обработке элемента очереди.

const { app } = require('@azure/functions');

app.storageQueue('storageQueueTrigger1', {
    queueName: 'myqueue-items',
    connection: 'MyStorageConnectionAppSetting',
    handler: (queueItem, context) => {
        context.log('Storage queue function processed work item:', queueItem);
        context.log('expirationTime =', context.triggerMetadata.expirationTime);
        context.log('insertionTime =', context.triggerMetadata.insertionTime);
        context.log('nextVisibleTime =', context.triggerMetadata.nextVisibleTime);
        context.log('id =', context.triggerMetadata.id);
        context.log('popReceipt =', context.triggerMetadata.popReceipt);
        context.log('dequeueCount =', context.triggerMetadata.dequeueCount);
    },
});

В разделе об использовании объясняется queueItem. В разделе Метаданные сообщений показаны все остальные переменные.

В следующем примере показано, как прочитать сообщение очереди, переданное в функцию через триггер.

Триггер очереди хранилища определяется в файле function.json, в котором type настроен на queueTrigger.

{
  "bindings": [
    {
      "name": "QueueItem",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "messages",
      "connection": "MyStorageConnectionAppSetting"
    }
  ]
}

Код в файле Run.ps1 объявляет параметр как $QueueItem, который позволяет считывать сообщение очереди в функции.

# Input bindings are passed in via param block.
param([string] $QueueItem, $TriggerMetadata)

# Write out the queue message and metadata to the information log.
Write-Host "PowerShell queue trigger function processed work item: $QueueItem"
Write-Host "Queue item expiration time: $($TriggerMetadata.ExpirationTime)"
Write-Host "Queue item insertion time: $($TriggerMetadata.InsertionTime)"
Write-Host "Queue item next visible time: $($TriggerMetadata.NextVisibleTime)"
Write-Host "ID: $($TriggerMetadata.Id)"
Write-Host "Pop receipt: $($TriggerMetadata.PopReceipt)"
Write-Host "Dequeue count: $($TriggerMetadata.DequeueCount)"

В следующем примере показано, как прочитать сообщение очереди, переданное в функцию через триггер. Пример зависит от того, используется ли модель программирования Python версии 1 или версии 2.

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="QueueFunc")
@app.queue_trigger(arg_name="msg", queue_name="inputqueue",
                   connection="storageAccountConnectionString")  # Queue trigger
@app.queue_output(arg_name="outputQueueItem", queue_name="outqueue",
                 connection="storageAccountConnectionString")  # Queue output binding
def test_function(msg: func.QueueMessage,
                  outputQueueItem: func.Out[str]) -> None:
    logging.info('Python queue trigger function processed a queue item: %s',
                 msg.get_body().decode('utf-8'))
    outputQueueItem.set('hello')

Атрибуты

Библиотеки C# в процессе и изолированном рабочем процессе используют QueueTriggerAttribute для определения функции. Вместо этого скрипт C# использует файл конфигурации function.json, как описано в руководстве по скриптам C#.

В библиотеках классов C# конструктор атрибута принимает имя очереди для отслеживания, как показано в следующем примере:

[Function(nameof(QueueFunction))]
[QueueOutput("output-queue")]
public string[] Run([QueueTrigger("input-queue")] Album myQueueItem, FunctionContext context)

В этом примере также показано задание параметра строки подключения в самом атрибуте.

Заметки

Заметка QueueTrigger предоставляет доступ к очереди, которая запускает функцию. В следующем примере сообщение очереди становится доступным для функции через параметр message.

package com.function;
import com.microsoft.azure.functions.annotation.*;
import java.util.Queue;
import com.microsoft.azure.functions.*;

public class QueueTriggerDemo {
    @FunctionName("QueueTriggerDemo")
    public void run(
        @QueueTrigger(name = "message", queueName = "messages", connection = "MyStorageConnectionAppSetting") String message,
        final ExecutionContext context
    ) {
        context.getLogger().info("Queue message: " + message);
    }
}
Свойство Description
name Объявляет имя параметра в сигнатуре функции. При активации функции значение этого параметра будет иметь содержимое сообщения очереди.
queueName Объявляет имя очереди в учетной записи хранения.
connection Задает строку подключения к учетной записи хранения.

Декораторы

Применяется только к модели программирования Python версии 2.

Для функций Python версии 2, определенных с помощью декораторов, следующие свойства в декораторе queue_trigger определяют триггер хранилища очередей:

Свойство Description
arg_name Объявляет имя параметра в сигнатуре функции. При активации функции значение этого параметра будет иметь содержимое сообщения очереди.
queue_name Объявляет имя очереди в учетной записи хранения.
connection Задает строку подключения к учетной записи хранения.

Сведения о функциях Python, определенных с помощью function.json, см. в разделе "Конфигурация".

Настройка

Применяется только к модели программирования Python версии 1.

В следующей таблице описываются свойства, которые можно задать для options объекта, переданного методу app.storageQueue() .

Свойство Description
queueName Имя очереди для опроса.
Подключение Имя параметра или коллекции параметров приложения, указывающих, как подключиться к очередям Azure. См. раздел Подключения.

В следующей таблице описываются свойства конфигурации привязки, которые задаются в файле function.json и атрибуте QueueTrigger.

Свойство в function.json Описание
type Должен иметь значениеqueueTrigger. Это свойство задается автоматически при создании триггера на портале Azure.
direction Только в файле function.json. Должен иметь значениеin. Это свойство задается автоматически при создании триггера на портале Azure.
name Имя переменной, содержащей полезные данные элемента очереди в коде функции.
queueName Имя очереди для опроса.
Подключение Имя параметра или коллекции параметров приложения, указывающих, как подключиться к очередям Azure. См. раздел Подключения.

Подробные примеры см. в разделе Примеры.

Если разработка ведется на локальном компьютере, добавьте параметры приложения в файл local.settings.json в коллекции Values.

Использование

Примечание.

Функции ожидают строку в кодировке base64. Любая корректировка типа кодирования (для подготовки данных в виде строки в кодировке base64) должна быть реализована в вызывающей службе.

Использование триггера очереди зависит от версии пакета расширения и модальности C#, используемой в приложении-функции, которая может быть одним из следующих режимов:

Изолированная библиотека классов рабочих процессов, скомпилированная функция C# выполняется в процессе, изолированном от среды выполнения.

Выберите версию, чтобы просмотреть сведения об использовании для режима и версии.

Триггер очереди может привязаться к следующим типам:

Тип Описание
string Содержимое сообщения в виде строки. Используется, когда сообщение является простым текстом..
byte[] Байт сообщения.
Сериализуемые в JSON типы Если сообщение очереди содержит данные JSON, функции пытаются десериализировать данные JSON в обычный тип объекта CLR (POCO).
QueueMessage1 Сообщение.
BinaryData1 Байт сообщения.

1 Для использования этих типов необходимо ссылаться на Microsoft.Azure.Functions.Worker.Extensions.Storage.Queues 5.2.0 или более поздней версии , а также общие зависимости для привязок типов ПАКЕТА SDK.

Заметка QueueTrigger предоставляет доступ к сообщению очереди, вызвавшему функцию.

Доступ к элементу очереди в качестве первого аргумента функции. Если полезные данные представлены в виде JSON, значение десериализируется в объект.

Получение доступа к сообщению очереди через параметр string, который соответствует имени, назначенному name параметром привязки в файле function.js.

Получение доступа к сообщению очереди с помощью параметра, типизированного как QueueMessage.

Метаданные

Триггер очереди предоставляет несколько свойств метаданных. Эти свойства можно использовать как часть выражений привязки в других привязках или в качестве параметров в коде для языковых работников, которые предоставляют этот доступ к метаданным сообщения.

Свойства метаданных сообщения являются членами класса CloudQueueMessage .

К свойствам метаданных сообщения можно получить доступ.context.triggerMetadata

К свойствам метаданных сообщения можно получить доступ из переданного $TriggerMetadata параметра.

Свойство Type Описание
QueueTrigger string Полезные данные очереди (если это допустимая строка). Если полезные данные очереди сообщений представлены в виде строки, значение QueueTrigger совпадает со значением переменной, имя которой назначено свойством name в файле function.json.
DequeueCount long Количество раз, когда сообщение было выведено из очереди.
ExpirationTime DateTimeOffset Время истечения срока действия сообщения.
Id string Идентификатор сообщения в очереди.
InsertionTime DateTimeOffset Время, когда сообщение было добавлено в очередь.
NextVisibleTime DateTimeOffset Время, когда сообщение станет видимым в следующий раз.
PopReceipt string Уведомление о получении сообщения.

Следующие свойства метаданных сообщения можно получить из переданного параметра привязки (msg в предыдущих примерах).

Свойство Description
body Полезные данные очереди в виде строки.
dequeue_count Количество раз, когда сообщение было выведено из очереди.
expiration_time Время истечения срока действия сообщения.
id Идентификатор сообщения в очереди.
insertion_time Время, когда сообщение было добавлено в очередь.
time_next_visible Время, когда сообщение станет видимым в следующий раз.
pop_receipt Уведомление о получении сообщения.

Связи

Это connection свойство является ссылкой на конфигурацию среды, которая указывает, как приложение должно подключаться к очередям Azure. В данном свойстве может быть указано:

Если настроенное значение одновременно точно соответствует одному параметру и является префиксом для других параметров, то используется точное совпадение.

Connection string

Чтобы получить строку подключения, выполните действия, описанные в разделе Управление ключами доступа к учетной записи хранения.

Эта строка подключения должна храниться в параметре приложения с именем, соответствующим значению, которое указано свойством connection конфигурации привязки.

Если имя параметра приложения начинается с AzureWebJobs, можно указать только остальную часть имени. Например, если для connection задано значение MyStorage, среда выполнения Функций ищет параметр приложения с именем AzureWebJobsMyStorage. Если оставить connection пустым, среда выполнения Функций использует строку подключения по умолчанию служба хранилища из параметр приложения AzureWebJobsStorage.

Подключения на основе удостоверений

Если вы используете расширение версии 5.x или выше (пакет 3.x или более поздней для стеков языка non-.NET), а не используете строка подключения с секретом, приложение может использовать удостоверение Microsoft Entra. Чтобы использовать удостоверение, вы определяете параметры под общим префиксом, который сопоставляется со connection свойством в конфигурации триггера и привязки.

Если вы устанавливаете значение connection "AzureWebJobsStorage", см. статью "Подключение к хранилищу с удостоверением". Для всех других подключений требуются следующие свойства в расширении:

Свойство Шаблон переменной среды Description Пример значения
Универсальный код ресурса (URI) службы очередей <CONNECTION_NAME_PREFIX>__queueServiceUri1 URI плоскости данных службы очередей, к которой вы подключаетесь, с помощью схемы HTTPS. https://<storage_account_name>.queue.core.windows.net

1 <CONNECTION_NAME_PREFIX>__serviceUri можно использовать в качестве псевдонима. Если указаны обе формы, queueServiceUri используется форма. Форму serviceUri нельзя использовать, если общая конфигурация подключения должна использоваться для больших двоичных объектов, очередей и (или) таблиц.

Другие свойства могут быть заданы для настройки подключения. См. раздел Общие свойства подключений на основе удостоверений.

При размещении в службе "Функции Azure" для подключений на основе удостоверений используется управляемое удостоверение. По умолчанию используется назначаемое системой удостоверение, однако вы можете указать назначаемое пользователем удостоверение с помощью свойств credential и clientID. Обратите внимание, что настройка назначаемого пользователем удостоверения с идентификатором ресурса не поддерживается. При выполнении в других контекстах, например при локальной разработке, вместо этого используется удостоверение разработчика, хотя это можно настроить. См. раздел Локальная разработка с использованием подключений на основе удостоверений.

Предоставление разрешения удостоверению

Любое используемое удостоверение должно иметь разрешения на выполнение предполагаемых действий. Для большинства служб Azure это означает, что необходимо назначить роль в Azure RBAC, используя встроенные или настраиваемые роли, которые предоставляют эти разрешения.

Внимание

Иногда целевая служба может предоставлять разрешения, которые не являются обязательными для всех контекстов. Там, где это возможно, придерживайтесь принципа минимальных привилегий, предоставляя удостоверению лишь самые необходимые привилегии. Например, если приложению требуется только возможность чтения из источника данных, используйте роль, которая имеет разрешение только на чтение. Было бы неуместным назначить роль, которая также разрешает запись в эту службу, так как это разрешение не требуется для операции чтения. Соответственно необходимо еще проверить, что область действия назначенной роли ограничена только теми ресурсами, которые необходимо прочитать.

Вам потребуется создать назначение ролей, которое предоставляет доступ к очереди во время выполнения. Ролей управления, таких как Владелец, недостаточно. В следующей таблице показаны встроенные роли, которые рекомендуется использовать вместе с расширением хранилища очередей при обычной работе. Приложению могут потребоваться дополнительные разрешения в зависимости от написанного кода.

Тип привязки Примеры встроенных ролей
Триггер Читатель данных очереди хранилища, Обработчик сообщений данных в очереди хранилища
Выходные привязки Участник для данных очереди хранилища, Отправитель сообщений данных в очередь хранилища

Сообщения о сбое

При сбое функции триггера очереди по умолчанию служба "Функции Azure" выполняет ее для заданного сообщения очереди еще пять раз (включая первую попытку). В случае сбоя всех пяти попыток среда выполнения функций добавляет сообщение в очередь с именем <имя_первоначальной_очереди>-poison. Можно написать функции для обработки сообщений из очереди подозрительных сообщений путем внесения их в журнал или отправки уведомления о необходимости ручного вмешательства.

Для обработки сообщений о сбоях вручную проверьте значение dequeueCount сообщения очереди.

Блокировка быстрого редактирования

Шаблон отслеживания блокировки выполняется автоматически для триггеров очереди, используя механизм видимости, предоставляемый службой хранилища. Так как сообщения удаляются триггерной функцией, они помечены как невидимые. Выполнение триггерной функции очереди может иметь один из этих результатов в сообщении в очереди:

  • Выполнение функции завершается успешно, и сообщение удаляется из очереди.
  • Выполнение функции завершается сбоем, и узел Функций обновляет видимость сообщения на visibilityTimeout основе параметра в файле host.json. Время ожидания видимости по умолчанию равно нулю, что означает, что сообщение немедленно появится в очереди для повторной обработки. visibilityTimeout Используйте параметр, чтобы отложить повторную обработку сообщений, которые не удалось обработать. Этот параметр времени ожидания применяется ко всем активированным функциям очереди в приложении-функции.
  • Узел функций завершает работу во время выполнения функции. При возникновении этого редкого события узел не может применить visibilityTimeout его к обрабатываемого сообщения. Вместо этого сообщение остается с интервалом времени ожидания по умолчанию 10 минут, заданным службой хранилища. Через 10 минут сообщение снова появится в очереди для повторной обработки. Это время ожидания по умолчанию, определенное службой, не может быть изменено.

Алгоритм опроса

В триггере очереди реализован алгоритм случайной экспоненциальной отсрочки, который позволяет уменьшить влияние опроса очереди ожидающих задач на затраты на транзакции хранилища.

В алгоритме заложена следующая логика:

  • При обнаружении сообщения среда выполнения ожидает 100 миллисекунд, а затем проверяет наличие другого сообщения.
  • Если новое сообщение не обнаружено, примерно через 200 миллисекунд начинается повторный поиск.
  • После последующих неудачных попыток получения сообщения очереди время ожидания продолжает увеличиваться, пока не достигнет максимального времени ожидания, по умолчанию — одна минута.
  • Максимальное время ожидания настраивается при помощи свойства maxPollingInterval в файле host.json.

Во время локальной разработки максимальный интервал опроса по умолчанию составляет 2 секунды.

Примечание.

Что касается выставления счетов при размещении приложений-функций в плане потребления, с вас не взимается плата за время, потраченное на опрос средой выполнения.

Параллелизм

При наличии нескольких сообщений, ожидающих в очереди, триггер очереди извлекает пакет сообщений и в параллельном режиме вызывает экземпляры функций для обработки. По умолчанию в пакете содержится 16 сообщений. Когда число обрабатываемых сообщений снижается до 8, среда выполнения получает следующий пакет и начинает обработку содержащихся в нем сообщений. Поэтому максимальное количество сообщений, одновременно обрабатываемых каждой функцией на одной виртуальной машине, равно 24. Это ограничение применяется отдельно к каждой функции, активируемой с помощью очереди, на каждой виртуальной машине. Если приложение-функция масштабируется до нескольких виртуальных машин, каждая виртуальная машина ожидает триггеров и пытается запустить функции. Например, если приложение-функция масштабируется на 3 виртуальные машины, то максимальное количество параллельных экземпляров одной функции, активируемой с помощью очереди, равно 72.

Размер пакета и порог для получения нового пакета настраиваются в файле host.json. Если в приложении-функции требуется свести к минимуму параллельное выполнение функций, активируемых с помощью очереди, вы можете задать для размера пакета значение 1. Этот параметр позволяет исключить параллелизм только при условии, что приложение-функция выполняется на одной виртуальной машине.

Триггер очереди автоматически не дает функции одновременно обрабатывать сообщения очереди несколько раз.

Свойства host.json

В файле host.json содержатся параметры, управляющие поведением очереди триггера. Дополнительные сведения о доступных настройках см. в разделе Настройка host.json.

Следующие шаги