Compartir vía


Enlace de salida de Apache Kafka para Azure Functions

El enlace de salida permite que una aplicación de Azure Functions escriba mensajes en un tema de Kafka.

Importante

Los enlaces de Kafka solo están disponibles para las instancias de Functions del plan elástico prémium y del plan dedicado (App Service). Solo se admiten en la versión 3.x y posteriores del entorno de ejecución de Functions.

Ejemplo

El uso del enlace depende de la modalidad de C# que se usa en la aplicación de funciones, que puede ser una de las siguientes:

Una función de C# compilada de la biblioteca de clases de procesos de trabajo aislados se ejecuta en un proceso aislado del entorno de ejecución.

Los atributos que use dependen del proveedor de eventos específico.

El ejemplo siguiente tiene un tipo de valor devuelto personalizado que es MultipleOutputType, que consta de una respuesta HTTP y una salida de Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

En la clase MultipleOutputType, Kevent es la variable de enlace de salida para el enlace de Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Para enviar un lote de eventos, pase una matriz de cadenas al tipo de salida, tal como se muestra en el ejemplo siguiente:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

La matriz de cadenas se define como la propiedad Kevents en la clase en la que se define el enlace de salida:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

La siguiente función agrega encabezados a los datos de salida de Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Para obtener un conjunto completo de ejemplos de .NET en funcionamiento, consulte el repositorio de extensiones de Kafka.

Nota:

Para obtener un conjunto equivalente de ejemplos de TypeScript, consulte el repositorio de extensiones de Kafka.

Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos son Confluent o Azure Event Hubs. Los ejemplos siguientes muestran un enlace de salida de Kafka para una función que desencadena una solicitud HTTP y envía datos de la solicitud al tema de Kafka.

El siguiente archivo function.json define el desencadenador para el proveedor específico en estos ejemplos:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

A continuación, el código siguiente envía un mensaje al tema:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

El código siguiente envía varios mensajes como una matriz al mismo tema:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

En el ejemplo siguiente se muestra cómo enviar un mensaje de evento con encabezados al mismo tema de Kafka:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Para obtener un conjunto completo de ejemplos de JavaScript en funcionamiento, consulte el repositorio de extensiones de Kafka.

Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos son Confluent o Azure Event Hubs. Los ejemplos siguientes muestran un enlace de salida de Kafka para una función que desencadena una solicitud HTTP y envía datos de la solicitud al tema de Kafka.

El siguiente archivo function.json define el desencadenador para el proveedor específico en estos ejemplos:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

A continuación, el código siguiente envía un mensaje al tema:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

El código siguiente envía varios mensajes como una matriz al mismo tema:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

En el ejemplo siguiente se muestra cómo enviar un mensaje de evento con encabezados al mismo tema de Kafka:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Para obtener un conjunto completo de ejemplos de PowerShell en funcionamiento, consulte el repositorio de extensiones de Kafka.

Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos son Confluent o Azure Event Hubs. Los ejemplos siguientes muestran un enlace de salida de Kafka para una función que desencadena una solicitud HTTP y envía datos de la solicitud al tema de Kafka.

El siguiente archivo function.json define el desencadenador para el proveedor específico en estos ejemplos:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

A continuación, el código siguiente envía un mensaje al tema:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

El código siguiente envía varios mensajes como una matriz al mismo tema:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

En el ejemplo siguiente se muestra cómo enviar un mensaje de evento con encabezados al mismo tema de Kafka:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Para obtener un conjunto completo de ejemplos de Python en funcionamiento, consulte el repositorio de extensiones de Kafka.

Las anotaciones que se usan para configurar el enlace de salida dependen del proveedor de eventos específico.

La siguiente función envía un mensaje al tema de Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

En el ejemplo siguiente se muestra cómo enviar varios mensajes a un tema de Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

En este ejemplo, el parámetro de enlace de salida se cambia a la matriz de cadenas.

En el último ejemplo se usa para las clases KafkaEntity y KafkaHeader:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

La siguiente función de ejemplo envía un mensaje con encabezados a un tema de Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Para obtener un conjunto completo de ejemplos de Java en funcionamiento para Confluent, consulte el repositorio de extensiones de Kafka.

Atributos

Las bibliotecas de C# en proceso y de proceso de trabajo aislado usan el atributo Kafka para definir el desencadenador de la función.

En la tabla siguiente se explican las propiedades que se pueden establecer mediante el atributo:

