Azure Functions の Azure Queue storage トリガー

Queue storage トリガーは、メッセージが Azure Queue storage に追加されると関数を実行します。

キュー トリガーを使用して、キューで新しい項目を受け取ったときに関数を開始します。 キュー メッセージは、関数への入力として提供されます。

C# 関数は、次の C# モードのいずれかを使用して作成できます。

  •     インプロセス クラス ライブラリ: Functions ランタイムと同じプロセスで実行されるコンパイル済みの C# 関数。
  • 分離ワーカー プロセス クラス ライブラリ: ランタイムから分離されたワーカー プロセスで実行されるコンパイル済みの C# 関数。 分離ワーカー プロセスは、非 LTS バージョンの .NET および.NET Framework で実行されている C# 関数をサポートするために必要です。
  • C# スクリプト: Azure portal で c# 関数を作成するときに主に使用されます。

次の例は、キュー項目が処理されるたびに キューをポーリングし、ログを書き込む C# 関数を示しています。

public static class QueueFunctions
{
    [FunctionName("QueueTrigger")]
    public static void QueueTrigger(
        [QueueTrigger("myqueue-items")] string myQueueItem, 
        ILogger log)
    {
        log.LogInformation($"C# function processed: {myQueueItem}");
    }
}

次の Java の例は、キュー myqueuename に格納されるトリガーされたメッセージを記録するストレージ キュー トリガー関数を示しています。

@FunctionName("queueprocessor")
public void run(
    @QueueTrigger(name = "msg",
                queueName = "myqueuename",
                connection = "myconnvarname") String message,
    final ExecutionContext context
) {
    context.getLogger().info(message);
}

次の例は、function.json ファイルのキュー トリガー バインドと、そのバインドを使用する JavaScript 関数を示しています。 この関数は、キュー項目が処理されるたびに myqueue-items キューをポーリングし、ログを書き込みます。

function.json ファイルを次に示します。

{
    "disabled": false,
    "bindings": [
        {
            "type": "queueTrigger",
            "direction": "in",
            "name": "myQueueItem",
            "queueName": "myqueue-items",
            "connection":"MyStorageConnectionAppSetting"
        }
    ]
}

これらのプロパティについては、「構成」セクションを参照してください。

Note

name パラメーターは、キュー項目ペイロードを含む JavaScript コードの context.bindings.<name> として反映されます。 このペイロードは、関数に対する 2 番目のパラメーターとしても渡されます。

JavaScript コードを次に示します。

module.exports = async function (context, message) {
    context.log('Node.js queue trigger function processed work item', message);
    // OR access using context.bindings.<name>
    // context.log('Node.js queue trigger function processed work item', context.bindings.myQueueItem);
    context.log('expirationTime =', context.bindingData.expirationTime);
    context.log('insertionTime =', context.bindingData.insertionTime);
    context.log('nextVisibleTime =', context.bindingData.nextVisibleTime);
    context.log('id =', context.bindingData.id);
    context.log('popReceipt =', context.bindingData.popReceipt);
    context.log('dequeueCount =', context.bindingData.dequeueCount);
};

function.json の name プロパティで名前が指定された については、「使用方法」セクションを参照してください。 ここに表示されているその他すべての変数については、「メッセージのメタデータ」セクションを参照してください。

次の例では、トリガーを使用してキュー メッセージを読み取って関数に渡す方法を示します。

ストレージ キュー トリガーは function.json ファイルで定義され、そこで queueTrigger に設定されます。

{
  "bindings": [
    {
      "name": "QueueItem",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "messages",
      "connection": "MyStorageConnectionAppSetting"
    }
  ]
}

Run.ps1 ファイルのコードによってパラメーターが として宣言され、関数でキュー メッセージを読み取ることができるようになります。

# Input bindings are passed in via param block.
param([string] $QueueItem, $TriggerMetadata)

