Azure Functions の Azure IoT Hub トリガー

この記事では、IoT Hub で Azure Functions のバインドを使用する方法について説明します。 IoT Hub のサポートは、Azure Event Hubs のバインドに基づいています。

セットアップと構成の詳細については、概要に関するページをご覧ください。

重要

次のコード サンプルでは、Event Hub API が使用されています (使用されている構文は IoT Hub の関数に適用されるものです)。

関数トリガーを使用して、イベント ハブのイベント ストリームに送信されたイベントに応答します。 トリガーを設定するには、基になるイベント ハブへの読み取りアクセスが必要です。 関数がトリガーされると、その関数に渡されるメッセージが文字列として型指定されます。

従量課金プランと Premium プランに対する Event Hubs のスケーリングの決定は、ターゲットベースのスケーリングによって行われます。 詳細については、「ターゲットベースのスケーリング」を参照してください。

イベント ハブのイベント ストリームに送信されたイベントに、Azure Functions がトリガーを使用して応答する方法については、「Azure で Event Hubs をサーバーレス関数と統合する」を参照してください。

重要

この記事では、タブを使用して、Node.js プログラミング モデルの複数のバージョンに対応しています。 v4 モデルは一般提供されており、JavaScript と TypeScript の開発者にとって、より柔軟で直感的なエクスペリエンスが得られるように設計されています。 v4 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。 v3 と v4 の違いの詳細については、移行ガイドを参照してください。

Azure Functions では、Python の 2 つのプログラミング モデルがサポートされています。 バインドを定義する方法は、選択したプログラミング モデルによって異なります。

Python v2 プログラミング モデルでは、Python 関数コードでデコレーターを使用してバインドを直接定義できます。 詳細については、「Python 開発者ガイド」を参照してください。

この記事は、両方のプログラミング モデルをサポートしています。

次の例は、イベント ハブに基づいてトリガーされる C# 関数を示しています。この入力メッセージ文字列はログに書き込まれます。

{
    private readonly ILogger<EventHubsFunction> _logger;

    public EventHubsFunction(ILogger<EventHubsFunction> logger)
    {
        _logger = logger;
    }

    [Function(nameof(EventHubFunction))]
    [FixedDelayRetry(5, "00:00:10")]
    [EventHubOutput("dest", Connection = "EventHubConnection")]
    public string EventHubFunction(
        [EventHubTrigger("src", Connection = "EventHubConnection")] string[] input,
        FunctionContext context)
    {
        _logger.LogInformation("First Event Hubs triggered message: {msg}", input[0]);

        var message = $"Output message created at {DateTime.Now}";
        return message;
    }

次の例は、Event Hubs トリガーの TypeScript 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(message: unknown, context: InvocationContext): Promise<void> {
    context.log('Event hub function processed message:', message);
    context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
    context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
    context.log('Offset =', context.triggerMetadata.offset);
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: eventHubTrigger1,
});

イベントをまとめて受け取るには、次の例に示すように、cardinalitymany に設定します。

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(messages: unknown[], context: InvocationContext): Promise<void> {
    context.log(`Event hub function processed ${messages.length} messages`);
    for (let i = 0; i < messages.length; i++) {
        context.log('Event hub message:', messages[i]);
        context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
        context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
        context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
    }
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: eventHubTrigger1,
});

次の例は、Event Hubs トリガーの JavaScript 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: (message, context) => {
        context.log('Event hub function processed message:', message);
        context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
        context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
        context.log('Offset =', context.triggerMetadata.offset);
    },
});

イベントをまとめて受け取るには、次の例に示すように、cardinalitymany に設定します。

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: (messages, context) => {
        context.log(`Event hub function processed ${messages.length} messages`);
        for (let i = 0; i < messages.length; i++) {
            context.log('Event hub message:', messages[i]);
            context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
            context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
            context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
        }
    },
});

PowerShell コードを次に示します。

param($eventHubMessages, $TriggerMetadata)

Write-Host "PowerShell eventhub trigger function called for message array: $eventHubMessages"

$eventHubMessages | ForEach-Object { Write-Host "Processed message: $_" }

次の例は、Event Hubs トリガー バインドと、そのバインドが使用される Python 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。 この例は、v1 と v2 のどちらの Python プログラミング モデルを使用するかによって異なります。

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="EventHubTrigger1")
@app.event_hub_message_trigger(arg_name="myhub", 
                               event_hub_name="<EVENT_HUB_NAME>",
                               connection="<CONNECTION_SETTING>") 
