Azure Functions の Azure Event Hubs トリガー

この記事では、Azure Functions で Azure Event Hubs トリガーを使用する方法について説明します。 Azure Functions は、Event Hubs のトリガーおよび出力バインドをサポートしています。

セットアップと構成の詳細については、概要に関するページをご覧ください。

関数トリガーを使用して、イベント ハブのイベント ストリームに送信されたイベントに応答します。 トリガーを設定するには、基になるイベント ハブへの読み取りアクセスが必要です。 関数がトリガーされると、その関数に渡されるメッセージが文字列として型指定されます。

次の例は、Event Hubs トリガーのメッセージ本文をログに記録する C# 関数を示しています。

[FunctionName("EventHubTriggerCSharp")]
public void Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] string myEventHubMessage, ILogger log)
{
    log.LogInformation($"C# function triggered to process a message: {myEventHubMessage}");
}

関数コードのイベント メタデータにアクセスするには、EventData オブジェクトにバインドします。 また、メソッド シグネチャ内のバインディング式を使用して、同じプロパティにアクセスすることもできます。 次の例は、同じデータを取得する 2 つの方法を示しています。

[FunctionName("EventHubTriggerCSharp")]
public void Run(
    [EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData myEventHubMessage,
    DateTime enqueuedTimeUtc,
    Int64 sequenceNumber,
    string offset,
    ILogger log)
{
    log.LogInformation($"Event: {Encoding.UTF8.GetString(myEventHubMessage.Body)}");
    // Metadata accessed by binding to EventData
    log.LogInformation($"EnqueuedTimeUtc={myEventHubMessage.SystemProperties.EnqueuedTimeUtc}");
    log.LogInformation($"SequenceNumber={myEventHubMessage.SystemProperties.SequenceNumber}");
    log.LogInformation($"Offset={myEventHubMessage.SystemProperties.Offset}");
    // Metadata accessed by using binding expressions in method parameters
    log.LogInformation($"EnqueuedTimeUtc={enqueuedTimeUtc}");
    log.LogInformation($"SequenceNumber={sequenceNumber}");
    log.LogInformation($"Offset={offset}");
}

イベントを一括で受け取るには、string または EventData を配列にします。

Note

一括で受け取る場合、上記の例のように DateTime enqueuedTimeUtc を使用してメソッド パラメーターをバインドすることはできないため、各 EventData オブジェクトからパラメーターを受け取る必要があります。

[FunctionName("EventHubTriggerCSharp")]
public void Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
    foreach (var message in eventHubMessages)
    {
        log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(message.Body)}");
        log.LogInformation($"EnqueuedTimeUtc={message.SystemProperties.EnqueuedTimeUtc}");
    }
}

次の例は、function.json ファイルの Event Hubs トリガー バインドと、そのバインドが使用される JavaScript 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。

function.json ファイルの Event Hubs バインディング データの例を次に示します。これは、Functions ランタイムのバージョン 1.x とそれより後のバージョンとでは異なります。

{
  "type": "eventHubTrigger",
  "name": "myEventHubMessage",
  "direction": "in",
  "eventHubName": "MyEventHub",
  "connection": "myEventHubReadConnectionAppSetting"
}

JavaScript コードを次に示します。

module.exports = function (context, myEventHubMessage) {
    context.log('Function triggered to process a message: ', myEventHubMessage);
    context.log('EnqueuedTimeUtc =', context.bindingData.enqueuedTimeUtc);
    context.log('SequenceNumber =', context.bindingData.sequenceNumber);
    context.log('Offset =', context.bindingData.offset);

    context.done();
};

イベントをまとめて受け取るには、次の例に示すように、cardinality ファイルで cardinalitymany に設定します。

{
  "type": "eventHubTrigger",
  "name": "eventHubMessages",
  "direction": "in",
  "eventHubName": "MyEventHub",
  "cardinality": "many",
  "connection": "myEventHubReadConnectionAppSetting"
}

JavaScript コードを次に示します。

module.exports = function (context, eventHubMessages) {
    context.log(`JavaScript eventhub trigger function called for message array ${eventHubMessages}`);

    eventHubMessages.forEach((message, index) => {
        context.log(`Processed message ${message}`);
        context.log(`EnqueuedTimeUtc = ${context.bindingData.enqueuedTimeUtcArray[index]}`);
        context.log(`SequenceNumber = ${context.bindingData.sequenceNumberArray[index]}`);
        context.log(`Offset = ${context.bindingData.offsetArray[index]}`);
    });

    context.done();
};

PowerShell の完全な例は保留中です。

次の例は、function.json ファイルの Event Hubs トリガー バインドと、そのバインドが使用される Python 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。

次の例は、function.json ファイル内の Event Hubs バインディング データを示しています。

{
  "type": "eventHubTrigger",
  "name": "event",
  "direction": "in",
  "eventHubName": "MyEventHub",
  "connection": "myEventHubReadConnectionAppSetting"
}

Python コードを次に示します。

import logging
import azure.functions as func


