Azure Functions の Azure Event Hubs トリガー

この記事では、Azure Functions で Azure Event Hubs トリガーを使用する方法について説明します。 Azure Functions は、Event Hubs のトリガーおよび出力バインドをサポートしています。

セットアップと構成の詳細については、概要に関するページをご覧ください。

関数トリガーを使用して、イベント ハブのイベント ストリームに送信されたイベントに応答します。 トリガーを設定するには、基になるイベント ハブへの読み取りアクセスが必要です。 関数がトリガーされると、その関数に渡されるメッセージが文字列として型指定されます。

従量課金プランと Premium プランに対する Event Hubs のスケーリングの決定は、ターゲットベースのスケーリングによって行われます。 詳細については、「ターゲットベースのスケーリング」を参照してください。

イベント ハブのイベント ストリームに送信されたイベントに、Azure Functions がトリガーを使用して応答する方法については、「Azure で Event Hubs をサーバーレス関数と統合する」を参照してください。

重要

この記事では、タブを使用して、Node.js プログラミング モデルの複数のバージョンに対応しています。 v4 モデルは一般提供されており、JavaScript と TypeScript の開発者にとって、より柔軟で直感的なエクスペリエンスが得られるように設計されています。 v4 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。 v3 と v4 の違いの詳細については、移行ガイドを参照してください。

Azure Functions では、Python の 2 つのプログラミング モデルがサポートされています。 バインドを定義する方法は、選択したプログラミング モデルによって異なります。

Python v2 プログラミング モデルでは、Python 関数コードでデコレーターを使用してバインドを直接定義できます。 詳細については、「Python 開発者ガイド」を参照してください。

この記事は、両方のプログラミング モデルをサポートしています。

次の例は、イベント ハブに基づいてトリガーされる C# 関数を示しています。この入力メッセージ文字列はログに書き込まれます。

{
    private readonly ILogger<EventHubsFunction> _logger;

    public EventHubsFunction(ILogger<EventHubsFunction> logger)
    {
        _logger = logger;
    }

    [Function(nameof(EventHubFunction))]
    [FixedDelayRetry(5, "00:00:10")]
    [EventHubOutput("dest", Connection = "EventHubConnection")]
    public string EventHubFunction(
        [EventHubTrigger("src", Connection = "EventHubConnection")] string[] input,
        FunctionContext context)
    {
        _logger.LogInformation("First Event Hubs triggered message: {msg}", input[0]);

        var message = $"Output message created at {DateTime.Now}";
        return message;
    }

次の例は、Event Hubs トリガーの TypeScript 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(message: unknown, context: InvocationContext): Promise<void> {
    context.log('Event hub function processed message:', message);
    context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
    context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
    context.log('Offset =', context.triggerMetadata.offset);
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: eventHubTrigger1,
});

イベントをまとめて受け取るには、次の例に示すように、cardinalitymany に設定します。

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(messages: unknown[], context: InvocationContext): Promise<void> {
    context.log(`Event hub function processed ${messages.length} messages`);
    for (let i = 0; i < messages.length; i++) {
        context.log('Event hub message:', messages[i]);
        context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
        context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
        context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
    }
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: eventHubTrigger1,
});

次の例は、Event Hubs トリガーの JavaScript 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: (message, context) => {
        context.log('Event hub function processed message:', message);
        context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
        context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
        context.log('Offset =', context.triggerMetadata.offset);
    },
});

イベントをまとめて受け取るには、次の例に示すように、cardinalitymany に設定します。

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: (messages, context) => {
        context.log(`Event hub function processed ${messages.length} messages`);
        for (let i = 0; i < messages.length; i++) {
            context.log('Event hub message:', messages[i]);
            context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
            context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
            context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
        }
    },
});

PowerShell コードを次に示します。

param($eventHubMessages, $TriggerMetadata)

Write-Host "PowerShell eventhub trigger function called for message array: $eventHubMessages"

$eventHubMessages | ForEach-Object { Write-Host "Processed message: $_" }

