次の方法で共有


Azure Functions における Apache Kafka バインドの概要

Azure Functions 用の Kafka 拡張機能を使用すると、出力バインドを使用して Apache Kafka トピックに値を書き込めます。 トリガーを使用して、Kafka トピックのメッセージに応答して関数を呼び出すこともできます。

重要

Kafka バインドは、エラスティック Premium プランおよび 専用 (App Service) プランの Functions でのみ使用できます。 これらは、バージョン 3.x 以降のバージョンの Functions ランタイムでのみサポートされます。

アクション タイプ
新しい Kafka イベントに基づいて関数を実行します。 トリガー
Kafka イベント ストリームに書き込みます。 出力バインド

拡張機能のインストール

インストールする拡張機能 NuGet パッケージは、関数アプリで使用している C# モードによって異なります。

関数は分離された C# ワーカー プロセスで実行されます。 詳しくは、「分離ワーカー プロセスにおける C# Azure Functions の実行のガイド」をご覧ください。

この NuGet パッケージをインストールすることによって、プロジェクトに拡張機能を追加します。

バンドルのインストール

アプリでこのバインド拡張機能を使用できるようにするには、プロジェクトのルートにある host.json ファイルに次の extensionBundle 参照が含まれていることを確認します。

{
    "version": "2.0",
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[4.0.0, 5.0.0)"
    }
}

この例では、version[4.0.0, 5.0.0)値は、少なくとも4.0.05.0.0未満のバンドル バージョン (4.x のすべての潜在的なバージョンを含む) を使用するように Functions ホストに指示します。 この表記は、v4.x 拡張機能バンドルの利用可能な最新のマイナー バージョンでアプリを効果的に維持します。

可能であれば、最新の拡張機能バンドルメジャー バージョンを使用し、ランタイムが最新のマイナー バージョンを自動的に維持できるようにする必要があります。 最新のバンドルの内容は、 拡張機能バンドルのリリース ページで確認できます。 詳細については、 Azure Functions 拡張機能バンドルに関するページを参照してください。

ランタイム スケールを有効にする

Kafka のトリガーとバインドを使用するときに関数が Premium プランで適切にスケーリングできるようにするには、ランタイム スケールの監視を有効にする必要があります。

  1. Azure portal の関数アプリで、[ 構成] を選択します。

  2. [ 関数ランタイム設定 ] タブの [ ランタイム スケール監視] で、[オン] を選択します。

    ランタイム スケーリングを有効にするための Azure portal 領域のスクリーンショット。

host.json 設定

このセクションでは、バージョン 3.x 以降でこのバインドに使用可能な構成設定について説明します。 host.json ファイルの設定は、関数アプリ インスタンスのすべての関数に適用されます。 バージョン 3.x 以降のバージョンでの関数アプリ構成設定の詳細については、Azure Functions の host.json のリファレンスに関するページを参照してください。

{
    "version": "2.0",
    "extensions": {
        "kafka": {
            "maxBatchSize": 64,
            "SubscriberIntervalInSeconds": 1,
            "ExecutorChannelCapacity": 1,
            "ChannelFullRetryIntervalInMs": 50
        }
    }
}

プロパティ 既定値 タイプ 説明
ChannelFullRetryIntervalInMs 50 トリガー 容量範囲内のチャネルに項目を追加するときに使用されるサブスクライバーの再試行間隔をミリ秒単位で定義します。
ExecutorChannelCapacity 1 両方 チャネル メッセージ容量を定義します。 容量に達すると、Kafka サブスクライバーは関数が追いつくまで一時停止します。
マックスバッチサイズ 64 トリガー Kafka によってトリガーされる関数を呼び出すときの最大バッチ サイズ。
SubscriberIntervalInSeconds 1 トリガー 関数ごとに、受信メッセージが実行される最小頻度を秒単位で定義します。 メッセージ ボリュームが MaxBatchSize / SubscriberIntervalInSeconds 未満の場合のみ

Apache Kafka C/C++ クライアント ライブラリから継承される次のプロパティは、host.json の kafka セクションでも、トリガーまたは出力バインドとトリガーの両方でサポートされています。

プロパティ 適用対象 librdkafka と同等
AutoCommitIntervalMs トリガー auto.commit.interval.ms
AutoOffsetReset トリガー auto.offset.reset
FetchMaxBytes トリガー fetch.max.bytes
LibkafkaDebug 両方 debug
MaxPartitionFetchBytes トリガー max.partition.fetch.bytes
MaxPollIntervalMs トリガー max.poll.interval.ms
MetadataMaxAgeMs 両方 metadata.max.age.ms
QueuedMinMessages トリガー queued.min.messages
QueuedMaxMessagesKbytes トリガー queued.max.messages.kbytes
ReconnectBackoffMs トリガー reconnect.backoff.max.ms
ReconnectBackoffMaxMs トリガー reconnect.backoff.max.ms
SessionTimeoutMs トリガー session.timeout.ms
SocketKeepaliveEnable 両方 socket.keepalive.enable
StatisticsIntervalMs トリガー statistics.interval.ms

次のステップ