# Write out the queue message and metadata to the information log.
Write-Host "PowerShell queue trigger function processed work item: $QueueItem"
Write-Host "Queue item expiration time: $($TriggerMetadata.ExpirationTime)"
Write-Host "Queue item insertion time: $($TriggerMetadata.InsertionTime)"
Write-Host "Queue item next visible time: $($TriggerMetadata.NextVisibleTime)"
Write-Host "ID: $($TriggerMetadata.Id)"
Write-Host "Pop receipt: $($TriggerMetadata.PopReceipt)"
Write-Host "Dequeue count: $($TriggerMetadata.DequeueCount)"

次の例では、トリガーを使用してキュー メッセージを読み取って関数に渡す方法を示します。

ストレージ キュー トリガーは function.json で定義され、そこで type に設定されます。

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "messages",
      "connection": "AzureStorageQueuesConnectionString"
    }
  ]
}

_init_.py のコードによってパラメーターが として宣言され、関数でキュー メッセージを読み取ることができるようになります。

import logging
import json

import azure.functions as func

def main(msg: func.QueueMessage):
    logging.info('Python queue trigger function processed a queue item.')

    result = json.dumps({
        'id': msg.id,
        'body': msg.get_body().decode('utf-8'),
        'expiration_time': (msg.expiration_time.isoformat()
                            if msg.expiration_time else None),
        'insertion_time': (msg.insertion_time.isoformat()
                           if msg.insertion_time else None),
        'time_next_visible': (msg.time_next_visible.isoformat()
                              if msg.time_next_visible else None),
        'pop_receipt': msg.pop_receipt,
        'dequeue_count': msg.dequeue_count
    })

    logging.info(result)

属性

インプロセス分離ワーカー プロセスの C# ライブラリはどちらも、QueueTriggerAttribute 属性を使用して関数を定義します。 代わりに、C# スクリプトでは、function.json 構成ファイルを使用します。

C# クラス ライブラリでは、次の例のように、属性のコンストラクターは監視するキューの名前を受け取ります。

[FunctionName("QueueTrigger")]
public static void Run(
    [QueueTrigger("myqueue-items")] string myQueueItem, 
    ILogger log)
{
    ...
}

次の例で示すように、Connection プロパティを設定して、使用するストレージ アカウント接続ストリングを含むアプリ設定を指定できます。

[FunctionName("QueueTrigger")]
public static void Run(
    [QueueTrigger("myqueue-items", Connection = "StorageConnectionAppSetting")] string myQueueItem, 
    ILogger log)
{
    ....
}

注釈

QueueTrigger 注釈を使用すると、関数をトリガーするキューにアクセスできます。 次の例では、message パラメーターを使用して、キュー メッセージを関数で使用できるようにします。

package com.function;
import com.microsoft.azure.functions.annotation.*;
import java.util.Queue;
import com.microsoft.azure.functions.*;

public class QueueTriggerDemo {
    @FunctionName("QueueTriggerDemo")
    public void run(
        @QueueTrigger(name = "message", queueName = "messages", connection = "MyStorageConnectionAppSetting") String message,
        final ExecutionContext context
    ) {
        context.getLogger().info("Queue message: " + message);
    }
}
プロパティ 説明
name 関数シグネチャのパラメーター名を宣言します。 関数がトリガーされると、このパラメーターの値にはキュー メッセージの内容が含められます。
queueName ストレージ アカウントのキュー名を宣言します。
connection ストレージ アカウントの接続文字列を示します。

構成

次の表は、function.json ファイルと QueueTrigger 属性で設定したバインド構成のプロパティを説明しています。

function.json のプロパティ 説明
type queueTrigger に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。
direction function.json ファイルの場合のみ。 in に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。
name 関数コードでキュー項目ペイロードを含む変数の名前。
queueName ポーリングするキューの名前。
connection Azure キューへの接続方法を指定するアプリ設定または設定コレクションの名前。 「接続」を参照してください。

完全な例については、セクションの例を参照してください。

ローカルで開発する場合は、 コレクション内の local.settings.json ファイルにアプリケーション設定を追加します。