次の例は、Event Hubs トリガー バインドと、そのバインドが使用される Python 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。 この例は、v1 と v2 のどちらの Python プログラミング モデルを使用するかによって異なります。

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="EventHubTrigger1")
@app.event_hub_message_trigger(arg_name="myhub", 
                               event_hub_name="<EVENT_HUB_NAME>",
                               connection="<CONNECTION_SETTING>") 
def test_function(myhub: func.EventHubEvent):
    logging.info('Python EventHub trigger processed an event: %s',
                myhub.get_body().decode('utf-8'))

次の例は、Event Hubs トリガーのメッセージ本文をログに記録する Event Hubs トリガー バインドを示しています。

@FunctionName("ehprocessor")
public void eventHubProcessor(
  @EventHubTrigger(name = "msg",
                  eventHubName = "myeventhubname",
                  connection = "myconnvarname") String message,
       final ExecutionContext context )
       {
          context.getLogger().info(message);
 }

Java 関数ランタイム ライブラリで、その値がイベント ハブに由来するパラメーター上で EventHubTrigger 注釈を使用します。 これらの注釈を使用したパラメーターによって、イベントを受信したときに関数が実行されます。 この注釈は、Java のネイティブ型、POJO、または Optional<T> を使用した null 許容値で使用できます。

次の例は、Date 階層である整形式の BlobOutput パスを提供すると共に、イベントのさらなるイントロスペクションのための SystemProperties およびその他のバインディング オプションの幅広い使用方法を示しています。

package com.example;
import java.util.Map;
import java.time.ZonedDateTime;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

/**
 * Azure Functions with Event Hub trigger.
 * and Blob Output using date in path along with message partition ID
 * and message sequence number from EventHub Trigger Properties
 */
public class EventHubReceiver {

    @FunctionName("EventHubReceiver")
    @StorageAccount("bloboutput")

