Vinculação de saída do Apache Kafka para o Azure Functions
A associação de saída permite que um aplicativo do Azure Functions escreva mensagens em um tópico do Kafka.
Importante
As ligações Kafka só estão disponíveis para Funções no plano Elastic Premium e no plano Dedicado (Serviço de Aplicativo). Eles só são suportados na versão 3.x e versão posterior do tempo de execução do Functions.
O uso da associação depende da modalidade C# usada em seu aplicativo de função, que pode ser uma das seguintes:
Uma biblioteca de classes de processo de trabalho isolada compilada função C# é executada em um processo isolado do tempo de execução.
Os atributos que você usa dependem do provedor de eventos específico.
O exemplo a seguir tem um tipo de retorno personalizado que é MultipleOutputType
, que consiste em uma resposta HTTP e uma saída Kafka.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
Na classe MultipleOutputType
, Kevent
é a variável de ligação de saída para a ligação de Kafka.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Para enviar um lote de eventos, passe uma matriz de cadeia de caracteres para o tipo de saída, conforme mostrado no exemplo a seguir:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
A matriz de cadeia de caracteres é definida como Kevents
propriedade na classe, na qual a ligação de saída é definida:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
A função a seguir adiciona cabeçalhos aos dados de saída de Kafka:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Para obter um conjunto completo de exemplos .NET de trabalho, consulte o repositório de extensão Kafka.
Nota
Para obter um conjunto equivalente de exemplos de TypeScript, consulte o repositório de extensão Kafka
As propriedades específicas do arquivo function.json dependem do seu provedor de eventos, que nesses exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos a seguir mostram uma ligação de saída Kafka para uma função que é acionada por uma solicitação HTTP e envia dados da solicitação para o tópico Kafka.
A function.json a seguir define o gatilho para o provedor específico nesses exemplos:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
O código a seguir envia uma mensagem para o tópico:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico Kafka:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Para obter um conjunto completo de exemplos JavaScript de trabalho, consulte o repositório de extensões Kafka.
As propriedades específicas do arquivo function.json dependem do seu provedor de eventos, que nesses exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos a seguir mostram uma ligação de saída Kafka para uma função que é acionada por uma solicitação HTTP e envia dados da solicitação para o tópico Kafka.
A function.json a seguir define o gatilho para o provedor específico nesses exemplos:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
O código a seguir envia uma mensagem para o tópico:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico Kafka:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Para obter um conjunto completo de exemplos de PowerShell em funcionamento, consulte o repositório de extensão Kafka.
As propriedades específicas do arquivo function.json dependem do seu provedor de eventos, que nesses exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos a seguir mostram uma ligação de saída Kafka para uma função que é acionada por uma solicitação HTTP e envia dados da solicitação para o tópico Kafka.
A function.json a seguir define o gatilho para o provedor específico nesses exemplos:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
O código a seguir envia uma mensagem para o tópico:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico Kafka:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
Para obter um conjunto completo de exemplos Python de trabalho, consulte o repositório de extensão Kafka.
As anotações que você usa para configurar a associação de saída dependem do provedor de eventos específico.
A função a seguir envia uma mensagem para o tópico Kafka.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
O exemplo a seguir mostra como enviar várias mensagens para um tópico Kafka.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Neste exemplo, o parâmetro de vinculação de saída é alterado para matriz de cadeia de caracteres.
O último exemplo usa para estas KafkaEntity
e KafkaHeader
classes:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
A função de exemplo a seguir envia uma mensagem com cabeçalhos para um tópico Kafka.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Para obter um conjunto completo de exemplos Java de trabalho para Confluent, consulte o repositório de extensão Kafka.
As bibliotecas C# do processo de trabalho em processo e isoladas usam o Kafka
atributo para definir o gatilho de função.
A tabela a seguir explica as propriedades que você pode definir usando esse atributo:
Parâmetro | Description |
---|---|
Lista de Corretores | (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações. |
Tópico | (Obrigatório) O tópico para o qual a saída é enviada. |
AvroSchema | (Opcional) Esquema de um registro genérico ao usar o protocolo Avro. |
MaxMessageBytes | (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1 . |
Tamanho do lote | (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000 . |
CapacitaçãoIdempotência | (Opcional) Quando definido como true , garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false |
MessageTimeoutMs | (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000 . Um tempo de 0 é infinito. Esse valor é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos. |
RequestTimeoutMs | (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000 . |
MaxRetries | (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2 . Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido como true . |
Modo de autenticação | (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são Gssapi , Plain (padrão), ScramSha256 , ScramSha512 . |
Nome de utilizador | (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi . Consulte Conexões para obter mais informações. |
Palavra-passe | (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi . Consulte Conexões para obter mais informações. |
Protocolo | (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são plaintext (padrão), ssl , , sasl_plaintext sasl_ssl . |
Localização de SslCa. | (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor. |
SslCertificateLocation | (Opcional) Caminho para o certificado do cliente. |
SslKeyLocalização | (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação. |
SslKeyPassword | (Opcional) Senha para o certificado do cliente. |
A KafkaOutput
anotação permite que você crie uma função que grava em um tópico específico. As opções suportadas incluem os seguintes elementos:
Elemento | Description |
---|---|
Designação | O nome da variável que representa os dados intermediados no código da função. |
Lista de corretores | (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações. |
topic | (Obrigatório) O tópico para o qual a saída é enviada. |
Tipo de dados | Define como Functions lida com o valor do parâmetro. Por padrão, o valor é obtido como uma string e Functions tenta desserializar a string para o objeto Java plain-old real (POJO). Quando string , a entrada é tratada como apenas uma cadeia de caracteres. Quando binary , a mensagem é recebida como dados binários e o Functions tenta desserializá-la para um byte do tipo parâmetro real[]. |
avroSchema | (Opcional) Esquema de um registro genérico ao usar o protocolo Avro. (Atualmente não suportado para Java.) |
maxMessageBytes | (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1 . |
tamanho do lote | (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000 . |
enableIdempotence | (Opcional) Quando definido como true , garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false |
messageTimeoutMs | (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000 . Um tempo de 0 é infinito. Este é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos. |
requestTimeoutMs | (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000 . |
maxTentativas | (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2 . Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido como true . |
authenticationMode | (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são Gssapi , Plain (padrão), ScramSha256 , ScramSha512 . |
nome de utilizador | (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi . Consulte Conexões para obter mais informações. |
palavra-passe | (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi . Consulte Conexões para obter mais informações. |
protocolo | (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são plaintext (padrão), ssl , , sasl_plaintext sasl_ssl . |
sslCaLocalização | (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor. |
sslCertificateLocation | (Opcional) Caminho para o certificado do cliente. |
sslKeyLocalização | (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação. |
sslKeyPassword | (Opcional) Senha para o certificado do cliente. |
A tabela a seguir explica as propriedades de configuração de associação definidas no arquivo function.json .
function.json propriedade | Description |
---|---|
type | Deve ser definido como kafka . |
direção | Deve ser definido como out . |
Designação | O nome da variável que representa os dados intermediados no código da função. |
Lista de corretores | (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações. |
topic | (Obrigatório) O tópico para o qual a saída é enviada. |
avroSchema | (Opcional) Esquema de um registro genérico ao usar o protocolo Avro. |
maxMessageBytes | (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1 . |
tamanho do lote | (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000 . |
enableIdempotence | (Opcional) Quando definido como true , garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false |
messageTimeoutMs | (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000 . Um tempo de 0 é infinito. Este é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos. |
requestTimeoutMs | (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000 . |
maxTentativas | (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2 . Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido como true . |
authenticationMode | (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são Gssapi , Plain (padrão), ScramSha256 , ScramSha512 . |
nome de utilizador | (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi . Consulte Conexões para obter mais informações. |
palavra-passe | (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi . Consulte Conexões para obter mais informações. |
protocolo | (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são plaintext (padrão), ssl , , sasl_plaintext sasl_ssl . |
sslCaLocalização | (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor. |
sslCertificateLocation | (Opcional) Caminho para o certificado do cliente. |
sslKeyLocalização | (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação. |
sslKeyPassword | (Opcional) Senha para o certificado do cliente. |
O deslocamento, a partição e o carimbo de data/hora do evento são gerados em tempo de execução. Apenas o valor e os cabeçalhos podem ser definidos dentro da função. O tópico é definido no function.json.
Por favor, certifique-se de ter acesso ao tópico Kafka para o qual você está tentando escrever. Você configura a associação com credenciais de acesso e conexão para o tópico Kafka.
Em um plano Premium, você deve habilitar o monitoramento de escala de tempo de execução para que a saída Kafka possa ser dimensionada para várias instâncias. Para saber mais, consulte Habilitar o dimensionamento em tempo de execução.
Para obter um conjunto completo de configurações de host.json suportadas para o gatilho Kafka, consulte host.json configurações.
Todas as informações de conexão exigidas por seus gatilhos e ligações devem ser mantidas nas configurações do aplicativo e não nas definições de vinculação em seu código. Isso é verdade para credenciais, que nunca devem ser armazenadas em seu código.
Importante
As configurações de credenciais devem fazer referência a uma configuração de aplicativo. Não codifice credenciais em seu código ou arquivos de configuração. Ao executar localmente, use o arquivo local.settings.json para suas credenciais e não publique o arquivo local.settings.json.
Ao conectar-se a um cluster Kafka gerenciado fornecido pelo Confluent no Azure, certifique-se de que as seguintes credenciais de autenticação para seu ambiente Confluent Cloud estejam definidas em seu gatilho ou associação:
Definição | Valor recomendado | Description |
---|---|---|
Lista de Corretores | BootstrapServer |
A configuração do aplicativo nomeada BootstrapServer contém o valor do servidor de inicialização encontrado na página Configurações do Confluent Cloud. O valor é semelhante a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Nome de utilizador | ConfluentCloudUsername |
A configuração do aplicativo nomeada ConfluentCloudUsername contém a chave de acesso da API do site Confluent Cloud. |
Palavra-passe | ConfluentCloudPassword |
A configuração do aplicativo nomeada ConfluentCloudPassword contém o segredo da API obtido do site Confluent Cloud. |
Os valores de cadeia de caracteres que você usa para essas configurações devem estar presentes como configurações de Values
aplicativo no Azure ou na coleção no arquivo local.settings.json durante o desenvolvimento local.
Você também deve definir o Protocol
, AuthenticationMode
e SslCaLocation
em suas definições de vinculação.