def test_function(myhub: func.EventHubEvent):
    logging.info('Python EventHub trigger processed an event: %s',
                myhub.get_body().decode('utf-8'))

次の例は、Event Hubs トリガーのメッセージ本文をログに記録する Event Hubs トリガー バインドを示しています。

@FunctionName("ehprocessor")
public void eventHubProcessor(
  @EventHubTrigger(name = "msg",
                  eventHubName = "myeventhubname",
                  connection = "myconnvarname") String message,
       final ExecutionContext context )
       {
          context.getLogger().info(message);
 }

Java 関数ランタイム ライブラリで、その値がイベント ハブに由来するパラメーター上で EventHubTrigger 注釈を使用します。 これらの注釈を使用したパラメーターによって、イベントを受信したときに関数が実行されます。 この注釈は、Java のネイティブ型、POJO、または Optional<T> を使用した null 許容値で使用できます。

次の例は、Date 階層である整形式の BlobOutput パスを提供すると共に、イベントのさらなるイントロスペクションのための SystemProperties およびその他のバインディング オプションの幅広い使用方法を示しています。

package com.example;
import java.util.Map;
import java.time.ZonedDateTime;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

/**
 * Azure Functions with Event Hub trigger.
 * and Blob Output using date in path along with message partition ID
 * and message sequence number from EventHub Trigger Properties
 */
public class EventHubReceiver {

    @FunctionName("EventHubReceiver")
    @StorageAccount("bloboutput")

    public void run(
            @EventHubTrigger(name = "message",
                eventHubName = "%eventhub%",
                consumerGroup = "%consumergroup%",
                connection = "eventhubconnection",
                cardinality = Cardinality.ONE)
            String message,

            final ExecutionContext context,

            @BindingName("Properties") Map<String, Object> properties,
            @BindingName("SystemProperties") Map<String, Object> systemProperties,
            @BindingName("PartitionContext") Map<String, Object> partitionContext,
            @BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,

            @BlobOutput(
                name = "outputItem",
                path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
                       "{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
            OutputBinding<String> outputItem) {

        var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
                                                             // indicator
        context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
        context.getLogger().info("Properties: " + properties);
        context.getLogger().info("System Properties: " + systemProperties);
        context.getLogger().info("partitionContext: " + partitionContext);
        context.getLogger().info("EnqueuedTimeUtc: " + et);

        outputItem.setValue(message);
    }
}

属性

インプロセス分離ワーカー プロセスの C# ライブラリはどちらも、属性を使用してトリガーを構成します。 C# スクリプトでは、C# スクリプト ガイドで説明されているように、代わりに function.json 構成ファイルを使用します。

EventHubTriggerAttribute を使って、次のプロパティをサポートするトリガーをイベント ハブに対して定義します。

パラメーター 説明
EventHubName イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 %eventHubName% のようにアプリの設定で参照できます。
ConsumerGroup ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。
接続 Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 詳細については、「接続」を参照してください

デコレータ

Python v2 プログラミング モデルにのみ適用されます。

デコレータを使用して定義された Python v2 関数の場合、cosmos_db_trigger に次のプロパティがあります。

プロパティ 説明
arg_name 関数コード内のイベント項目を表す変数の名前。
event_hub_name イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。
connection Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 「接続」を参照してください。

function.json を使用して定義された Python 関数については、[構成] セクションを参照してください。

注釈

Java 関数ランタイム ライブラリでは、EventHubTrigger 注釈を使用します。この注釈は次の設定をサポートしています。

構成

"Python v1 プログラミング モデルにのみ適用されます。"

次の表では、app.eventHub() メソッドに渡される options オブジェクトに対して設定できるプロパティについて説明します。

プロパティ 説明
eventHubName イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 アプリ設定%eventHubName% を介して参照できます
consumerGroup ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。
cardinality バッチ処理を有効にするには many に設定します。 省略するか、one に設定した場合、1 つのメッセージが関数に渡されます。
connection Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 「接続」を参照してください。

次の表は、function.json ファイルで設定するトリガー構成のプロパティを説明しています。これらは、ランタイムのバージョンによって異なります。

function.json のプロパティ 説明
type eventHubTrigger に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。
direction in に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。
name 関数コード内のイベント項目を表す変数の名前。
eventHubName イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 アプリ設定%eventHubName% を介して参照できます
consumerGroup ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。
cardinality バッチ処理を有効にするには many に設定します。 省略するか、one に設定した場合、1 つのメッセージが関数に渡されます。
connection Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 「接続」を参照してください。

ローカルで開発する場合は、Values コレクション内の local.settings.json ファイルにアプリケーション設定を追加します。

使用法

Event Hubs トリガーと IoT Hub トリガーのスケーリングの詳細については、「Azure Functions でのイベントの使用」を参照してください。

Event Hubs 出力バインドでサポートされるパラメーター型は、Functions ランタイムのバージョン、拡張機能パッケージのバージョン、および使用される C# のモダリティによって異なります。

関数で 1 つのイベントを処理するとき、Event Hubs トリガーは次の型にバインドできます。

Type 説明
string イベントを表す文字列。 イベントが単純なテキストのときに使用します。
byte[] イベントのバイト数。
JSON シリアル化可能な型 イベントに JSON データが含まれている場合、Functions は JSON データを単純な従来の CLR オブジェクト (POCO) 型に逆シリアル化しようとします。
Azure.Messaging.EventHubs.EventData1 イベント オブジェクト。
古いバージョンの Event Hubs SDK から移行する場合、このバージョンでは EventBody を優先してレガシの Body 型のサポートが削除されることに注意してください。

関数でイベントのバッチを処理するとき、Event Hubs トリガーは次の型にバインドできます。

Type 説明
string[] バッチのイベントの配列を表す文字列。 各エントリは 1 つのイベントを表します。
EventData[]1 バッチのイベントの配列を表す、Azure.Messaging.EventHubs.EventData のインスタンス。 各エントリは 1 つのイベントを表します。
T[] (T は JSON シリアル化可能な型1) バッチのイベントの配列を表す、カスタム POCO 型のインスタンス。 各エントリは 1 つのイベントを表します。

1 これらの型を使用するには、Microsoft.Azure.Functions.Worker.Extensions.EventHubs 5.5.0 以降SDK 型バインドの一般的な依存関係を参照する必要があります。

パラメーターの型は、次のいずれかになります。