    public void run(
            @EventHubTrigger(name = "message",
                eventHubName = "%eventhub%",
                consumerGroup = "%consumergroup%",
                connection = "eventhubconnection",
                cardinality = Cardinality.ONE)
            String message,

            final ExecutionContext context,

            @BindingName("Properties") Map<String, Object> properties,
            @BindingName("SystemProperties") Map<String, Object> systemProperties,
            @BindingName("PartitionContext") Map<String, Object> partitionContext,
            @BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,

            @BlobOutput(
                name = "outputItem",
                path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
                       "{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
            OutputBinding<String> outputItem) {

        var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
                                                             // indicator
        context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
        context.getLogger().info("Properties: " + properties);
        context.getLogger().info("System Properties: " + systemProperties);
        context.getLogger().info("partitionContext: " + partitionContext);
        context.getLogger().info("EnqueuedTimeUtc: " + et);

        outputItem.setValue(message);
    }
}

属性

インプロセス分離ワーカー プロセスの C# ライブラリはどちらも、属性を使用してトリガーを構成します。 C# スクリプトでは、C# スクリプト ガイドで説明されているように、代わりに function.json 構成ファイルを使用します。

EventHubTriggerAttribute を使って、次のプロパティをサポートするトリガーをイベント ハブに対して定義します。

パラメーター 説明
EventHubName イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 %eventHubName% のようにアプリの設定で参照できます。
ConsumerGroup ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。
接続 Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 詳細については、「接続」を参照してください

デコレータ

Python v2 プログラミング モデルにのみ適用されます。

デコレータを使用して定義された Python v2 関数の場合、cosmos_db_trigger に次のプロパティがあります。

プロパティ 説明
arg_name 関数コード内のイベント項目を表す変数の名前。
event_hub_name イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。
connection Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 「接続」を参照してください。

function.json を使用して定義された Python 関数については、[構成] セクションを参照してください。

注釈

Java 関数ランタイム ライブラリでは、EventHubTrigger 注釈を使用します。この注釈は次の設定をサポートしています。

構成

"Python v1 プログラミング モデルにのみ適用されます。"

次の表では、app.eventHub() メソッドに渡される options オブジェクトに対して設定できるプロパティについて説明します。

プロパティ 説明
eventHubName イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 アプリ設定%eventHubName% を介して参照できます
consumerGroup ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。
cardinality バッチ処理を有効にするには many に設定します。 省略するか、one に設定した場合、1 つのメッセージが関数に渡されます。
connection Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 「接続」を参照してください。

次の表は、function.json ファイルで設定するトリガー構成のプロパティを説明しています。これらは、ランタイムのバージョンによって異なります。

function.json のプロパティ 説明
type eventHubTrigger に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。
direction in に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。
name 関数コード内のイベント項目を表す変数の名前。
eventHubName イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 アプリ設定%eventHubName% を介して参照できます
consumerGroup ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。
cardinality バッチ処理を有効にするには many に設定します。 省略するか、one に設定した場合、1 つのメッセージが関数に渡されます。
connection Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 「接続」を参照してください。

ローカルで開発する場合は、Values コレクション内の local.settings.json ファイルにアプリケーション設定を追加します。

使用法

Event Hubs トリガーと IoT Hub トリガーのスケーリングの詳細については、「Azure Functions でのイベントの使用」を参照してください。

Event Hubs 出力バインドでサポートされるパラメーター型は、Functions ランタイムのバージョン、拡張機能パッケージのバージョン、および使用される C# のモダリティによって異なります。

関数で 1 つのイベントを処理するとき、Event Hubs トリガーは次の型にバインドできます。

Type 説明
string イベントを表す文字列。 イベントが単純なテキストのときに使用します。
byte[] イベントのバイト数。
JSON シリアル化可能な型 イベントに JSON データが含まれている場合、Functions は JSON データを単純な従来の CLR オブジェクト (POCO) 型に逆シリアル化しようとします。
Azure.Messaging.EventHubs.EventData1 イベント オブジェクト。
古いバージョンの Event Hubs SDK から移行する場合、このバージョンでは EventBody を優先してレガシの Body 型のサポートが削除されることに注意してください。

関数でイベントのバッチを処理するとき、Event Hubs トリガーは次の型にバインドできます。

Type 説明
string[] バッチのイベントの配列を表す文字列。 各エントリは 1 つのイベントを表します。
EventData[]1 バッチのイベントの配列を表す、Azure.Messaging.EventHubs.EventData のインスタンス。 各エントリは 1 つのイベントを表します。
T[] (T は JSON シリアル化可能な型1) バッチのイベントの配列を表す、カスタム POCO 型のインスタンス。 各エントリは 1 つのイベントを表します。

1 これらの型を使用するには、Microsoft.Azure.Functions.Worker.Extensions.EventHubs 5.5.0 以降SDK 型バインドの一般的な依存関係を参照する必要があります。

パラメーターの型は、次のいずれかになります。

  • Int、String、byte[] などのネイティブ Java 型。
  • Optional を使用する Null 許容値。
  • 任意の POJO 型。

詳細については、EventHubTrigger リファレンスを参照してください。

イベント メタデータ

Event Hubs トリガーには、いくつかのメタデータ プロパティがあります。 メタデータ プロパティは、他のバインドのバインド式の一部として、またはコードのパラメーターとして使用できます。 これらのプロパティは、EventData クラスに由来します。

プロパティ タイプ 説明
PartitionContext PartitionContext PartitionContext のインスタンスです。
EnqueuedTimeUtc DateTime エンキューされた時刻 (UTC)。
Offset string イベント ハブ パーティション ストリームを基準としたデータのオフセット。 オフセットは、Event Hubs ストリーム内のイベントのマーカーまたは識別子です。 この識別子は、Event Hubs ストリームのパーティション内で一意です。
PartitionKey string イベント データを送信するパーティション。
Properties IDictionary<String,Object> イベント データのユーザー プロパティ。
SequenceNumber Int64 イベントの論理シーケンス番号。
SystemProperties IDictionary<String,Object> イベント データなどのシステム プロパティ。

この記事の前半でこれらのプロパティを使用しているコード例を参照してください。

接続

connection プロパティは、アプリを Event Hubs に接続する方法を指定する環境構成への参照です。 次が指定されている場合があります。

