Azure Functions における Azure Event Hubs のトリガーとバインド
この記事では、Azure Functions で Azure Event Hubs のバインドを使用する方法について説明します。 Azure Functions は、イベント ハブのトリガーおよび出力バインドをサポートしています。
アクション | Type |
---|---|
Event Hubs のイベント ストリームに送信されたイベントに応答する | トリガー |
イベント ストリームにイベントを書き込む | 出力バインド |
拡張機能のインストール
インストールする拡張機能 NuGet パッケージは、関数アプリで使用している C# モードによって異なります。
関数は分離された C# ワーカー プロセスで実行されます。 詳しくは、「分離ワーカー プロセスにおける C# Azure Functions の実行のガイド」をご覧ください。
拡張機能の機能性は、拡張機能のバージョンによって異なります。
このバージョンでは、シークレットではなく ID を使用して接続する機能が導入されています。 マネージド ID を使用して関数アプリを構成するチュートリアルについては、ID ベースの接続を使用した関数アプリの作成に関 するチュートリアルを参照してください。
NuGet パッケージ バージョン 5.x をインストールすることによって、プロジェクトに拡張機能を追加します。
バンドルのインストール
Event Hubs 拡張機能は、host.json プロジェクト ファイルで指定される拡張機能バンドルの一部です。 このバンドルは、バインドのバージョンを変更するために、またはバンドルがまだインストールされていない場合に変更が必要になることがあります。 詳細については、「拡張機能のバンドル」を参照してください。
このバージョンでは、シークレットではなく ID を使用して接続する機能が導入されています。 マネージド ID を使用して関数アプリを構成するチュートリアルについては、ID ベースの接続を使用した関数アプリの作成に関 するチュートリアルを参照してください。
このバージョンの拡張機能は、host.json
ファイルで次のコードを追加するか、または置き換えることによって、プレビュー拡張機能バンドル v3 から追加できます。
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[3.3.0, 4.0.0)"
}
}
詳細については、ユーザーの更新に関するページを参照してください。
バインドの種類
.NET でサポートされるバインドの種類は、拡張機能のバージョンと C# 実行モードの両方によって異なります。これは次のいずれかになります。
分離ワーカー プロセス クラス ライブラリでコンパイルされた C# 関数は、ランタイムから分離されたプロセスで実行されます。
バージョンを選択すると、モードとバージョンのバインドの種類の詳細が表示されます。
分離ワーカー プロセスは、以下の表に従ってパラメーターの型をサポートします。 Azure.Messaging.EventHubs の型へのバインドのサポートはプレビュー段階です。
Event Hubs トリガー
関数で 1 つのイベントを処理するとき、Event Hubs トリガーは次の型にバインドできます。
Type | 説明 |
---|---|
string |
イベントを表す文字列。 イベントが単純なテキストのときに使用します。 |
byte[] |
イベントのバイト数。 |
JSON シリアル化可能な型 | イベントに JSON データが含まれている場合、Functions は JSON データを単純な従来の CLR オブジェクト (POCO) 型に逆シリアル化しようとします。 |
Azure.Messaging.EventHubs.EventData1 | イベント オブジェクト。 古いバージョンの Event Hubs SDK から移行する場合、このバージョンでは EventBody を優先してレガシの Body 型のサポートが削除されることに注意してください。 |
関数でイベントのバッチを処理するとき、Event Hubs トリガーは次の型にバインドできます。
Type | 説明 |
---|---|
string[] |
バッチのイベントの配列を表す文字列。 各エントリは 1 つのイベントを表します。 |
EventData[] 1 |
バッチのイベントの配列を表す、Azure.Messaging.EventHubs.EventData のインスタンス。 各エントリは 1 つのイベントを表します。 |
T[] (T は JSON シリアル化可能な型1) |
バッチのイベントの配列を表す、カスタム POCO 型のインスタンス。 各エントリは 1 つのイベントを表します。 |
1 これらの型を使用するには、Microsoft.Azure.Functions.Worker.Extensions.EventHubs 5.5.0 以降と SDK 型バインドの一般的な依存関係を参照する必要があります。
イベント ハブ出力バインド
関数で 1 つのイベントを書き込むとき、Event Hubs 出力バインドは次の型にバインドできます。
Type | 説明 |
---|---|
string |
イベントを表す文字列。 イベントが単純なテキストのときに使用します。 |
byte[] |
イベントのバイト数。 |
JSON シリアル化可能な型 | イベントを表すオブジェクト。 Functions は、単純な従来の CLR オブジェクト (POCO) 型を JSON データにシリアル化しようとします。 |
関数で複数のイベントを書き込むとき、Event Hubs 出力バインドは次の型にバインドできます。
Type | 説明 |
---|---|
T[] (T は単一のイベントの種類の 1 つ) |
複数のイベントを含む配列。 各エントリは 1 つのイベントを表します。 |
その他の出力シナリオでは、Microsoft.Azure.EventHubs から直接型を作成して使用します。
host.json 設定
host.json ファイルには、Event Hubs トリガーの動作を制御する設定が含まれています。 構成は、拡張機能のバージョンによって異なります。
{
"version": "2.0",
"extensions": {
"eventHubs": {
"maxEventBatchSize" : 100,
"minEventBatchSize" : 25,
"maxWaitTime" : "00:05:00",
"batchCheckpointFrequency" : 1,
"prefetchCount" : 300,
"transportType" : "amqpWebSockets",
"webProxy" : "https://proxyserver:8080",
"customEndpointAddress" : "amqps://company.gateway.local",
"targetUnprocessedEventThreshold" : 75,
"initialOffsetOptions" : {
"type" : "fromStart",
"enqueuedTimeUtc" : ""
},
"clientRetryOptions":{
"mode" : "exponential",
"tryTimeout" : "00:01:00",
"delay" : "00:00:00.80",
"maximumDelay" : "00:01:00",
"maximumRetries" : 3
}
}
}
}
プロパティ | Default | 説明 |
---|---|---|
maxEventBatchSize2 | 100 | 単一の呼び出しでバッチに含まれるイベントの最大数。 1 以上を指定してください。 |
minEventBatchSize1 | 1 | バッチで必要な最小イベント数。 最小値は、関数が複数のイベントを受信していて、maxEventBatchSize より小さくする必要がある場合にのみ適用されます。最小サイズは厳密には保証されません。 部分的なバッチは、 maxWaitTime が経過する前に完全なバッチを準備できない場合にディスパッチされます。 部分的なバッチは、スケーリングが行われた後の関数の最初の呼び出しでも行われる可能性があります。 |
maxWaitTime1 | 00:01:00 | 関数を呼び出す前に、トリガーがバッチの入力を待機する最大間隔。 待機時間は、minEventBatchSize が 1 より大きい場合にのみ考慮され、それ以外の場合は無視されます。 待機時間が経過する前に使用可能イベントが minEventBatchSize 未満の場合、関数は部分的なバッチで呼び出されます。 許容される最長待機時間は 10 分です。注: この間隔は、関数が呼び出される正確なタイミングを厳密に保証するものではありません。 タイマーの精度により、わずかな誤差が生じます。 スケーリングが行われると、部分的なバッチでの最初の呼び出しがより迅速に行われるか、構成された待機時間の最大 2 倍かかる場合があります。 |
batchCheckpointFrequency | 1 | イベント ハブのチェックポイントを作成する前に処理するバッチの数。 |
prefetchCount | 300 | Event Hubs から要求され、ローカル キャッシュに保持されるイベントの数。ネットワーク操作の待機を避けるために、読み取りを許可します |
transportType | amqpTcp | Event Hubs との通信に使用されるプロトコルとトランスポート。 使用可能なオプション: amqpTcp 、amqpWebSockets |
webProxy | null | Web ソケット経由の Event Hubs との通信に使用されるプロキシ。 プロキシを amqpTcp トランスポートと併用することはできません。 |
customEndpointAddress | null | Event Hubs への接続を確立するときに使用するアドレス。アプリケーション ゲートウェイまたはホスト環境に必要なその他のパス経由で、ネットワーク要求をルーティングすることができます。 カスタム エンドポイントのアドレスを使用する場合は、イベント ハブの完全修飾名前空間が引き続き必要であり、明示的に、または接続文字列を使用して指定する必要があります。 |
targetUnprocessedEventThreshold1 | null | 関数インスタンスごとに必要な未処理のイベントの数。 このしきい値は、ターゲット ベースのスケーリングで使用され、maxEventBatchSize オプションから推定される既定のスケーリングしきい値をオーバーライドします。 設定すると、未処理のイベント数の合計がこの値で除算され、必要な関数インスタンスの数が決定されます。 インスタンス数は、バランスの取れたパーティション分散を作成する数値に切り上げられます。 |
initialOffsetOptions/type | fromStart | チェックポイントがストレージに存在しない場合に処理を開始するイベント ストリーム内の位置。 すべてのパーティションに適用されます。 詳細については、OffsetType のドキュメントに関する記事を参照してください。 使用可能なオプション: fromStart 、fromEnd 、fromEnqueuedTime |
initialOffsetOptions/enqueuedTimeUtc | null | ストリームにエンキューされたイベントの処理を開始する時刻を指定します。 initialOffsetOptions/type が fromEnqueuedTime として構成されている場合、この設定は必須です。 DateTime.Parse() でサポートされている形式の時刻がサポートされます (2020-10-26T20:31Z など)。 明確にするためには、タイムゾーンも指定する必要があります。 タイムゾーンが指定されなかった場合は、関数アプリを実行しているマシンのローカル タイムゾーン (Azure で実行されている場合は UTC) が使用されます。 |
clientRetryOptions/mode | exponential | 再試行の遅延を計算するために使用する方法です。 指数モードでは、バックオフ戦略に基づいて再試行が行われます。この場合、再試行のたびに待機する期間が長くなります。 固定モードでは、一定の間隔で再試行が行われ、遅延の時間はそれぞれ一定です。 使用可能なオプション: exponential 、fixed |
clientRetryOptions/tryTimeout | 00:01:00 | 試行ごとに Event Hubs 操作の完了を待機する最大期間です。 |
clientRetryOptions/delay | 00:00:00.80 | 再試行の間に適用する遅延またはバックオフ係数です。 |
clientRetryOptions/maximumDelay | 00:00:01 | 再試行の間の許容される最大の遅延です。 |
clientRetryOptions/maximumRetries | 3 | 関連する操作が失敗したと判断するまでの再試行の最大回数です。 |
1minEventBatchSize
と maxWaitTime
を使用するには、v5.3.0 以降のバージョンの Microsoft.Azure.WebJobs.Extensions.EventHubs
パッケージが必要です。
2 Microsoft.Azure.WebJobs.Extensions.EventHubs
パッケージの v6.0.0 で既定のmaxEventBatchSize
が変更されました。 以前のバージョンでは、これは 10 でした。
clientRetryOptions
は、Functions ホストと Event Hubs の間の操作 (イベントのフェッチやイベントの送信など) を再試行するために使用されます。 個々の関数に再試行ポリシーを適用する方法については、Azure Functions のエラー処理と再試行に関するガイダンスを参照してください。
Azure Functions 2.x 以降の host.json のリファレンスについては、Azure Functions の host.json のリファレンスに関する記事を参照してください。