Azure Functions における Apache Kafka バインドの概要

Azure Functions 用の Kafka 拡張機能を使用すると、出力バインドを使用して Apache Kafka トピックに値を書き込めます。 トリガーを使用して、Kafka トピックのメッセージに応答して関数を呼び出すこともできます。

重要

Kafka バインドは、エラスティック Premium プランおよび 専用 (App Service) プランの Functions でのみ使用できます。 これらは、バージョン 3.x 以降のバージョンの Functions ランタイムでのみサポートされます。

アクション Type
新しい Kafka イベントに基づいて関数を実行します。 トリガー
Kafka イベント ストリームに書き込みます。 出力バインド

拡張機能のインストール

インストールする拡張機能 NuGet パッケージは、関数アプリで使用している C# モードによって異なります。

関数は分離された C# ワーカー プロセスで実行されます。 詳しくは、「分離ワーカー プロセスにおける C# Azure Functions の実行のガイド」をご覧ください。

この NuGet パッケージをインストールすることによって、プロジェクトに拡張機能を追加します。

バンドルのインストール

Kafka 拡張機能は、host.json プロジェクト ファイルで指定される拡張機能バンドルの一部です。 Functions バージョン 3.x 以降を対象とするプロジェクトを作成する場合は、このバンドルが既にインストールされている必要があります。 詳細については、「拡張機能のバンドル」を参照してください。

ランタイム スケールを有効にする

Kafka のトリガーとバインドを使用するときに関数が Premium プランで適切にスケーリングできるようにするには、ランタイム スケールの監視を有効にする必要があります。

Azure portal の関数アプリで [構成] を選択し、[関数のランタイム設定] タブで [ランタイム スケールの監視][オン] にします。

ランタイム スケーリングを有効にする Azure portal パネルのスクリーンショット。

host.json 設定

このセクションでは、バージョン 3.x 以降でこのバインドに使用可能な構成設定について説明します。 host.json ファイルの設定は、関数アプリ インスタンスのすべての関数に適用されます。 バージョン 3.x 以降のバージョンでの関数アプリ構成設定の詳細については、Azure Functions の host.json のリファレンスに関するページを参照してください。

{
    "version": "2.0",
    "extensions": {
        "kafka": {
            "maxBatchSize": 64,
            "SubscriberIntervalInSeconds": 1,
            "ExecutorChannelCapacity": 1,
            "ChannelFullRetryIntervalInMs": 50
        }
    }
}

プロパティ Default Type 説明
ChannelFullRetryIntervalInMs 50 トリガー 容量範囲内のチャネルに項目を追加するときに使用されるサブスクライバーの再試行間隔をミリ秒単位で定義します。
ExecutorChannelCapacity 1 両方 チャネル メッセージ容量を定義します。 容量に達すると、Kafka サブスクライバーは関数が追いつくまで一時停止します。
MaxBatchSize 64 トリガー Kafka によってトリガーされる関数を呼び出すときの最大バッチ サイズ。
SubscriberIntervalInSeconds 1 トリガー 関数ごとに、受信メッセージが実行される最小頻度を秒単位で定義します。 メッセージ ボリュームが MaxBatchSize / SubscriberIntervalInSeconds 未満の場合のみ

Apache Kafka C/C++ クライアント ライブラリから継承される次のプロパティは、host.json の kafka セクションでも、トリガーまたは出力バインドとトリガーの両方でサポートされています。

プロパティ 適用対象 librdkafka と同等
AutoCommitIntervalMs トリガー auto.commit.interval.ms
AutoOffsetReset トリガー auto.offset.reset
FetchMaxBytes トリガー fetch.max.bytes
LibkafkaDebug 両方 debug
MaxPartitionFetchBytes トリガー max.partition.fetch.bytes
MaxPollIntervalMs トリガー max.poll.interval.ms
MetadataMaxAgeMs 両方 metadata.max.age.ms
QueuedMinMessages トリガー queued.min.messages
QueuedMaxMessagesKbytes トリガー queued.max.messages.kbytes
ReconnectBackoffMs トリガー reconnect.backoff.max.ms
ReconnectBackoffMaxMs トリガー reconnect.backoff.max.ms
SessionTimeoutMs トリガー session.timeout.ms
SocketKeepaliveEnable 両方 socket.keepalive.enable
StatisticsIntervalMs トリガー statistics.interval.ms

次のステップ