Azure Functions 的 Apache Kafka 觸發程序
您可以使用 Azure Functions 中的 Apache Kafka 觸發程式來執行函式程式碼,以回應 Kafka 主題中的訊息。 您也可以使用 Kafka 輸出系結 ,從函式寫入主題。 如需安裝和組態詳細數據的詳細資訊,請參閱 Azure Functions 的 Apache Kafka 系結概觀。
重要
Kafka 系結僅適用於彈性進階方案和專用 (App Service) 方案中的 Functions。 只有 3.x 版和更新版本的 Functions 運行時間才支援它們。
範例
觸發程式的使用方式取決於函式應用程式中所使用的 C# 形式,這可以是下列其中一種模式:
已編譯 C# 函式的隔離背景工作進程類別庫會在與運行時間隔離的進程中執行。
您使用的屬性取決於特定事件提供者。
下列範例示範 C# 函式,以 Kafka 事件的形式讀取和記錄 Kafka 訊息:
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
若要接收批次中的事件,請使用字串陣列做為輸入,如下列範例所示:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
下列函式會記錄 Kafka 事件的訊息和標頭:
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
如需一組完整的工作 .NET 範例,請參閱 Kafka擴充功能存放庫。
注意
如需一組對等的 TypeScript 範例,請參閱 Kafka 擴充功能存放庫
function.json檔案的特定屬性取決於您的事件提供者,在這些範例中為 Confluent 或 Azure 事件中樞。 下列範例顯示讀取和記錄 Kafka 訊息之函式的 Kafka 觸發程式。
下列function.json會定義特定提供者的觸發程式:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup" : "$Default",
"dataType": "string"
}
]
}
然後,下列程式代碼會在觸發函式時執行:
module.exports = async function (context, event) {
// context.log.info(event)
context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};
若要在批次中接收事件,請將 cardinality
function.json 檔案中的 值設定為 many
,如下列範例所示:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%"
}
]
}
下列程式代碼接著會剖析事件陣列,並記錄事件資料:
module.exports = async function (context, events) {
function print(event) {
var eventJson = JSON.parse(event)
context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
}
events.map(print);
};
下列程式代碼也會記錄標頭資料:
module.exports = async function (context, event) {
function print(kevent) {
var keventJson = JSON.parse(kevent)
context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
context.log.info(`Headers for this message:`)
let headers = keventJson.Headers;
headers.forEach(element => {
context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`)
});
}
event.map(print);
};
您可以定義傳遞至觸發程式之事件的一般 Avro 架構 。 下列function.json會使用一般 Avro 架構來定義特定提供者的觸發程式:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaAvroGenericSingle",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
然後,下列程式代碼會在觸發函式時執行:
module.exports = async function (context, event) {
context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};
如需一組完整的 JavaScript 範例,請參閱 Kafka 擴充功能存放庫。
function.json檔案的特定屬性取決於您的事件提供者,在這些範例中為 Confluent 或 Azure 事件中樞。 下列範例顯示讀取和記錄 Kafka 訊息之函式的 Kafka 觸發程式。
下列function.json會定義特定提供者的觸發程式:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
然後,下列程式代碼會在觸發函式時執行:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
若要在批次中接收事件,請將 cardinality
function.json 檔案中的 值設定為 many
,如下列範例所示:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
下列程式代碼接著會剖析事件陣列,並記錄事件資料:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
下列程式代碼也會記錄標頭資料:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
您可以定義傳遞至觸發程式之事件的一般 Avro 架構 。 下列function.json會使用一般 Avro 架構來定義特定提供者的觸發程式:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
然後,下列程式代碼會在觸發函式時執行:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
如需一組完整的運作 PowerShell 範例,請參閱 Kafka 擴充功能存放庫。
function.json檔案的特定屬性取決於您的事件提供者,在這些範例中為 Confluent 或 Azure 事件中樞。 下列範例顯示讀取和記錄 Kafka 訊息之函式的 Kafka 觸發程式。
下列function.json會定義特定提供者的觸發程式:
{
"scriptFile": "main.py",
"bindings": [
{
"type": "kafkaTrigger",
"name": "kevent",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"consumerGroup" : "functions",
"protocol": "saslSsl",
"authenticationMode": "plain"
}
]
}
然後,下列程式代碼會在觸發函式時執行:
import logging
from azure.functions import KafkaEvent
def main(kevent : KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
若要在批次中接收事件,請將 cardinality
function.json 檔案中的 值設定為 many
,如下列範例所示:
{
"scriptFile": "main.py",
"bindings": [
{
"type" : "kafkaTrigger",
"direction": "in",
"name" : "kevents",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"topic" : "message_python",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"dataType": "string",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"BrokerList" : "%BrokerList%"
}
]
}
下列程式代碼接著會剖析事件陣列,並記錄事件資料:
import logging
import typing
from azure.functions import KafkaEvent
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
下列程式代碼也會記錄標頭資料:
import logging
import typing
from azure.functions import KafkaEvent
import json
import base64
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
event_dec = event.get_body().decode('utf-8')
event_json = json.loads(event_dec)
logging.info("Python Kafka trigger function called for message " + event_json["Value"])
headers = event_json["Headers"]
for header in headers:
logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))
您可以定義傳遞至觸發程式之事件的一般 Avro 架構 。 下列function.json會使用一般 Avro 架構來定義特定提供者的觸發程式:
{
"scriptFile": "main.py",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaTriggerAvroGeneric",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
然後,下列程式代碼會在觸發函式時執行:
import logging
from azure.functions import KafkaEvent
def main(kafkaTriggerAvroGeneric : KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
如需一組完整的工作 Python 範例,請參閱 Kafka 擴充功能存放庫。
您用來設定觸發程式的批註取決於特定事件提供者。
下列範例顯示 Java 函式,可讀取和記錄 Kafka 事件的內容:
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
若要接收批次中的事件,請使用輸入字串作為陣列,如下列範例所示:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
下列函式會記錄 Kafka 事件的訊息和標頭:
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
您可以定義傳遞至觸發程式之事件的一般 Avro 架構 。 下列函式會使用一般 Avro 架構來定義特定提供者的觸發程式:
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
如需 Confluent 的完整工作 Java 範例集,請參閱 Kafka 擴充功能存放庫。
屬性
進程內和隔離的背景工作進程 C# 連結庫都會使用 KafkaTriggerAttribute
來定義函式觸發程式。
下表說明您可以使用這個觸發程式屬性來設定的屬性:
參數 | 描述 |
---|---|
BrokerList | (必要)觸發程式所監視的 Kafka 訊息代理程式清單。 如需詳細資訊,請參閱 連線 。 |
主題 | (必要)觸發程式所監視的主題。 |
ConsumerGroup | (選擇性)觸發程式所使用的 Kafka 取用者群組。 |
AvroSchema | (選擇性)使用 Avro 通訊協定時,一般記錄的架構。 |
AuthenticationMode | (選擇性)使用簡單驗證和安全性層 (SASL) 驗證時的驗證模式。 支援的值為Gssapi 、 (預設值)、、 ScramSha256 ScramSha512 。 Plain |
使用者名稱 | (選擇性)SASL 驗證的用戶名稱。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
密碼 | (選擇性)SASL 驗證的密碼。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
通訊協定 | (選擇性)與訊息代理程式通訊時所使用的安全性通訊協定。 支援的值為 plaintext (預設值)、、ssl 、 sasl_plaintext sasl_ssl 。 |
SslCaLocation | (選擇性)用於驗證訊息代理程序憑證的 CA 憑證檔案路徑。 |
SslCertificateLocation | (選擇性)用戶端憑證的路徑。 |
SslKeyLocation | (選擇性)用於驗證之用戶端私鑰 (PEM) 的路徑。 |
SslKeyPassword | (選擇性)用戶端憑證的密碼。 |
註釋
KafkaTrigger
註釋可讓您建立函式,以在收到主題時執行。 支援的選項包括下列元素:
元素 | 描述 |
---|---|
name | (必要)代表函式程式代碼中佇列或主題訊息的變數名稱。 |
brokerList | (必要)觸發程式所監視的 Kafka 訊息代理程式清單。 如需詳細資訊,請參閱 連線 。 |
topic | (必要)觸發程式所監視的主題。 |
基數 | (選擇性)表示觸發程式輸入的基數。 支援的值為 ONE (預設值) 和 MANY 。 ONE 當輸入是單一訊息,當MANY 輸入是訊息數位時使用。 當您使用 MANY 時,也必須設定 dataType 。 |
dataType | 定義 Functions 如何處理參數值。 根據預設,會以字串形式取得值,Functions 會嘗試將字串還原串行化為實際的純舊 Java 物件 (POJO)。 當 為 時 string ,輸入會視為字串。 當 為 時 binary ,訊息會以二進位數據的形式接收,而 Functions 會嘗試將它還原串行化為實際的參數類型 byte[]。 |
consumerGroup | (選擇性)觸發程式所使用的 Kafka 取用者群組。 |
avroSchema | (選擇性)使用 Avro 通訊協定時,一般記錄的架構。 |
authenticationMode | (選擇性)使用簡單驗證和安全性層 (SASL) 驗證時的驗證模式。 支援的值為Gssapi 、 (預設值)、、 ScramSha256 ScramSha512 。 Plain |
username | (選擇性)SASL 驗證的用戶名稱。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
password | (選擇性)SASL 驗證的密碼。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
protocol | (選擇性)與訊息代理程式通訊時所使用的安全性通訊協定。 支援的值為 plaintext (預設值)、、ssl 、 sasl_plaintext sasl_ssl 。 |
sslCaLocation | (選擇性)用於驗證訊息代理程序憑證的 CA 憑證檔案路徑。 |
sslCertificateLocation | (選擇性)用戶端憑證的路徑。 |
sslKeyLocation | (選擇性)用於驗證之用戶端私鑰 (PEM) 的路徑。 |
sslKeyPassword | (選擇性)用戶端憑證的密碼。 |
組態
下表說明您在 function.json 檔案中設定的繫結設定屬性。
function.json 屬性 | 描述 |
---|---|
type | (必要)必須設定為 kafkaTrigger 。 |
direction | (必要)必須設定為 in 。 |
name | (必要)代表函式程式碼中代理數據的變數名稱。 |
brokerList | (必要)觸發程式所監視的 Kafka 訊息代理程式清單。 如需詳細資訊,請參閱 連線 。 |
topic | (必要)觸發程式所監視的主題。 |
基數 | (選擇性)表示觸發程式輸入的基數。 支援的值為 ONE (預設值) 和 MANY 。 ONE 當輸入是單一訊息,當MANY 輸入是訊息數位時使用。 當您使用 MANY 時,也必須設定 dataType 。 |
dataType | 定義 Functions 如何處理參數值。 根據預設,會以字串形式取得值,Functions 會嘗試將字串還原串行化為實際的純舊 Java 物件 (POJO)。 當 為 時 string ,輸入會視為字串。 當 為 時 binary ,訊息會以二進位數據的形式接收,而 Functions 會嘗試將它還原串行化為實際的參數類型 byte[]。 |
consumerGroup | (選擇性)觸發程式所使用的 Kafka 取用者群組。 |
avroSchema | (選擇性)使用 Avro 通訊協定時,一般記錄的架構。 |
authenticationMode | (選擇性)使用簡單驗證和安全性層 (SASL) 驗證時的驗證模式。 支援的值為Gssapi 、 (預設值)、、 ScramSha256 ScramSha512 。 Plain |
username | (選擇性)SASL 驗證的用戶名稱。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
password | (選擇性)SASL 驗證的密碼。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
protocol | (選擇性)與訊息代理程式通訊時所使用的安全性通訊協定。 支援的值為 plaintext (預設值)、、ssl 、 sasl_plaintext sasl_ssl 。 |
sslCaLocation | (選擇性)用於驗證訊息代理程序憑證的 CA 憑證檔案路徑。 |
sslCertificateLocation | (選擇性)用戶端憑證的路徑。 |
sslKeyLocation | (選擇性)用於驗證之用戶端私鑰 (PEM) 的路徑。 |
sslKeyPassword | (選擇性)用戶端憑證的密碼。 |
使用方式
Kafka 事件目前支援為 JSON 承載的字串和字串陣列。
Kafka 訊息會當做 JSON 承載的字串和字串數位傳遞至函式。
在進階方案中,您必須啟用 Kafka 輸出的運行時間調整監視,才能相應放大至多個實例。 若要深入瞭解,請參閱 啟用運行時間調整。
您無法使用 Azure 入口網站中 [程式代碼 + 測試] 頁面的 [測試/執行] 功能來使用 Kafka 觸發程式。 您必須改為將測試事件直接傳送至觸發程式所監視的主題。
如需 Kafka 觸發程式支援的一組完整host.json設定,請參閱 host.json設定。
連線
觸發程式和系結所需的所有連接資訊都應該在應用程式設定中維護,而不是在程式碼中的系結定義中維護。 這適用於不應該儲存在程式碼中的認證。
重要
認證設定必須參考 應用程式設定。 不要在您的程式代碼或組態檔中硬式編碼認證。 在本機執行時,請針對您的認證使用 local.settings.json 檔案 ,而且不會發佈local.settings.json檔案。
線上到 Azure 中 Confluent 提供的受控 Kafka 叢集時,請確定您的 Confluent Cloud 環境的下列驗證認證是在觸發程式或系結中設定:
設定 | 建議值 | 描述 |
---|---|---|
BrokerList | BootstrapServer |
名為 BootstrapServer 的應用程式設定包含 Confluent Cloud 設定頁面中找到的啟動程式伺服器值。 值類似於 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 。 |
使用者名稱 | ConfluentCloudUsername |
名為 ConfluentCloudUsername 的應用程式設定包含來自 Confluent Cloud 網站的 API 存取金鑰。 |
密碼 | ConfluentCloudPassword |
名為 ConfluentCloudPassword 的應用程式設定包含從 Confluent Cloud 網站取得的 API 秘密。 |
您用於這些設定的字串值必須以應用程式設定的形式出現在 Azure 或Values
本機開發期間local.settings.json檔案的集合中。
您也應該在繫 Protocol
結定義中設定、 AuthenticationMode
與 SslCaLocation
。