使用方法

注意

Azure Functions で想定されているのは base64 でエンコードされた文字列です。 (データを base64 でエンコードされた文字列として準備するために) エンコードの種類を調整する場合、それらはすべて呼び出し元のサービスに実装する必要があります。

Queue トリガーの使用方法は、拡張機能パッケージのバージョンと、関数アプリで使用される C# のモダリティによって異なり、次のいずれかになります。

インプロセス クラス ライブラリは、Functions ランタイムと同じプロセスで実行されるコンパイル済みの C# 関数です。

バージョンを選択すると、モードとバージョンの使用状況の詳細が表示されます。

string paramName のようなメソッド パラメーターを使用してメッセージ データにアクセスします。 paramNameQueueTriggerAttribute で指定された値です。 次の型のいずれにでもバインドできます。

  • プレーンオールド CLR オブジェクト (POCO)
  • string
  • byte[]
  • QueueMessage

オブジェクトにバインドすると、Functions ランタイムにより、JSON ペイロードを、コードに定義されている任意のクラスのインスタンスへと逆シリアル化する処理が試行されます。 QueueMessage の使用例については、拡張機能の GitHub リポジトリに関するページを参照してください。

属性は Connection プロパティを受け取りますが、 Connection を使用してストレージ アカウント接続を指定することもできます。 これは、ライブラリ内の他の関数とは異なるストレージ アカウントを使用する必要がある場合に実行できます。 コンストラクターは、ストレージ接続文字列を含むアプリ設定の名前を受け取ります。 属性は、パラメーター、メソッド、またはクラス レベルで適用できます。 次の例では、クラス レベルとメソッド レベルを示します。