  • 接続文字列を含むアプリケーション設定の名前
  • まとめて ID ベースの接続を定義する、複数のアプリケーション設定の共有プレフィックスの名前。

構成された値が、1 つの設定に完全一致し、プレフィックスがその他の設定とも一致する場合は、完全一致が使用されます。

接続文字列

この接続文字列を取得するには、イベント ハブ自体ではなく、"名前空間" の [接続情報] をクリックします。 接続文字列は、イベントハブ自体ではなく、Event Hubs 名前空間のものである必要があります。

トリガーに使用する場合、接続文字列には、機能を有効にするために少なくとも "読み取り" アクセス許可が必要です。 出力バインディングに使用する場合、接続文字列には、イベント ストリームにメッセージを送信するための "送信" アクセス許可が必要です。

この接続文字列は、バインディング構成の connection プロパティで指定した値と同じ名前のアプリケーション設定に格納する必要があります。

ID ベースの接続

シークレットを含む接続文字列を使う代わりに、拡張機能のバージョン 5.x 以降を使用している場合は、アプリで Microsoft Entra ID を使用できます。 これを行うには、トリガーおよびバインド構成の connection プロパティにマップされる共通のプレフィックスに設定を定義します。

このモードでは、拡張機能に次のプロパティが必要です。

プロパティ 環境変数テンプレート 説明 値の例
完全修飾名前空間 <CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace 完全修飾 Event Hubs の名前空間。 myeventhubns.servicebus.windows.net

接続をカスタマイズするには、プロパティを追加設定します。 「ID ベース接続に共通のプロパティ」を参照してください。

注意

Azure App Configuration または Key Vault を使用してマネージド ID 接続の設定を指定する場合、__ の代わりに :/ などの有効なキー区切り記号を使用して、名前が正しく解決されるようにしなければなりません。

たとえば、「 <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace 」のように入力します。

Azure Functions サービスでホストされている場合、ID ベースの接続では、マネージド ID が使用されます。 ユーザー割り当て ID を credential および clientID プロパティで指定できますが、システム割り当て ID が既定で使用されます。 リソース ID を使用したユーザー割り当て ID の構成はサポートされていないことに注意してください。 ローカル開発などの他のコンテキストで実行する場合は、代わりに開発者 ID が使用されますが、カスタマイズすることもできます。 ID ベースの接続によるローカル開発に関するページをご覧ください。

ID にアクセス許可を付与する

使用されている ID が何であれ、目的のアクションを実行するためのアクセス許可が必要です。 ほとんどの Azure では、これはそれらのアクセス許可を提供する組み込みロールまたはカスタム ロールを使って、Azure RBAC でロールを割り当てる必要があることを意味します。

重要

すべてのコンテキストに必要ではない一部のアクセス許可がターゲット サービスによって公開される場合があります。 可能であれば、最小限の特権の原則に従い、必要な特権だけを ID に付与します。 たとえば、アプリがデータ ソースからの読み取りのみを行う必要がある場合は、読み取りアクセス許可のみを持つロールを使用します。 サービスへの書き込みも可能なロールを割り当てることは、読み取り操作に対するアクセス許可が過剰になるため、不適切です。 同様に、ロールの割り当てが、読み取る必要のあるリソースだけに限定されていることを確認する必要があります。

実行時にイベント ハブへのアクセスを提供するロールの割り当てを作成する必要があります。 ロールの割り当てのスコープは、Event Hubs の名前空間に対するもの、またはイベント ハブ自体になります。 所有者のような管理ロールでは十分ではありません。 次の表は、通常の操作で Event Hubs の拡張機能を使用するときに推奨される組み込みのロールを示しています。 アプリケーションでは、記述したコードに基づいて追加のアクセス許可が必要になる場合があります。

[バインドの種類] 組み込みロールの例
トリガー Azure Event Hubs データ受信者Azure Event Hubs データ所有者
出力バインド Azure Event Hubs データ送信者

host.json 設定

host.json ファイルには、Event Hubs トリガーの動作を制御する設定が含まれています。 使用可能な設定の詳細については、「host.json 設定」を参照してください。

次のステップ