Výstupní vazba Apache Kafka pro Azure Functions
Výstupní vazba umožňuje aplikaci Azure Functions psát zprávy do tématu Kafka.
Důležité
Vazby Kafka jsou dostupné jenom pro funkce v plánu Elastic Premium a vyhrazeném plánu (App Service). Podporují se pouze ve verzi 3.x a novější verzi modulu runtime Functions.
Příklad
Použití vazby závisí na způsobu použití jazyka C# používaném ve vaší aplikaci funkcí, což může být jedna z následujících možností:
Kompilovaná funkce C# v izolované knihovně tříd pracovních procesů běží v procesu izolovaném od modulu runtime.
Atributy, které použijete, závisí na konkrétním poskytovateli událostí.
Následující příklad obsahuje vlastní návratový typ, který se MultipleOutputType
skládá z odpovědi HTTP a výstupu Kafka.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
Ve třídě MultipleOutputType
Kevent
je výstupní proměnná vazby pro vazbu Kafka.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Pokud chcete odeslat dávku událostí, předejte řetězcové pole do výstupního typu, jak je znázorněno v následujícím příkladu:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
Řetězcové pole je definováno jako Kevents
vlastnost třídy, na které je definována výstupní vazba:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Následující funkce přidá hlavičky do výstupních dat Kafka:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Kompletní sadu funkčních příkladů .NET najdete v úložišti rozšíření Kafka.
Poznámka:
Ekvivalentní sadu příkladů TypeScriptu najdete v úložišti rozšíření Kafka.
Konkrétní vlastnosti souboru function.json závisí na poskytovateli událostí, který v těchto příkladech jsou Confluent nebo Azure Event Hubs. Následující příklady ukazují výstupní vazbu Kafka pro funkci aktivovanou požadavkem HTTP a odesílají data z požadavku do tématu Kafka.
Následující function.json definuje trigger pro konkrétního poskytovatele v těchto příkladech:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
Následující kód pak odešle zprávu do tématu:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
Následující kód odešle více zpráv jako pole do stejného tématu:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Následující příklad ukazuje, jak odeslat zprávu události se záhlavími do stejného tématu Kafka:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Kompletní sadu funkčních příkladů JavaScriptu najdete v úložišti rozšíření Kafka.
Konkrétní vlastnosti souboru function.json závisí na poskytovateli událostí, který v těchto příkladech jsou Confluent nebo Azure Event Hubs. Následující příklady ukazují výstupní vazbu Kafka pro funkci aktivovanou požadavkem HTTP a odesílají data z požadavku do tématu Kafka.
Následující function.json definuje trigger pro konkrétního poskytovatele v těchto příkladech:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
Následující kód pak odešle zprávu do tématu:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Následující kód odešle více zpráv jako pole do stejného tématu:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Následující příklad ukazuje, jak odeslat zprávu události se záhlavími do stejného tématu Kafka:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Kompletní sadu funkčních příkladů PowerShellu najdete v úložišti rozšíření Kafka.
Konkrétní vlastnosti souboru function.json závisí na poskytovateli událostí, který v těchto příkladech jsou Confluent nebo Azure Event Hubs. Následující příklady ukazují výstupní vazbu Kafka pro funkci aktivovanou požadavkem HTTP a odesílají data z požadavku do tématu Kafka.
Následující function.json definuje trigger pro konkrétního poskytovatele v těchto příkladech:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Následující kód pak odešle zprávu do tématu:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
Následující kód odešle více zpráv jako pole do stejného tématu:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
Následující příklad ukazuje, jak odeslat zprávu události se záhlavími do stejného tématu Kafka:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
Kompletní sadu funkčních příkladů Pythonu najdete v úložišti rozšíření Kafka.
Poznámky, které použijete ke konfiguraci výstupní vazby, závisí na konkrétním poskytovateli událostí.
Následující funkce odešle zprávu do tématu Kafka.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
Následující příklad ukazuje, jak odeslat více zpráv do tématu Kafka.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
V tomto příkladu se parametr výstupní vazby změní na pole řetězců.
Poslední příklad se používá pro tyto KafkaEntity
a KafkaHeader
třídy:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
Následující ukázková funkce odešle zprávu se záhlavími do tématu Kafka.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Kompletní sadu funkčních příkladů Javy pro Confluent najdete v úložišti rozšíření Kafka.
Atributy
Knihovny C# v procesu i izolovaného pracovního procesu používají Kafka
atribut k definování triggeru funkce.
Následující tabulka vysvětluje vlastnosti, které můžete nastavit pomocí tohoto atributu:
Parametr | Popis |
---|---|
BrokerList | (Povinné) Seznam zprostředkovatelů Kafka, do kterých se odesílá výstup. Další informace najdete v tématu Připojení . |
Předmět zájmu | (Povinné) Téma, do kterého se výstup odešle. |
AvroSchema | (Volitelné) Schéma obecného záznamu při použití protokolu Avro |
MaxMessageBytes | (Volitelné) Maximální velikost odesílané výstupní zprávy (v MB) s výchozí hodnotou 1 . |
BatchSize | (Volitelné) Maximální počet zpráv v jedné sadě zpráv s výchozí hodnotou 10000 . |
EnableIdempotence | (Volitelné) Pokud je nastavena hodnota true , zaručuje, že zprávy jsou úspěšně vytvořeny přesně jednou a v původním pořadí produkce s výchozí hodnotou false |
MessageTimeoutMs | (Volitelné) Časový limit místní zprávy v milisekundách. Tato hodnota se vynucuje pouze místně a omezuje čas čekání vytvořené zprávy na úspěšné doručení s výchozí 300000 hodnotou . Čas 0 je nekonečný. Tato hodnota je maximální doba použitá k doručení zprávy (včetně opakování). K chybě doručení dochází při překročení počtu opakování nebo vypršení časového limitu zprávy. |
RequestTimeoutMs | (Volitelné) Časový limit potvrzení výstupního požadavku v milisekundách s výchozí hodnotou 5000 . |
MaxRetries | (Volitelné) Počet opakování pokusu o odeslání neúspěšné zprávy s výchozí hodnotou 2 . Opakování může způsobit změny pořadí, pokud EnableIdempotence není nastavena na true hodnotu . |
AuthenticationMode | (Volitelné) Režim ověřování při použití ověřování SASL (Simple Authentication and Security Layer). Podporované hodnoty jsou Gssapi , Plain (výchozí), ScramSha256 , ScramSha512 . |
Uživatelské jméno | (Volitelné) Uživatelské jméno pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení . |
Heslo | (Volitelné) Heslo pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení . |
Protokol | (Volitelné) Protokol zabezpečení používaný při komunikaci s zprostředkovateli. Podporované hodnoty jsou plaintext (výchozí), ssl , sasl_plaintext , sasl_ssl . |
SslCaLocation | (Volitelné) Cesta k souboru certifikátu certifikační autority pro ověření certifikátu zprostředkovatele |
SslCertificateLocation | (Volitelné) Cesta k certifikátu klienta |
SslKeyLocation | (Volitelné) Cesta k privátnímu klíči klienta (PEM) používanému k ověřování |
SslKeyPassword | (Volitelné) Heslo pro certifikát klienta. |
Poznámky
Poznámka KafkaOutput
umožňuje vytvořit funkci, která zapisuje do konkrétního tématu. Mezi podporované možnosti patří následující prvky:
Element (Prvek) | Popis |
---|---|
Jméno | Název proměnné, která představuje zprostředkovaná data v kódu funkce. |
brokerList | (Povinné) Seznam zprostředkovatelů Kafka, do kterých se odesílá výstup. Další informace najdete v tématu Připojení . |
topic | (Povinné) Téma, do kterého se výstup odešle. |
Datatype | Definuje, jak služba Functions zpracovává hodnotu parametru. Ve výchozím nastavení je hodnota získána jako řetězec a Functions se pokusí deserializovat řetězec na skutečný prostý starý java objekt (POJO). Když string se vstup považuje za řetězec. Když binary se zpráva přijme jako binární data a functions se pokusí deserializovat na skutečný typ parametru bajt[]. |
avroSchema | (Volitelné) Schéma obecného záznamu při použití protokolu Avro (Aktuálně se nepodporuje pro Javu.) |
maxMessageBytes | (Volitelné) Maximální velikost odesílané výstupní zprávy (v MB) s výchozí hodnotou 1 . |
batchSize | (Volitelné) Maximální počet zpráv v jedné sadě zpráv s výchozí hodnotou 10000 . |
enableIdempotence | (Volitelné) Pokud je nastavena hodnota true , zaručuje, že zprávy jsou úspěšně vytvořeny přesně jednou a v původním pořadí produkce s výchozí hodnotou false |
messageTimeoutMs | (Volitelné) Časový limit místní zprávy v milisekundách. Tato hodnota se vynucuje pouze místně a omezuje čas čekání vytvořené zprávy na úspěšné doručení s výchozí 300000 hodnotou . Čas 0 je nekonečný. Jedná se o maximální dobu použitou k doručení zprávy (včetně opakování). K chybě doručení dochází při překročení počtu opakování nebo vypršení časového limitu zprávy. |
requestTimeoutMs | (Volitelné) Časový limit potvrzení výstupního požadavku v milisekundách s výchozí hodnotou 5000 . |
maxRetries | (Volitelné) Počet opakování pokusu o odeslání neúspěšné zprávy s výchozí hodnotou 2 . Opakování může způsobit změny pořadí, pokud EnableIdempotence není nastavena na true hodnotu . |
authenticationMode | (Volitelné) Režim ověřování při použití ověřování SASL (Simple Authentication and Security Layer). Podporované hodnoty jsou Gssapi , Plain (výchozí), ScramSha256 , ScramSha512 . |
uživatelské jméno | (Volitelné) Uživatelské jméno pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení . |
heslo | (Volitelné) Heslo pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení . |
protokol | (Volitelné) Protokol zabezpečení používaný při komunikaci s zprostředkovateli. Podporované hodnoty jsou plaintext (výchozí), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Volitelné) Cesta k souboru certifikátu certifikační autority pro ověření certifikátu zprostředkovatele |
sslCertificateLocation | (Volitelné) Cesta k certifikátu klienta |
sslKeyLocation | (Volitelné) Cesta k privátnímu klíči klienta (PEM) používanému k ověřování |
sslKeyPassword | (Volitelné) Heslo pro certifikát klienta. |
Konfigurace
Následující tabulka vysvětluje vlastnosti konfigurace vazby, které jste nastavili v souboru function.json .
vlastnost function.json | Popis |
---|---|
type | Musí být nastavena na kafka hodnotu . |
direction | Musí být nastavena na out hodnotu . |
Jméno | Název proměnné, která představuje zprostředkovaná data v kódu funkce. |
brokerList | (Povinné) Seznam zprostředkovatelů Kafka, do kterých se odesílá výstup. Další informace najdete v tématu Připojení . |
topic | (Povinné) Téma, do kterého se výstup odešle. |
avroSchema | (Volitelné) Schéma obecného záznamu při použití protokolu Avro |
maxMessageBytes | (Volitelné) Maximální velikost odesílané výstupní zprávy (v MB) s výchozí hodnotou 1 . |
batchSize | (Volitelné) Maximální počet zpráv v jedné sadě zpráv s výchozí hodnotou 10000 . |
enableIdempotence | (Volitelné) Pokud je nastavena hodnota true , zaručuje, že zprávy jsou úspěšně vytvořeny přesně jednou a v původním pořadí produkce s výchozí hodnotou false |
messageTimeoutMs | (Volitelné) Časový limit místní zprávy v milisekundách. Tato hodnota se vynucuje pouze místně a omezuje čas čekání vytvořené zprávy na úspěšné doručení s výchozí 300000 hodnotou . Čas 0 je nekonečný. Jedná se o maximální dobu použitou k doručení zprávy (včetně opakování). K chybě doručení dochází při překročení počtu opakování nebo vypršení časového limitu zprávy. |
requestTimeoutMs | (Volitelné) Časový limit potvrzení výstupního požadavku v milisekundách s výchozí hodnotou 5000 . |
maxRetries | (Volitelné) Počet opakování pokusu o odeslání neúspěšné zprávy s výchozí hodnotou 2 . Opakování může způsobit změny pořadí, pokud EnableIdempotence není nastavena na true hodnotu . |
authenticationMode | (Volitelné) Režim ověřování při použití ověřování SASL (Simple Authentication and Security Layer). Podporované hodnoty jsou Gssapi , Plain (výchozí), ScramSha256 , ScramSha512 . |
uživatelské jméno | (Volitelné) Uživatelské jméno pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení . |
heslo | (Volitelné) Heslo pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení . |
protokol | (Volitelné) Protokol zabezpečení používaný při komunikaci s zprostředkovateli. Podporované hodnoty jsou plaintext (výchozí), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Volitelné) Cesta k souboru certifikátu certifikační autority pro ověření certifikátu zprostředkovatele |
sslCertificateLocation | (Volitelné) Cesta k certifikátu klienta |
sslKeyLocation | (Volitelné) Cesta k privátnímu klíči klienta (PEM) používanému k ověřování |
sslKeyPassword | (Volitelné) Heslo pro certifikát klienta. |
Využití
S integrovaným serializací Avro a Protobuf se podporují klíče i typy hodnot.
Posun, oddíl a časové razítko události se generují za běhu. Uvnitř funkce lze nastavit pouze hodnoty a hlavičky. Téma je nastavené v function.json.
Ujistěte se, že máte přístup k tématu Kafka, ke kterému se pokoušíte psát. Vazbu nakonfigurujete pomocí přihlašovacích údajů pro přístup a připojení k tématu Kafka.
V plánu Premium musíte povolit monitorování škálování modulu runtime pro výstup Kafka, aby bylo možné škálovat na více instancí. Další informace najdete v tématu Povolení škálování modulu runtime.
Úplnou sadu podporovaných nastavení host.json pro trigger Kafka najdete v tématu host.json nastavení.
Propojení
Všechny informace o připojení vyžadované triggery a vazbami by se měly udržovat v nastavení aplikace, a ne v definicích vazeb v kódu. To platí pro přihlašovací údaje, které by nikdy neměly být uloženy ve vašem kódu.
Důležité
Nastavení přihlašovacích údajů musí odkazovat na nastavení aplikace. Nezakódujte v kódu ani v konfiguračních souborech pevně zakódujte přihlašovací údaje. Při místním spuštění použijte local.settings.json soubor pro přihlašovací údaje a nepublikujte local.settings.json soubor.
Při připojování ke spravovanému clusteru Kafka poskytovanému Confluentem v Azure se ujistěte, že ve vašem triggeru nebo vazbě jsou nastavené následující ověřovací přihlašovací údaje pro vaše prostředí Confluent Cloud:
Nastavení | Doporučená hodnota | Popis |
---|---|---|
BrokerList | BootstrapServer |
Nastavení aplikace s názvem BootstrapServer obsahuje hodnotu serveru bootstrap nalezené na stránce nastavení confluent cloudu. Hodnota se podobá xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Uživatelské jméno | ConfluentCloudUsername |
Nastavení aplikace s názvem ConfluentCloudUsername obsahuje přístupový klíč rozhraní API z webu Confluent Cloud. |
Heslo | ConfluentCloudPassword |
Nastavení aplikace s názvem ConfluentCloudPassword obsahuje tajný klíč rozhraní API získaný z webu Confluent Cloud. |
Řetězcové hodnoty, které použijete pro tato nastavení, musí být přítomné jako nastavení aplikace v Azure nebo v Values
kolekci v souboru local.settings.json během místního vývoje.
Měli byste také nastavit , Protocol
AuthenticationMode
a SslCaLocation
v definicích vazeb.