  • Int、String、byte[] などのネイティブ Java 型。
  • Optional を使用する Null 許容値。
  • 任意の POJO 型。

詳細については、EventHubTrigger リファレンスを参照してください。

イベント メタデータ

Event Hubs トリガーには、いくつかのメタデータ プロパティがあります。 メタデータ プロパティは、他のバインドのバインド式の一部として、またはコードのパラメーターとして使用できます。 これらのプロパティは、EventData クラスに由来します。

プロパティ タイプ 説明
PartitionContext PartitionContext PartitionContext のインスタンスです。
EnqueuedTimeUtc DateTime エンキューされた時刻 (UTC)。
Offset string イベント ハブ パーティション ストリームを基準としたデータのオフセット。 オフセットは、Event Hubs ストリーム内のイベントのマーカーまたは識別子です。 この識別子は、Event Hubs ストリームのパーティション内で一意です。
PartitionKey string イベント データを送信するパーティション。
Properties IDictionary<String,Object> イベント データのユーザー プロパティ。
SequenceNumber Int64 イベントの論理シーケンス番号。
SystemProperties IDictionary<String,Object> イベント データなどのシステム プロパティ。

この記事の前半でこれらのプロパティを使用しているコード例を参照してください。

接続

connection プロパティは、接続文字列を含むアプリケーション設定の名前を含む環境構成への参照です。 名前空間[接続情報] ボタンを選択すると、この接続文字列を取得できます。 接続文字列は、イベントハブ自体ではなく、Event Hubs 名前空間のものである必要があります。

この接続文字列には、機能をアクティブにするために少なくとも "読み取り" アクセス許可が必要です。

この接続文字列は、バインド構成の connection プロパティで指定した値と同じ名前のアプリケーション設定に格納する必要があります。

Note

ID ベースの接続は、IoT Hub トリガーではサポートされていません。 マネージド ID をエンドツーエンドで使用する必要がある場合は、代わりに IoT Hub ルーティングを使用して、制御するイベント ハブにデータを送信できます。 そうすることで、マネージド ID を使用して送信ルーティングを認証でき、マネージド ID を使用してそのイベント ハブからイベントを読み取ることができます。

host.json プロパティ

host.json ファイルには、Event Hub トリガーの動作を制御する設定が含まれています。 使用可能な設定の詳細については、「host.json 設定」を参照してください。

次のステップ