Enlace de salida de Apache Kafka para Azure Functions
El enlace de salida permite que una aplicación de Azure Functions escriba mensajes en un tema de Kafka.
Importante
Los enlaces de Kafka solo están disponibles para las instancias de Functions del plan elástico prémium y del plan dedicado (App Service). Solo se admiten en la versión 3.x y posteriores del entorno de ejecución de Functions.
Ejemplo
El uso del enlace depende de la modalidad de C# que se usa en la aplicación de funciones, que puede ser una de las siguientes:
Una función de C# compilada de la biblioteca de clases de procesos de trabajo aislados se ejecuta en un proceso aislado del entorno de ejecución.
Los atributos que use dependen del proveedor de eventos específico.
El ejemplo siguiente tiene un tipo de valor devuelto personalizado que es MultipleOutputType
, que consta de una respuesta HTTP y una salida de Kafka.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
En la clase MultipleOutputType
, Kevent
es la variable de enlace de salida para el enlace de Kafka.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Para enviar un lote de eventos, pase una matriz de cadenas al tipo de salida, tal como se muestra en el ejemplo siguiente:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
La matriz de cadenas se define como la propiedad Kevents
en la clase en la que se define el enlace de salida:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
La siguiente función agrega encabezados a los datos de salida de Kafka:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Para obtener un conjunto completo de ejemplos de .NET en funcionamiento, consulte el repositorio de extensiones de Kafka.
Nota:
Para obtener un conjunto equivalente de ejemplos de TypeScript, consulte el repositorio de extensiones de Kafka.
Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos son Confluent o Azure Event Hubs. Los ejemplos siguientes muestran un enlace de salida de Kafka para una función que desencadena una solicitud HTTP y envía datos de la solicitud al tema de Kafka.
El siguiente archivo function.json define el desencadenador para el proveedor específico en estos ejemplos:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
A continuación, el código siguiente envía un mensaje al tema:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
El código siguiente envía varios mensajes como una matriz al mismo tema:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
En el ejemplo siguiente se muestra cómo enviar un mensaje de evento con encabezados al mismo tema de Kafka:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Para obtener un conjunto completo de ejemplos de JavaScript en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos son Confluent o Azure Event Hubs. Los ejemplos siguientes muestran un enlace de salida de Kafka para una función que desencadena una solicitud HTTP y envía datos de la solicitud al tema de Kafka.
El siguiente archivo function.json define el desencadenador para el proveedor específico en estos ejemplos:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
A continuación, el código siguiente envía un mensaje al tema:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
El código siguiente envía varios mensajes como una matriz al mismo tema:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
En el ejemplo siguiente se muestra cómo enviar un mensaje de evento con encabezados al mismo tema de Kafka:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Para obtener un conjunto completo de ejemplos de PowerShell en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos son Confluent o Azure Event Hubs. Los ejemplos siguientes muestran un enlace de salida de Kafka para una función que desencadena una solicitud HTTP y envía datos de la solicitud al tema de Kafka.
El siguiente archivo function.json define el desencadenador para el proveedor específico en estos ejemplos:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
A continuación, el código siguiente envía un mensaje al tema:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
El código siguiente envía varios mensajes como una matriz al mismo tema:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
En el ejemplo siguiente se muestra cómo enviar un mensaje de evento con encabezados al mismo tema de Kafka:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
Para obtener un conjunto completo de ejemplos de Python en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las anotaciones que se usan para configurar el enlace de salida dependen del proveedor de eventos específico.
La siguiente función envía un mensaje al tema de Kafka.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
En el ejemplo siguiente se muestra cómo enviar varios mensajes a un tema de Kafka.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
En este ejemplo, el parámetro de enlace de salida se cambia a la matriz de cadenas.
En el último ejemplo se usa para las clases KafkaEntity
y KafkaHeader
:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
La siguiente función de ejemplo envía un mensaje con encabezados a un tema de Kafka.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Para obtener un conjunto completo de ejemplos de Java en funcionamiento para Confluent, consulte el repositorio de extensiones de Kafka.
Atributos
Las bibliotecas de C# en proceso y de proceso de trabajo aislado usan el atributo Kafka
para definir el desencadenador de la función.
En la tabla siguiente se explican las propiedades que se pueden establecer mediante el atributo:
Parámetro | Descripción |
---|---|
BrokerList | (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones. |
Tema. | (Obligatorio) El tema al que se envía la salida. |
AvroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo Avro. |
MaxMessageBytes | (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1 . |
BatchSize | (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000 . |
EnableIdempotence | (Opcional) Cuando se establece en true , garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false . |
MessageTimeoutMs | (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000 . Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje. |
RequestTimeoutMs | (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000 . |
MaxRetries | (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2 . El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true . |
AuthenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi , Plain (predeterminado), ScramSha256 y ScramSha512 . |
Nombre de usuario | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
Contraseña | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
Protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl , sasl_plaintext y sasl_ssl . |
SslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
SslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
SslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
SslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
anotaciones
La anotación KafkaOutput
le permite crear una función que escribe en un tema específico. Entre las opciones admitidas se incluyen los siguientes elementos:
Elemento | Descripción |
---|---|
name | Nombre de la variable que representa los datos del agente en el código de la función. |
brokerList | (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones. |
topic | (Obligatorio) El tema al que se envía la salida. |
dataType | Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string , la entrada se trata como una cadena. Cuando es binary , el mensaje se recibe como datos binarios y Functions intenta deserializarlo en bytes de tipo de parámetro real[]. |
avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo de Avro. (Actualmente no se admite para Java). |
maxMessageBytes | (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1 . |
batchSize | (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000 . |
enableIdempotence | (Opcional) Cuando se establece en true , garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false . |
messageTimeoutMs | (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000 . Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje. |
requestTimeoutMs | (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000 . |
maxRetries | (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2 . El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true . |
authenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi , Plain (predeterminado), ScramSha256 y ScramSha512 . |
username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl , sasl_plaintext y sasl_ssl . |
sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
Configuración
En la siguiente tabla se explican las propiedades de configuración de enlace que se establecen en el archivo function.json.
Propiedad de function.json | Descripción |
---|---|
type | Se debe establecer en kafka . |
direction | Se debe establecer en out . |
name | Nombre de la variable que representa los datos del agente en el código de la función. |
brokerList | (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones. |
topic | (Obligatorio) El tema al que se envía la salida. |
avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo Avro. |
maxMessageBytes | (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1 . |
batchSize | (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000 . |
enableIdempotence | (Opcional) Cuando se establece en true , garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false . |
messageTimeoutMs | (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000 . Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje. |
requestTimeoutMs | (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000 . |
maxRetries | (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2 . El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true . |
authenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi , Plain (predeterminado), ScramSha256 y ScramSha512 . |
username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl , sasl_plaintext y sasl_ssl . |
sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
Uso
Tanto las claves como los tipos de valores se admiten con la serialización integrada de Avro y Protobuf.
El desplazamiento, la partición y la marca de tiempo del evento se generan en tiempo de ejecución. Solo se pueden establecer valores y encabezados dentro de la función. El tema se establece en function.json.
Asegúrese de tener acceso al tema de Kafka en el que está intentando escribir. Tiene que configurar el enlace con credenciales de acceso y conexión al tema de Kafka.
En un plan Premium, debe habilitar la supervisión del escalado en tiempo de ejecución para que la salida de Kafka pueda escalarse horizontalmente en varias instancias. Para obtener más información, consulte Habilitación del escalado en tiempo de ejecución.
Para obtener un conjunto completo de configuraciones de host.json admitidas en el desencadenador de Kafka, consulte la configuración de host.json.
Conexiones
Toda la información de conexión que necesiten los desencadenadores y enlaces debe mantenerse en la configuración de la aplicación, y no en las definiciones de enlace del código. Lo mismo se aplica las credenciales, que nunca deben almacenarse en el código.
Importante
La configuración de credenciales debe hacer referencia a una configuración de aplicación. No codifique las credenciales de forma rígida en el código ni en los archivos de configuración. Cuando realice la ejecución de forma local, use el archivo local.settings.json para las credenciales y no publique el archivo local.settings.json.
Al conectarse a un clúster de Kafka administrado que haya proporcionado Confluent en Azure, asegúrese de que las siguientes credenciales de autenticación del entorno de Confluent Cloud están establecidas en el desencadenador o enlace:
Setting | Valor recomendado | Descripción |
---|---|---|
BrokerList | BootstrapServer |
La configuración de la aplicación denominada BootstrapServer contiene el valor del servidor de arranque que se encuentra en la página de configuración de Confluent Cloud. El valor es similar a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Nombre de usuario | ConfluentCloudUsername |
La configuración de la aplicación denominada ConfluentCloudUsername contiene la clave de acceso de API del sitio web de Confluent Cloud. |
Contraseña | ConfluentCloudPassword |
La configuración de la aplicación denominada ConfluentCloudPassword contiene el secreto de API obtenido en el sitio web de Confluent Cloud. |
Los valores de la cadena que use en esta configuración deben estar presentes como la configuración de la aplicación en Azure o en la colección Values
en el archivo local.settings.json durante el desarrollo local.
También debe establecer Protocol
, AuthenticationMode
y SslCaLocation
en las definiciones de enlace.