Parámetro Descripción
BrokerList (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones.
Tema. (Obligatorio) El tema al que se envía la salida.
AvroSchema (Opcional) Esquema de un registro genérico al usar el protocolo Avro.
MaxMessageBytes (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1.
BatchSize (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000.
EnableIdempotence (Opcional) Cuando se establece en true, garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false.
MessageTimeoutMs (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000. Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje.
RequestTimeoutMs (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000.
MaxRetries (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2. El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true.
AuthenticationMode (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi, Plain (predeterminado), ScramSha256 y ScramSha512.
Nombre de usuario (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
Contraseña (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
Protocolo (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl, sasl_plaintext y sasl_ssl.
SslCaLocation (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente.
SslCertificateLocation (Opcional) Ruta de acceso al certificado del cliente.
SslKeyLocation (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación.
SslKeyPassword (Opcional) Contraseña del certificado del cliente.

anotaciones

La anotación KafkaOutput le permite crear una función que escribe en un tema específico. Entre las opciones admitidas se incluyen los siguientes elementos:

Elemento Descripción
name Nombre de la variable que representa los datos del agente en el código de la función.
brokerList (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones.
topic (Obligatorio) El tema al que se envía la salida.
dataType Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string, la entrada se trata como una cadena. Cuando es binary, el mensaje se recibe como datos binarios y Functions intenta deserializarlo en bytes de tipo de parámetro real[].
avroSchema (Opcional) Esquema de un registro genérico al usar el protocolo de Avro. (Actualmente no se admite para Java).
maxMessageBytes (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1.
batchSize (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000.
enableIdempotence (Opcional) Cuando se establece en true, garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false.
messageTimeoutMs (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000. Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje.
requestTimeoutMs (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000.
maxRetries (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2. El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true.
authenticationMode (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi, Plain (predeterminado), ScramSha256 y ScramSha512.
username (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
password (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
protocolo (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl, sasl_plaintext y sasl_ssl.
sslCaLocation (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente.
sslCertificateLocation (Opcional) Ruta de acceso al certificado del cliente.
sslKeyLocation (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación.
sslKeyPassword (Opcional) Contraseña del certificado del cliente.

Configuración

En la siguiente tabla se explican las propiedades de configuración de enlace que se establecen en el archivo function.json.

Propiedad de function.json Descripción
type Se debe establecer en kafka.
direction Se debe establecer en out.
name Nombre de la variable que representa los datos del agente en el código de la función.
brokerList (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones.
topic (Obligatorio) El tema al que se envía la salida.
avroSchema (Opcional) Esquema de un registro genérico al usar el protocolo Avro.
maxMessageBytes (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1.
batchSize (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000.
enableIdempotence (Opcional) Cuando se establece en true, garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false.
messageTimeoutMs (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000. Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje.
requestTimeoutMs (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000.
maxRetries (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2. El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true.
authenticationMode (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi, Plain (predeterminado), ScramSha256 y ScramSha512.
username (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
password (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información.
protocolo (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl, sasl_plaintext y sasl_ssl.
sslCaLocation (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente.
sslCertificateLocation (Opcional) Ruta de acceso al certificado del cliente.
sslKeyLocation (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación.
sslKeyPassword (Opcional) Contraseña del certificado del cliente.

Uso

Tanto las claves como los tipos de valores se admiten con la serialización integrada de Avro y Protobuf.

El desplazamiento, la partición y la marca de tiempo del evento se generan en tiempo de ejecución. Solo se pueden establecer valores y encabezados dentro de la función. El tema se establece en function.json.

Asegúrese de tener acceso al tema de Kafka en el que está intentando escribir. Tiene que configurar el enlace con credenciales de acceso y conexión al tema de Kafka.

En un plan Premium, debe habilitar la supervisión del escalado en tiempo de ejecución para que la salida de Kafka pueda escalarse horizontalmente en varias instancias. Para obtener más información, consulte Habilitación del escalado en tiempo de ejecución.

Para obtener un conjunto completo de configuraciones de host.json admitidas en el desencadenador de Kafka, consulte la configuración de host.json.

Conexiones

Toda la información de conexión que necesiten los desencadenadores y enlaces debe mantenerse en la configuración de la aplicación, y no en las definiciones de enlace del código. Lo mismo se aplica las credenciales, que nunca deben almacenarse en el código.

Importante

La configuración de credenciales debe hacer referencia a una configuración de aplicación. No codifique las credenciales de forma rígida en el código ni en los archivos de configuración. Cuando realice la ejecución de forma local, use el archivo local.settings.json para las credenciales y no publique el archivo local.settings.json.

Al conectarse a un clúster de Kafka administrado que haya proporcionado Confluent en Azure, asegúrese de que las siguientes credenciales de autenticación del entorno de Confluent Cloud están establecidas en el desencadenador o enlace:

Setting Valor recomendado Descripción
BrokerList BootstrapServer La configuración de la aplicación denominada BootstrapServer contiene el valor del servidor de arranque que se encuentra en la página de configuración de Confluent Cloud. El valor es similar a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nombre de usuario ConfluentCloudUsername La configuración de la aplicación denominada ConfluentCloudUsername contiene la clave de acceso de API del sitio web de Confluent Cloud.
Contraseña ConfluentCloudPassword La configuración de la aplicación denominada ConfluentCloudPassword contiene el secreto de API obtenido en el sitio web de Confluent Cloud.

Los valores de la cadena que use en esta configuración deben estar presentes como la configuración de la aplicación en Azure o en la colección Values en el archivo local.settings.json durante el desarrollo local.

También debe establecer Protocol, AuthenticationMode y SslCaLocation en las definiciones de enlace.

Pasos siguientes