def main(event: func.EventHubEvent):
    logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
    logging.info(f'  EnqueuedTimeUtc = {event.enqueued_time}')
    logging.info(f'  SequenceNumber = {event.sequence_number}')
    logging.info(f'  Offset = {event.offset}')

    # Metadata
    for key in event.metadata:
        logging.info(f'Metadata: {key} = {event.metadata[key]}')

次の例は、Event Hubs トリガーのメッセージ本文をログに記録する Event Hubs トリガー バインドを示しています。

@FunctionName("ehprocessor")
public void eventHubProcessor(
  @EventHubTrigger(name = "msg",
                  eventHubName = "myeventhubname",
                  connection = "myconnvarname") String message,
       final ExecutionContext context )
       {
          context.getLogger().info(message);
 }

Java 関数ランタイム ライブラリで、その値がイベント ハブに由来するパラメーター上で EventHubTrigger 注釈を使用します。 これらの注釈を使用したパラメーターによって、イベントを受信したときに関数が実行されます。 この注釈は、Java のネイティブ型、POJO、または Optional<T> を使用した null 許容値で使用できます。

次の例は、Date 階層である整形式の BlobOutput パスを提供すると共に、イベントのさらなるイントロスペクションのための SystemProperties およびその他のバインディング オプションの幅広い使用方法を示しています。

package com.example;
import java.util.Map;
import java.time.ZonedDateTime;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

/**
 * Azure Functions with Event Hub trigger.
 * and Blob Output using date in path along with message partition ID
 * and message sequence number from EventHub Trigger Properties
 */
public class EventHubReceiver {

    @FunctionName("EventHubReceiver")
    @StorageAccount("bloboutput")

    public void run(
            @EventHubTrigger(name = "message",
                eventHubName = "%eventhub%",
                consumerGroup = "%consumergroup%",
                connection = "eventhubconnection",
                cardinality = Cardinality.ONE)
            String message,

            final ExecutionContext context,

            @BindingName("Properties") Map<String, Object> properties,
            @BindingName("SystemProperties") Map<String, Object> systemProperties,
            @BindingName("PartitionContext") Map<String, Object> partitionContext,
            @BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,

            @BlobOutput(
                name = "outputItem",
                path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
                       "{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
            OutputBinding<String> outputItem) {

        var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
                                                             // indicator
        context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
        context.getLogger().info("Properties: " + properties);
        context.getLogger().info("System Properties: " + systemProperties);
        context.getLogger().info("partitionContext: " + partitionContext);
        context.getLogger().info("EnqueuedTimeUtc: " + et);

        outputItem.setValue(message);
    }
}

属性

インプロセス分離プロセスの C# ライブラリはどちらも、属性を使用してトリガーを構成します。 代わりに、C# スクリプトでは、function.json 構成ファイルを使用します。

C# クラス ライブラリでは、次のプロパティをサポートする EventHubTriggerAttribute を使います。

パラメーター 説明
EventHubName イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 %eventHubName% のようにアプリの設定で参照できます。
ConsumerGroup ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。
接続 Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 詳細については、「接続」を参照してください

注釈

Java 関数ランタイム ライブラリでは、EventHubTrigger 注釈を使用します。この注釈は次の設定をサポートしています。

構成

次の表は、function.json ファイルで設定するトリガー構成のプロパティを説明しています。これらは、ランタイムのバージョンによって異なります。

function.json のプロパティ 説明
type eventHubTrigger に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。
direction in に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。
name 関数コード内のイベント項目を表す変数の名前。
eventHubName イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 アプリ設定 を介して参照できます
consumerGroup ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。
cardinality バッチ処理を有効にするには many に設定します。 省略するか、one に設定した場合、1 つのメッセージが関数に渡されます。
connection Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 「接続」を参照してください。

ローカルで開発する場合は、 コレクション内の local.settings.json ファイルにアプリケーション設定を追加します。

使用

Event Hubs トリガーと IoT Hub トリガーのスケールの詳細については、Event Hubs トリガーに関するページを参照してください。

Event Hubs 出力バインドでサポートされるパラメーター型は、Functions ランタイムのバージョン、拡張機能パッケージのバージョン、および使用される C# のモダリティによって異なります。

インプロセス C# クラス ライブラリ関数では、次の型がサポートされています。

このバージョンの EventData では、EventBody を優先し、レガシ Body 型がサポートされなくなりました。

パラメーターの型は、次のいずれかになります。

