適用於 Azure Functions 的 Apache Kafka 輸出系結
輸出系結可讓 Azure Functions 應用程式將訊息寫入 Kafka 主題。
重要
Kafka 系結僅適用於彈性進階方案和專用 (App Service) 方案中的 Functions。 只有 3.x 版和更新版本的 Functions 運行時間才支援它們。
範例
系結的使用方式取決於函式應用程式中所使用的 C# 形式,這可以是下列其中一項:
已編譯 C# 函式的隔離背景工作進程類別庫會在與運行時間隔離的進程中執行。
您使用的屬性取決於特定事件提供者。
下列範例具有自定義傳回類型,其 MultipleOutputType
包含 HTTP 回應和 Kafka 輸出。
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
在類別 MultipleOutputType
中, Kevent
是 Kafka 系結的輸出系結變數。
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
若要傳送事件批次,請將字串陣列傳遞至輸出類型,如下列範例所示:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
字串陣列定義為 Kevents
類別上的 屬性,其中定義了輸出系結:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
下列函式會將標頭新增至 Kafka 輸出資料:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
如需一組完整的工作 .NET 範例,請參閱 Kafka擴充功能存放庫。
注意
如需一組對等的 TypeScript 範例,請參閱 Kafka 擴充功能存放庫
function.json檔案的特定屬性取決於您的事件提供者,在這些範例中為 Confluent 或 Azure 事件中樞。 下列範例示範由 HTTP 要求所觸發之函式的 Kafka 輸出系結,並將要求中的數據傳送至 Kafka 主題。
下列function.json會定義這些範例中特定提供者的觸發程式:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
下列程式代碼接著會將訊息傳送至主題:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
下列程式代碼會將多個訊息當做數位件傳送至相同的主題:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
下列範例示範如何將標頭的事件訊息傳送至相同的 Kafka 主題:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
如需一組完整的 JavaScript 範例,請參閱 Kafka 擴充功能存放庫。
function.json檔案的特定屬性取決於您的事件提供者,在這些範例中為 Confluent 或 Azure 事件中樞。 下列範例示範由 HTTP 要求所觸發之函式的 Kafka 輸出系結,並將要求中的數據傳送至 Kafka 主題。
下列function.json會定義這些範例中特定提供者的觸發程式:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
下列程式代碼接著會將訊息傳送至主題:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
下列程式代碼會將多個訊息當做數位件傳送至相同的主題:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
下列範例示範如何將標頭的事件訊息傳送至相同的 Kafka 主題:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
如需一組完整的運作 PowerShell 範例,請參閱 Kafka 擴充功能存放庫。
function.json檔案的特定屬性取決於您的事件提供者,在這些範例中為 Confluent 或 Azure 事件中樞。 下列範例示範由 HTTP 要求所觸發之函式的 Kafka 輸出系結,並將要求中的數據傳送至 Kafka 主題。
下列function.json會定義這些範例中特定提供者的觸發程式:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
下列程式代碼接著會將訊息傳送至主題:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
下列程式代碼會將多個訊息當做數位件傳送至相同的主題:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
下列範例示範如何將標頭的事件訊息傳送至相同的 Kafka 主題:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
如需一組完整的工作 Python 範例,請參閱 Kafka 擴充功能存放庫。
您用來設定輸出系結的批註取決於特定事件提供者。
下列函式會將訊息傳送至 Kafka 主題。
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
下列範例示範如何將多個訊息傳送至 Kafka 主題。
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
在此範例中,輸出系結參數會變更為字串陣列。
最後一個範例會使用這些 KafkaEntity
和 KafkaHeader
類別:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
下列範例函式會將具有標頭的訊息傳送至 Kafka 主題。
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
如需 Confluent 的完整工作 Java 範例集,請參閱 Kafka 擴充功能存放庫。
屬性
進程內和隔離的背景工作進程 C# 連結庫都會使用 Kafka
屬性來定義函式觸發程式。
下表說明您可以使用此屬性設定的屬性:
參數 | 描述 |
---|---|
BrokerList | (必要)傳送輸出的 Kafka 訊息代理程式清單。 如需詳細資訊,請參閱 連線 。 |
主題 | (必要)輸出要傳送至的主題。 |
AvroSchema | (選擇性)使用 Avro 通訊協定時,一般記錄的架構。 |
MaxMessageBytes | (選擇性)所傳送輸出訊息的大小上限(以 MB 為單位),預設值為 1 。 |
BatchSize | (選擇性)單一訊息集中批處理的訊息數目上限,預設值為 10000 。 |
EnableIdempotence | (選擇性)當設定為 true 時,保證訊息已成功產生一次,且以原始產生順序,預設值為 false |
MessageTimeoutMs | (選擇性)本機訊息逾時,以毫秒為單位。 這個值只會在本機強制執行,並限制產生的訊息等候成功傳遞的時間,預設 300000 為 。 的時間是無限的 0 。 此值是用來傳遞訊息的最長時間(包括重試)。 超過重試計數或訊息逾時時,就會發生傳遞錯誤。 |
RequestTimeoutMs | (選擇性)輸出要求的通知逾時,以毫秒為單位,預設值為 5000 。 |
MaxRetries | (選擇性)重試傳送失敗訊息的次數,預設值為 2 。 除非 EnableIdempotence 設定為 true ,否則重試可能會導致重新排序。 |
AuthenticationMode | (選擇性)使用簡單驗證和安全性層 (SASL) 驗證時的驗證模式。 支援的值為Gssapi 、 (預設值)、、 ScramSha256 ScramSha512 。 Plain |
使用者名稱 | (選擇性)SASL 驗證的用戶名稱。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
密碼 | (選擇性)SASL 驗證的密碼。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
通訊協定 | (選擇性)與訊息代理程式通訊時所使用的安全性通訊協定。 支援的值為 plaintext (預設值)、、ssl 、 sasl_plaintext sasl_ssl 。 |
SslCaLocation | (選擇性)用於驗證訊息代理程序憑證的 CA 憑證檔案路徑。 |
SslCertificateLocation | (選擇性)用戶端憑證的路徑。 |
SslKeyLocation | (選擇性)用於驗證之用戶端私鑰 (PEM) 的路徑。 |
SslKeyPassword | (選擇性)用戶端憑證的密碼。 |
註釋
批 KafkaOutput
註可讓您建立寫入特定主題的函式。 支援的選項包括下列元素:
元素 | 描述 |
---|---|
name | 代表函式程式碼中代理數據的變數名稱。 |
brokerList | (必要)傳送輸出的 Kafka 訊息代理程式清單。 如需詳細資訊,請參閱 連線 。 |
topic | (必要)輸出要傳送至的主題。 |
dataType | 定義 Functions 如何處理參數值。 根據預設,會以字串形式取得值,Functions 會嘗試將字串還原串行化為實際的純舊 Java 物件 (POJO)。 當 為 時 string ,輸入會視為字串。 當 為 時 binary ,訊息會以二進位數據的形式接收,而 Functions 會嘗試將它還原串行化為實際的參數類型 byte[]。 |
avroSchema | (選擇性)使用 Avro 通訊協定時,一般記錄的架構。 (目前不支援 Java。 |
maxMessageBytes | (選擇性)所傳送輸出訊息的大小上限(以 MB 為單位),預設值為 1 。 |
batchSize | (選擇性)單一訊息集中批處理的訊息數目上限,預設值為 10000 。 |
enableIdempotence | (選擇性)當設定為 true 時,保證訊息已成功產生一次,且以原始產生順序,預設值為 false |
messageTimeoutMs | (選擇性)本機訊息逾時,以毫秒為單位。 這個值只會在本機強制執行,並限制產生的訊息等候成功傳遞的時間,預設 300000 為 。 的時間是無限的 0 。 這是用來傳遞訊息的最長時間(包括重試)。 超過重試計數或訊息逾時時,就會發生傳遞錯誤。 |
requestTimeoutMs | (選擇性)輸出要求的通知逾時,以毫秒為單位,預設值為 5000 。 |
maxRetries | (選擇性)重試傳送失敗訊息的次數,預設值為 2 。 除非 EnableIdempotence 設定為 true ,否則重試可能會導致重新排序。 |
authenticationMode | (選擇性)使用簡單驗證和安全性層 (SASL) 驗證時的驗證模式。 支援的值為Gssapi 、 (預設值)、、 ScramSha256 ScramSha512 。 Plain |
username | (選擇性)SASL 驗證的用戶名稱。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
password | (選擇性)SASL 驗證的密碼。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
protocol | (選擇性)與訊息代理程式通訊時所使用的安全性通訊協定。 支援的值為 plaintext (預設值)、、ssl 、 sasl_plaintext sasl_ssl 。 |
sslCaLocation | (選擇性)用於驗證訊息代理程序憑證的 CA 憑證檔案路徑。 |
sslCertificateLocation | (選擇性)用戶端憑證的路徑。 |
sslKeyLocation | (選擇性)用於驗證之用戶端私鑰 (PEM) 的路徑。 |
sslKeyPassword | (選擇性)用戶端憑證的密碼。 |
組態
下表說明您在 function.json 檔案中設定的繫結設定屬性。
function.json 屬性 | 描述 |
---|---|
type | 必須設定為 kafka 。 |
direction | 必須設定為 out 。 |
name | 代表函式程式碼中代理數據的變數名稱。 |
brokerList | (必要)傳送輸出的 Kafka 訊息代理程式清單。 如需詳細資訊,請參閱 連線 。 |
topic | (必要)輸出要傳送至的主題。 |
avroSchema | (選擇性)使用 Avro 通訊協定時,一般記錄的架構。 |
maxMessageBytes | (選擇性)所傳送輸出訊息的大小上限(以 MB 為單位),預設值為 1 。 |
batchSize | (選擇性)單一訊息集中批處理的訊息數目上限,預設值為 10000 。 |
enableIdempotence | (選擇性)當設定為 true 時,保證訊息已成功產生一次,且以原始產生順序,預設值為 false |
messageTimeoutMs | (選擇性)本機訊息逾時,以毫秒為單位。 這個值只會在本機強制執行,並限制產生的訊息等候成功傳遞的時間,預設 300000 為 。 的時間是無限的 0 。 這是用來傳遞訊息的最長時間(包括重試)。 超過重試計數或訊息逾時時,就會發生傳遞錯誤。 |
requestTimeoutMs | (選擇性)輸出要求的通知逾時,以毫秒為單位,預設值為 5000 。 |
maxRetries | (選擇性)重試傳送失敗訊息的次數,預設值為 2 。 除非 EnableIdempotence 設定為 true ,否則重試可能會導致重新排序。 |
authenticationMode | (選擇性)使用簡單驗證和安全性層 (SASL) 驗證時的驗證模式。 支援的值為Gssapi 、 (預設值)、、 ScramSha256 ScramSha512 。 Plain |
username | (選擇性)SASL 驗證的用戶名稱。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
password | (選擇性)SASL 驗證的密碼。 當 為Gssapi 時AuthenticationMode 不受支援。 如需詳細資訊,請參閱 連線 。 |
protocol | (選擇性)與訊息代理程式通訊時所使用的安全性通訊協定。 支援的值為 plaintext (預設值)、、ssl 、 sasl_plaintext sasl_ssl 。 |
sslCaLocation | (選擇性)用於驗證訊息代理程序憑證的 CA 憑證檔案路徑。 |
sslCertificateLocation | (選擇性)用戶端憑證的路徑。 |
sslKeyLocation | (選擇性)用於驗證之用戶端私鑰 (PEM) 的路徑。 |
sslKeyPassword | (選擇性)用戶端憑證的密碼。 |
使用方式
在運行時間會產生事件的位移、數據分割和時間戳。 函式內只能設定值和標頭。 主題是在function.json中設定。
請務必能夠存取您嘗試撰寫的 Kafka 主題。 您可以使用 Kafka 主題的存取和連線認證來設定系結。
在進階方案中,您必須啟用 Kafka 輸出的運行時間調整監視,才能相應放大至多個實例。 若要深入瞭解,請參閱 啟用運行時間調整。
如需 Kafka 觸發程式支援的一組完整host.json設定,請參閱 host.json設定。
連線
觸發程式和系結所需的所有連接資訊都應該在應用程式設定中維護,而不是在程式碼中的系結定義中維護。 這適用於不應該儲存在程式碼中的認證。
重要
認證設定必須參考 應用程式設定。 不要在您的程式代碼或組態檔中硬式編碼認證。 在本機執行時,請針對您的認證使用 local.settings.json 檔案 ,而且不會發佈local.settings.json檔案。
線上到 Azure 中 Confluent 提供的受控 Kafka 叢集時,請確定您的 Confluent Cloud 環境的下列驗證認證是在觸發程式或系結中設定:
設定 | 建議值 | 描述 |
---|---|---|
BrokerList | BootstrapServer |
名為 BootstrapServer 的應用程式設定包含 Confluent Cloud 設定頁面中找到的啟動程式伺服器值。 值類似於 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 。 |
使用者名稱 | ConfluentCloudUsername |
名為 ConfluentCloudUsername 的應用程式設定包含來自 Confluent Cloud 網站的 API 存取金鑰。 |
密碼 | ConfluentCloudPassword |
名為 ConfluentCloudPassword 的應用程式設定包含從 Confluent Cloud 網站取得的 API 秘密。 |
您用於這些設定的字串值必須以應用程式設定的形式出現在 Azure 或Values
本機開發期間local.settings.json檔案的集合中。
您也應該在繫 Protocol
結定義中設定、 AuthenticationMode
與 SslCaLocation
。