[StorageAccount("ClassLevelStorageAppSetting")]
public static class AzureFunctions
{
    [FunctionName("StorageTrigger")]
    [StorageAccount("FunctionLevelStorageAppSetting")]
    public static void Run( //...
{
    ...
}

使用するストレージ アカウントは、次の順序で決定されます。

  • トリガーまたはバインド属性の Connection プロパティ。
  • トリガーまたはバインド属性と同じパラメーターに適用された StorageAccount 属性。
  • 関数に適用される StorageAccount 属性。
  • クラスに適用される StorageAccount 属性。
  • AzureWebJobsStorage アプリケーション設定で定義されている、関数アプリの既定のストレージ アカウント。

QueueTrigger 注釈を使用すると、関数をトリガーしたキュー メッセージにアクセスできます。

キュー項目ペイロードは、context.bindings.<NAME> を介して使用できます。ここで、<NAME>context.bindings.<NAME> で定義されている名前と一致します。 ペイロードが JSON の場合、値はオブジェクトに逆シリアル化されます。

name ファイルのバインドの name パラメーターで指定された名前と一致する文字列パラメーターを使用して、キュー メッセージにアクセスします。

QueueMessage として型指定されたパラメーターを使用して、キュー メッセージにアクセスします。

Metadata

キュー トリガーは、いくつかのメタデータ プロパティを提供します。 これらのプロパティは、他のバインドのバインド式の一部として、またはコードのパラメーターとして使用できます。

これらのプロパティは、CloudQueueMessage クラスのメンバーです。

プロパティ Type 説明
QueueTrigger string キュー ペイロード (有効な文字列の場合)。 キュー メッセージ ペイロードが文字列の場合、QueueTrigger は、QueueTriggername プロパティで指定された変数と同じ値になります。
DequeueCount int このメッセージがデキューされた回数。
ExpirationTime DateTimeOffset メッセージが期限切れになる時刻。
Id string キュー メッセージ ID。
InsertionTime DateTimeOffset メッセージがキューに追加された時刻。
NextVisibleTime DateTimeOffset メッセージが次に表示される時刻。
PopReceipt string メッセージのポップ受信。

接続

connection プロパティは、アプリを Azure キューに接続する方法を指定する環境構成への参照です。 次が指定されている場合があります。

  • 接続文字列を含むアプリケーション設定の名前
  • まとめて ID ベースの接続を定義する、複数のアプリケーション設定の共有プレフィックスの名前。

構成された値が、1 つの設定に完全一致し、プレフィックスがその他の設定とも一致する場合は、完全一致が使用されます。

接続文字列

接続文字列を取得するには、「ストレージ アカウント アクセス キーを管理する」の手順に従います。

この接続文字列は、バインド構成の connection プロパティで指定した値と同じ名前のアプリケーション設定に格納する必要があります。

アプリ設定の名前が "AzureWebJobs" で始まる場合は、ここで名前の残りの部分のみを指定できます。 たとえば、connection を "MyStorage" に設定した場合、Functions ランタイムは "AzureWebJobsMyStorage" という名前のアプリ設定を探します。connection を空のままにした場合、Functions ランタイムは、アプリ設定内の AzureWebJobsStorage という名前の既定のストレージ接続文字列を使用します。

ID ベースの接続

バージョン 5.x 以上の拡張機能を使用する場合は、シークレットを含む接続文字列の代わりに、アプリで Azure Active Directory ID を使用することができます。 これを行うには、トリガーおよびバインド構成の connection プロパティにマップされる共通のプレフィックスに設定を定義します。

connection を "AzureWebJobsStorage" に設定する場合は、「connection」を参照してください。 その他のすべての接続では、拡張機能に次のプロパティが必要です。

プロパティ 環境変数テンプレート 説明 値の例
Queue サービス URI <CONNECTION_NAME_PREFIX>__queueServiceUri1 HTTPS スキームを使用して接続している Queue サービスのデータ プレーン URI。 https://<storage_account_name>.queue.core.windows.net

1 をエイリアスとして使用できます。 両方の形式が指定された場合、queueServiceUri の形式が使用されます。 serviceUri 形式は、全体の接続構成が BLOB、キュー、テーブル間で使用される場合は、指定できません。

接続をカスタマイズするために、追加のプロパティを設定できます。 「ID ベース接続に共通のプロパティ」を参照してください。

Azure Functions サービスでホストされている場合、ID ベースの接続では、マネージド ID が使用されます。 ユーザー割り当て ID を credential および clientID プロパティで指定できますが、システム割り当て ID が既定で使用されます。 リソース ID を使用したユーザー割り当て ID の構成はサポートされていないことに注意してください。 ローカル開発などの他のコンテキストで実行する場合は、代わりに開発者 ID が使用されますが、カスタマイズすることもできます。 ID ベースの接続によるローカル開発に関するページをご覧ください。

ID にアクセス許可を付与する

使用されている ID が何であれ、目的のアクションを実行するためのアクセス許可が必要です。 ほとんどの Azure では、これはそれらのアクセス許可を提供する組み込みロールまたはカスタム ロールを使って、Azure RBAC でロールを割り当てる必要があることを意味します。

重要

すべてのコンテキストに必要ではない一部のアクセス許可がターゲット サービスによって公開される場合があります。 可能であれば、最小限の特権の原則に従い、必要な特権だけを ID に付与します。 たとえば、アプリがデータ ソースからの読み取りのみを行う必要がある場合は、読み取りアクセス許可のみを持つロールを使用します。 サービスへの書き込みも可能なロールを割り当てることは、読み取り操作に対するアクセス許可が過剰になるため、不適切です。 同様に、ロールの割り当てが、読み取る必要のあるリソースだけに限定されていることを確認する必要があります。

実行時にキューへのアクセスを提供するロールの割り当てを作成する必要があります。 所有者のような管理ロールでは十分ではありません。 次の表は、通常の操作で Queue Storage の拡張機能を使用するときに推奨される組み込みロールを示しています。 アプリケーションでは、記述したコードに基づいて追加のアクセス許可が必要になる場合があります。

[バインドの種類] 組み込みロールの例
トリガー ストレージ キュー データ閲覧者ストレージ キュー データのメッセージ プロセッサ
出力バインド ストレージ キュー データ共同作成者ストレージ キュー データのメッセージ送信者

有害メッセージ

キュー トリガー関数が失敗すると、Azure Functions は、その関数を特定のキュー メッセージに対して (最初の試行を含め) 最大 5 回再試行します。 5 回の試行すべてで失敗した場合、Functions ランタイムは、 originalqueuename>-poison という名前のキューにメッセージを追加します。 メッセージのログを取得するか、手動での対処が必要であるという通知を送信することにより有害キューからのメッセージを処理する関数が記述できます。

有害メッセージを手動で処理するには、キュー メッセージの dequeueCount を確認します。

ピーク ロック

ピーク ロック パターンは、キュー トリガーに対して自動的に行われます。 デキューされたメッセージは、非表示とマークされ、Storage サービスが管理するタイムアウトと関連付けられます。

その関数が開始されると、次の条件下でメッセージの処理が開始されます。

  • 関数が成功すると、関数の実行が完了し、メッセージが削除されます。
  • 関数が失敗すると、メッセージの可視性がリセットされます。 リセットされると、関数が次に新しいメッセージを要求するときにメッセージが再処理されます。
  • クラッシュが原因で関数が完了しない場合は、メッセージの可視性の有効期限が切れ、メッセージはキューに再表示されます。

可視性のメカニズムは、Functions ランタイムではなく、Storage サービスによりすべて処理されます。

ポーリング アルゴリズム

キュー トリガーは、アイドル状態のキューのポーリングがストレージ トランザクション コストに与える影響を軽減するために、ランダムな指数バックオフ アルゴリズムを実装します。

アルゴリズムでは次のロジックが使用されます。

  • メッセージが見つかると、ランタイムにより 100 ミリ秒間待機してから、別のメッセージが確認されます
  • メッセージが見つからない場合は、約 200 ミリ秒間待機してからもう一度お試しください。
  • 再試行後もキュー メッセージが取得できなかった場合、待ち時間が最大になるまで再試行が続けられます。既定の最大待ち時間は 1 分間です。
  • 最大待ち時間は、maxPollingInterval内の maxPollingInterval プロパティで構成できます。

ローカル開発の場合、最大ポーリング間隔は既定で 2 秒に設定されます。

Note

従量課金プランで関数アプリをホストする場合の課金については、ランタイムがポーリングに費やした時間は課金されません。

コンカレンシー

待機中のキュー メッセージが複数存在する場合、キュー トリガーはメッセージのバッチを取得し、関数インスタンスを同時に呼び出してそれらを処理します。 既定では、このバッチ サイズは 16 です。 処理されている数が 8 まで減少すると、このランタイムは別のバッチを取得し、それらのメッセージの処理を開始します。 そのため、1 つの仮想マシン (VM) 上で 1 関数あたりに処理されている同時実行メッセージの最大数は 24 です。 この制限は、各 VM 上のキューによってトリガーされる各関数に個別に適用されます。 関数アプリが複数の VM にスケールアウトした場合、各 VM はトリガーを待機し、関数を実行しようとします。 たとえば、関数アプリが 3 つの VM にスケールアウトした場合、キューによってトリガーされる 1 つの関数の同時実行インスタンスの既定の最大数は 72 です。

新しいバッチを取得するためのバッチ サイズとしきい値は、host.json ファイルで構成できます。 関数アプリ内のキューによってトリガーされる関数の並列実行を最小限に抑えたい場合は、このバッチ サイズを 1 に設定できます。 この設定によってコンカレンシーが解消されるのは、関数アプリが 1 つの仮想マシン (VM) 上で実行される場合に限ります。

キュー トリガーは、関数がキュー メッセージを複数回同時に処理することを自動的に防止します。

host.json プロパティ

host.json ファイルには、キュー トリガーの動作を制御する設定が含まれています。 使用可能な設定の詳細については、「host.json 設定」を参照してください。

次のステップ