  • Int、String、byte[] などのネイティブ Java 型。
  • Optional を使用する Null 許容値。
  • 任意の POJO 型。

詳細については、EventHubTrigger リファレンスを参照してください。

イベント メタデータ

Event Hubs トリガーには、いくつかのメタデータ プロパティがあります。 メタデータ プロパティは、他のバインドのバインド式の一部として、またはコードのパラメーターとして使用できます。 これらのプロパティは、EventData クラスに由来します。

プロパティ Type 説明
PartitionContext PartitionContext PartitionContext のインスタンスです。
EnqueuedTimeUtc DateTime エンキューされた時刻 (UTC)。
Offset string イベント ハブ パーティション ストリームを基準としたデータのオフセット。 オフセットは、Event Hubs ストリーム内のイベントのマーカーまたは識別子です。 この識別子は、Event Hubs ストリームのパーティション内で一意です。
PartitionKey string イベント データを送信するパーティション。
Properties IDictionary<String,Object> イベント データのユーザー プロパティ。
SequenceNumber Int64 イベントの論理シーケンス番号。
SystemProperties IDictionary<String,Object> イベント データなどのシステム プロパティ。

この記事の前半でこれらのプロパティを使用しているコード例を参照してください。

接続

connection プロパティは、アプリを Event Hubs に接続する方法を指定する環境構成への参照です。 次が指定されている場合があります。

  • 接続文字列を含むアプリケーション設定の名前
  • まとめて ID ベースの接続を定義する、複数のアプリケーション設定の共有プレフィックスの名前。

構成された値が、1 つの設定に完全一致し、プレフィックスがその他の設定とも一致する場合は、完全一致が使用されます。

接続文字列

この接続文字列を取得するには、イベント ハブ自体ではなく、"名前空間" の [接続情報] をクリックします。 接続文字列は、イベントハブ自体ではなく、Event Hubs 名前空間のものである必要があります。

トリガーに使用する場合、接続文字列には、機能を有効にするために少なくとも "読み取り" アクセス許可が必要です。 出力バインディングに使用する場合、接続文字列には、イベント ストリームにメッセージを送信するための "送信" アクセス許可が必要です。

この接続文字列は、バインディング構成の connection プロパティで指定した値と同じ名前のアプリケーション設定に格納する必要があります。

ID ベースの接続

バージョン 5.x 以上の拡張機能を使用する場合は、シークレットを含む接続文字列の代わりに、アプリで Azure Active Directory ID を使用することができます。 これを行うには、トリガーおよびバインド構成の connection プロパティにマップされる共通のプレフィックスに設定を定義します。

このモードでは、拡張機能に次のプロパティが必要です。

Note

現在、従量課金プランで使用するには、指定する環境変数に AzureWebJobs というプレフィックスを付ける必要があります。 Premium プランの場合、このプレフィックスは必要ありません。

プロパティ 環境変数テンプレート 説明 値の例
完全修飾名前空間 AzureWebJobs<CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace 完全修飾 Event Hubs の名前空間。 <event_hubs_namespace>.servicebus.windows.net

接続をカスタマイズするには、プロパティを追加設定します。 「ID ベース接続に共通のプロパティ」を参照してください。

注意

Azure App Configuration または Key Vault を使用してマネージド ID 接続の設定を指定する場合、__ の代わりに :/ などの有効なキー区切り記号を使用して、名前が正しく解決されるようにしなければなりません。

たとえば、「 <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace 」のように入力します。

Azure Functions サービスでホストされている場合、ID ベースの接続では、マネージド ID が使用されます。 ユーザー割り当て ID を credential および clientID プロパティで指定できますが、システム割り当て ID が既定で使用されます。 リソース ID を使用したユーザー割り当て ID の構成はサポートされていないことに注意してください。 ローカル開発などの他のコンテキストで実行する場合は、代わりに開発者 ID が使用されますが、カスタマイズすることもできます。 ID ベースの接続によるローカル開発に関するページをご覧ください。

ID にアクセス許可を付与する

使用されている ID が何であれ、目的のアクションを実行するためのアクセス許可が必要です。 これらのアクセス許可を提供する組み込みロールまたはカスタム ロールを使用して、Azure RBAC でロールを割り当てる必要があります。

重要

すべてのコンテキストに必要ではない一部のアクセス許可がターゲット サービスによって公開される場合があります。 可能であれば、最小限の特権の原則に従い、必要な特権だけを ID に付与します。 たとえば、アプリがデータ ソースからの読み取りのみを行う必要がある場合は、読み取りアクセス許可のみを持つロールを使用します。 サービスへの書き込みも可能なロールを割り当てることは、読み取り操作に対するアクセス許可が過剰になるため、不適切です。 同様に、ロールの割り当てが、読み取る必要のあるリソースだけに限定されていることを確認する必要があります。

実行時にイベント ハブへのアクセスを提供するロールの割り当てを作成する必要があります。 ロールの割り当てのスコープは、イベント ハブ自体ではなく、Event Hubs 名前空間に対するものである必要があります。 所有者のような管理ロールでは十分ではありません。 次の表は、通常の操作で Event Hubs の拡張機能を使用するときに推奨される組み込みのロールを示しています。 アプリケーションでは、記述したコードに基づいて追加のアクセス許可が必要になる場合があります。

[バインドの種類] 組み込みロールの例
トリガー Azure Event Hubs データ受信者Azure Event Hubs データ所有者
出力バインド Azure Event Hubs データ送信者

host.json 設定

host.json ファイルには、Event Hubs トリガーの動作を制御する設定が含まれています。 使用可能な設定の詳細については、「host.json 設定」を参照してください。

次のステップ