Azure Functions の Azure Event Hubs トリガー
この記事では、Azure Functions で Azure Event Hubs トリガーを使用する方法について説明します。 Azure Functions は、Event Hubs のトリガーおよび出力バインドをサポートしています。
セットアップと構成の詳細については、概要に関するページをご覧ください。
関数トリガーを使用して、イベント ハブのイベント ストリームに送信されたイベントに応答します。 トリガーを設定するには、基になるイベント ハブへの読み取りアクセスが必要です。 関数がトリガーされると、その関数に渡されるメッセージが文字列として型指定されます。
例
次の例は、Event Hubs トリガーのメッセージ本文をログに記録する C# 関数を示しています。
[FunctionName("EventHubTriggerCSharp")]
public void Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] string myEventHubMessage, ILogger log)
{
log.LogInformation($"C# function triggered to process a message: {myEventHubMessage}");
}
関数コードのイベント メタデータにアクセスするには、EventData オブジェクトにバインドします。 また、メソッド シグネチャ内のバインディング式を使用して、同じプロパティにアクセスすることもできます。 次の例は、同じデータを取得する 2 つの方法を示しています。
[FunctionName("EventHubTriggerCSharp")]
public void Run(
[EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData myEventHubMessage,
DateTime enqueuedTimeUtc,
Int64 sequenceNumber,
string offset,
ILogger log)
{
log.LogInformation($"Event: {Encoding.UTF8.GetString(myEventHubMessage.Body)}");
// Metadata accessed by binding to EventData
log.LogInformation($"EnqueuedTimeUtc={myEventHubMessage.SystemProperties.EnqueuedTimeUtc}");
log.LogInformation($"SequenceNumber={myEventHubMessage.SystemProperties.SequenceNumber}");
log.LogInformation($"Offset={myEventHubMessage.SystemProperties.Offset}");
// Metadata accessed by using binding expressions in method parameters
log.LogInformation($"EnqueuedTimeUtc={enqueuedTimeUtc}");
log.LogInformation($"SequenceNumber={sequenceNumber}");
log.LogInformation($"Offset={offset}");
}
イベントを一括で受け取るには、string
または EventData
を配列にします。
Note
一括で受け取る場合、上記の例のように DateTime enqueuedTimeUtc
を使用してメソッド パラメーターをバインドすることはできないため、各 EventData
オブジェクトからパラメーターを受け取る必要があります。
[FunctionName("EventHubTriggerCSharp")]
public void Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
foreach (var message in eventHubMessages)
{
log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(message.Body)}");
log.LogInformation($"EnqueuedTimeUtc={message.SystemProperties.EnqueuedTimeUtc}");
}
}
次の例は、function.json ファイルの Event Hubs トリガー バインドと、そのバインドが使用される JavaScript 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。
function.json ファイルの Event Hubs バインディング データの例を次に示します。これは、Functions ランタイムのバージョン 1.x とそれより後のバージョンとでは異なります。
{
"type": "eventHubTrigger",
"name": "myEventHubMessage",
"direction": "in",
"eventHubName": "MyEventHub",
"connection": "myEventHubReadConnectionAppSetting"
}
JavaScript コードを次に示します。
module.exports = function (context, myEventHubMessage) {
context.log('Function triggered to process a message: ', myEventHubMessage);
context.log('EnqueuedTimeUtc =', context.bindingData.enqueuedTimeUtc);
context.log('SequenceNumber =', context.bindingData.sequenceNumber);
context.log('Offset =', context.bindingData.offset);
context.done();
};
イベントをまとめて受け取るには、次の例に示すように、cardinality
ファイルで cardinality
を many
に設定します。
{
"type": "eventHubTrigger",
"name": "eventHubMessages",
"direction": "in",
"eventHubName": "MyEventHub",
"cardinality": "many",
"connection": "myEventHubReadConnectionAppSetting"
}
JavaScript コードを次に示します。
module.exports = function (context, eventHubMessages) {
context.log(`JavaScript eventhub trigger function called for message array ${eventHubMessages}`);
eventHubMessages.forEach((message, index) => {
context.log(`Processed message ${message}`);
context.log(`EnqueuedTimeUtc = ${context.bindingData.enqueuedTimeUtcArray[index]}`);
context.log(`SequenceNumber = ${context.bindingData.sequenceNumberArray[index]}`);
context.log(`Offset = ${context.bindingData.offsetArray[index]}`);
});
context.done();
};
PowerShell コードを次に示します。
param($eventHubMessages, $TriggerMetadata)
Write-Host "PowerShell eventhub trigger function called for message array: $eventHubMessages"
$eventHubMessages | ForEach-Object { Write-Host "Processed message: $_" }
次の例は、function.json ファイルの Event Hubs トリガー バインドと、そのバインドが使用される Python 関数を示しています。 この関数は、イベント メタデータを読み取り、メッセージをログに記録します。
次の例は、function.json ファイル内の Event Hubs バインディング データを示しています。
{
"type": "eventHubTrigger",
"name": "event",
"direction": "in",
"eventHubName": "MyEventHub",
"connection": "myEventHubReadConnectionAppSetting"
}
Python コードを次に示します。
import logging
import azure.functions as func
def main(event: func.EventHubEvent):
logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
logging.info(f' EnqueuedTimeUtc = {event.enqueued_time}')
logging.info(f' SequenceNumber = {event.sequence_number}')
logging.info(f' Offset = {event.offset}')
# Metadata
for key in event.metadata:
logging.info(f'Metadata: {key} = {event.metadata[key]}')
次の例は、Event Hubs トリガーのメッセージ本文をログに記録する Event Hubs トリガー バインドを示しています。
@FunctionName("ehprocessor")
public void eventHubProcessor(
@EventHubTrigger(name = "msg",
eventHubName = "myeventhubname",
connection = "myconnvarname") String message,
final ExecutionContext context )
{
context.getLogger().info(message);
}
Java 関数ランタイム ライブラリで、その値がイベント ハブに由来するパラメーター上で EventHubTrigger
注釈を使用します。 これらの注釈を使用したパラメーターによって、イベントを受信したときに関数が実行されます。 この注釈は、Java のネイティブ型、POJO、または Optional<T>
を使用した null 許容値で使用できます。
次の例は、Date 階層である整形式の BlobOutput
パスを提供すると共に、イベントのさらなるイントロスペクションのための SystemProperties
およびその他のバインディング オプションの幅広い使用方法を示しています。
package com.example;
import java.util.Map;
import java.time.ZonedDateTime;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
/**
* Azure Functions with Event Hub trigger.
* and Blob Output using date in path along with message partition ID
* and message sequence number from EventHub Trigger Properties
*/
public class EventHubReceiver {
@FunctionName("EventHubReceiver")
@StorageAccount("bloboutput")
public void run(
@EventHubTrigger(name = "message",
eventHubName = "%eventhub%",
consumerGroup = "%consumergroup%",
connection = "eventhubconnection",
cardinality = Cardinality.ONE)
String message,
final ExecutionContext context,
@BindingName("Properties") Map<String, Object> properties,
@BindingName("SystemProperties") Map<String, Object> systemProperties,
@BindingName("PartitionContext") Map<String, Object> partitionContext,
@BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,
@BlobOutput(
name = "outputItem",
path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
"{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
OutputBinding<String> outputItem) {
var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
// indicator
context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
context.getLogger().info("Properties: " + properties);
context.getLogger().info("System Properties: " + systemProperties);
context.getLogger().info("partitionContext: " + partitionContext);
context.getLogger().info("EnqueuedTimeUtc: " + et);
outputItem.setValue(message);
}
}
属性
インプロセスと分離ワーカー プロセスの C# ライブラリはどちらも、属性を使用してトリガーを構成します。 代わりに、C# スクリプトでは、function.json 構成ファイルを使用します。
C# クラス ライブラリでは、次のプロパティをサポートする EventHubTriggerAttribute を使います。
パラメーター | 説明 |
---|---|
EventHubName | イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 %eventHubName% のようにアプリの設定で参照できます。 |
ConsumerGroup | ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。 |
接続 | Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 詳細については、「接続」を参照してください |
注釈
Java 関数ランタイム ライブラリでは、EventHubTrigger 注釈を使用します。この注釈は次の設定をサポートしています。
構成
次の表は、function.json ファイルで設定するトリガー構成のプロパティを説明しています。これらは、ランタイムのバージョンによって異なります。
function.json のプロパティ | 説明 |
---|---|
type | eventHubTrigger に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。 |
direction | in に設定する必要があります。 このプロパティは、Azure Portal でトリガーを作成するときに自動で設定されます。 |
name | 関数コード内のイベント項目を表す変数の名前。 |
eventHubName | イベント ハブの名前。 イベント ハブの名前は接続文字列にも存在し、その値が実行時にこのプロパティをオーバーライドします。 アプリ設定 を介して参照できます |
consumerGroup | ハブのイベントのサブスクライブに使用されるコンシューマー グループを設定する、省略可能なプロパティ。 省略した場合は、$Default コンシューマー グループが使用されます。 |
cardinality | バッチ処理を有効にするには many に設定します。 省略するか、one に設定した場合、1 つのメッセージが関数に渡されます。 |
connection | Event Hubs への接続方法を指定するアプリ設定または設定コレクションの名前。 「接続」を参照してください。 |
ローカルで開発する場合は、 コレクション内の local.settings.json ファイルにアプリケーション設定を追加します。
使用
Event Hubs トリガーと IoT Hub トリガーのスケールの詳細については、Event Hubs トリガーに関するページを参照してください。
Event Hubs 出力バインドでサポートされるパラメーター型は、Functions ランタイムのバージョン、拡張機能パッケージのバージョン、および使用される C# のモダリティによって異なります。
インプロセス C# クラス ライブラリ関数では、次の型がサポートされています。
- Azure.Messaging.EventHubs.EventData
- String
- Byte array
- プレーンオールド CLR オブジェクト (POCO)
このバージョンの EventData では、EventBody を優先し、レガシ Body
型がサポートされなくなりました。
パラメーターの型は、次のいずれかになります。
- Int、String、byte[] などのネイティブ Java 型。
- Optional を使用する Null 許容値。
- 任意の POJO 型。
詳細については、EventHubTrigger リファレンスを参照してください。
イベント メタデータ
Event Hubs トリガーには、いくつかのメタデータ プロパティがあります。 メタデータ プロパティは、他のバインドのバインド式の一部として、またはコードのパラメーターとして使用できます。 これらのプロパティは、EventData クラスに由来します。
プロパティ | Type | 説明 |
---|---|---|
PartitionContext |
PartitionContext | PartitionContext のインスタンスです。 |
EnqueuedTimeUtc |
DateTime |
エンキューされた時刻 (UTC)。 |
Offset |
string |
イベント ハブ パーティション ストリームを基準としたデータのオフセット。 オフセットは、Event Hubs ストリーム内のイベントのマーカーまたは識別子です。 この識別子は、Event Hubs ストリームのパーティション内で一意です。 |
PartitionKey |
string |
イベント データを送信するパーティション。 |
Properties |
IDictionary<String,Object> |
イベント データのユーザー プロパティ。 |
SequenceNumber |
Int64 |
イベントの論理シーケンス番号。 |
SystemProperties |
IDictionary<String,Object> |
イベント データなどのシステム プロパティ。 |
この記事の前半でこれらのプロパティを使用しているコード例を参照してください。
接続
connection
プロパティは、アプリを Event Hubs に接続する方法を指定する環境構成への参照です。 次が指定されている場合があります。
構成された値が、1 つの設定に完全一致し、プレフィックスがその他の設定とも一致する場合は、完全一致が使用されます。
接続文字列
この接続文字列を取得するには、イベント ハブ自体ではなく、"名前空間" の [接続情報] をクリックします。 接続文字列は、イベントハブ自体ではなく、Event Hubs 名前空間のものである必要があります。
トリガーに使用する場合、接続文字列には、機能を有効にするために少なくとも "読み取り" アクセス許可が必要です。 出力バインディングに使用する場合、接続文字列には、イベント ストリームにメッセージを送信するための "送信" アクセス許可が必要です。
この接続文字列は、バインディング構成の connection
プロパティで指定した値と同じ名前のアプリケーション設定に格納する必要があります。
ID ベースの接続
バージョン 5.x 以上の拡張機能を使用する場合は、シークレットを含む接続文字列の代わりに、アプリで Azure Active Directory ID を使用することができます。 これを行うには、トリガーおよびバインド構成の connection
プロパティにマップされる共通のプレフィックスに設定を定義します。
このモードでは、拡張機能に次のプロパティが必要です。
プロパティ | 環境変数テンプレート | 説明 | 値の例 |
---|---|---|---|
完全修飾名前空間 | <CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace |
完全修飾 Event Hubs の名前空間。 | <event_hubs_namespace>.servicebus.windows.net |
接続をカスタマイズするには、プロパティを追加設定します。 「ID ベース接続に共通のプロパティ」を参照してください。
注意
Azure App Configuration または Key Vault を使用してマネージド ID 接続の設定を指定する場合、__
の代わりに :
や /
などの有効なキー区切り記号を使用して、名前が正しく解決されるようにしなければなりません。
たとえば、「 <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace
」のように入力します。
Azure Functions サービスでホストされている場合、ID ベースの接続では、マネージド ID が使用されます。 ユーザー割り当て ID を credential
および clientID
プロパティで指定できますが、システム割り当て ID が既定で使用されます。 リソース ID を使用したユーザー割り当て ID の構成はサポートされていないことに注意してください。 ローカル開発などの他のコンテキストで実行する場合は、代わりに開発者 ID が使用されますが、カスタマイズすることもできます。 ID ベースの接続によるローカル開発に関するページをご覧ください。
ID にアクセス許可を付与する
使用されている ID が何であれ、目的のアクションを実行するためのアクセス許可が必要です。 ほとんどの Azure では、これはそれらのアクセス許可を提供する組み込みロールまたはカスタム ロールを使って、Azure RBAC でロールを割り当てる必要があることを意味します。
重要
すべてのコンテキストに必要ではない一部のアクセス許可がターゲット サービスによって公開される場合があります。 可能であれば、最小限の特権の原則に従い、必要な特権だけを ID に付与します。 たとえば、アプリがデータ ソースからの読み取りのみを行う必要がある場合は、読み取りアクセス許可のみを持つロールを使用します。 サービスへの書き込みも可能なロールを割り当てることは、読み取り操作に対するアクセス許可が過剰になるため、不適切です。 同様に、ロールの割り当てが、読み取る必要のあるリソースだけに限定されていることを確認する必要があります。
実行時にイベント ハブへのアクセスを提供するロールの割り当てを作成する必要があります。 ロールの割り当てのスコープは、Event Hubs の名前空間に対するもの、またはイベント ハブ自体になります。 所有者のような管理ロールでは十分ではありません。 次の表は、通常の操作で Event Hubs の拡張機能を使用するときに推奨される組み込みのロールを示しています。 アプリケーションでは、記述したコードに基づいて追加のアクセス許可が必要になる場合があります。
[バインドの種類] | 組み込みロールの例 |
---|---|
トリガー | Azure Event Hubs データ受信者、Azure Event Hubs データ所有者 |
出力バインド | Azure Event Hubs データ送信者 |
host.json 設定
host.json ファイルには、Event Hubs トリガーの動作を制御する設定が含まれています。 使用可能な設定の詳細については、「host.json 設定」を参照してください。