Desencadenador de Apache Kafka para Azure Functions
Puede usar el desencadenador de Apache Kafka en Azure Functions para ejecutar el código de función en respuesta a mensajes en temas de Kafka. También puede usar un enlace de salida de Kafka para escribir desde la función en un tema. Para obtener información sobre los detalles de instalación y configuración, consulte Información general de los enlaces de Apache Kafka para Azure Functions.
Importante
Los enlaces de Kafka solo están disponibles para las instancias de Functions del plan elástico prémium y del plan dedicado (App Service). Solo se admiten en la versión 3.x y posteriores del entorno de ejecución de Functions.
Ejemplo
El uso del desencadenador depende de la modalidad de C# que se usa en la aplicación de funciones, que puede ser uno de los modos siguientes:
Una función de C# compilada de la biblioteca de clases de procesos de trabajo aislados se ejecuta en un proceso aislado del entorno de ejecución.
Los atributos que use dependen del proveedor de eventos específico.
En el ejemplo siguiente se muestra una función de C# que lee y registra el mensaje de Kafka como un evento de Kafka:
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Para recibir eventos en un lote, use una matriz de cadenas como entrada, como se muestra en el ejemplo siguiente:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
La función siguiente registra el mensaje y los encabezados del evento de Kafka:
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
Para obtener un conjunto completo de ejemplos de .NET en funcionamiento, consulte el repositorio de extensiones de Kafka.
Nota:
Para obtener un conjunto equivalente de ejemplos de TypeScript, consulte el repositorio de extensiones de Kafka.
Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos es Confluent o Azure Event Hubs. En los ejemplos siguientes se muestra un desencadenador de Kafka para una función que lee y registra un mensaje de Kafka.
El siguiente archivo function.json define el desencadenador para el proveedor específico:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup" : "$Default",
"dataType": "string"
}
]
}
Después, se ejecuta el código siguiente cuando se desencadena la función:
module.exports = async function (context, event) {
// context.log.info(event)
context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};
Para recibir eventos en un lote, establezca el valor de cardinality
en many
en el archivo function.json, como se muestra en los ejemplos siguientes:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%"
}
]
}
Después, el código siguiente analiza la matriz de eventos y registra los datos del evento:
module.exports = async function (context, events) {
function print(event) {
var eventJson = JSON.parse(event)
context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
}
events.map(print);
};
El código siguiente también registra los datos del encabezado:
module.exports = async function (context, event) {
function print(kevent) {
var keventJson = JSON.parse(kevent)
context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
context.log.info(`Headers for this message:`)
let headers = keventJson.Headers;
headers.forEach(element => {
context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`)
});
}
event.map(print);
};
Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. El siguiente archivo function.json define el desencadenador para el proveedor específico con un esquema de Avro genérico:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaAvroGenericSingle",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
Después, se ejecuta el código siguiente cuando se desencadena la función:
module.exports = async function (context, event) {
context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};
Para obtener un conjunto completo de ejemplos de JavaScript en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos es Confluent o Azure Event Hubs. En los ejemplos siguientes se muestra un desencadenador de Kafka para una función que lee y registra un mensaje de Kafka.
El siguiente archivo function.json define el desencadenador para el proveedor específico:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
Después, se ejecuta el código siguiente cuando se desencadena la función:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Para recibir eventos en un lote, establezca el valor de cardinality
en many
en el archivo function.json, como se muestra en los ejemplos siguientes:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
Después, el código siguiente analiza la matriz de eventos y registra los datos del evento:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
El código siguiente también registra los datos del encabezado:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. El siguiente archivo function.json define el desencadenador para el proveedor específico con un esquema de Avro genérico:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
Después, se ejecuta el código siguiente cuando se desencadena la función:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Para obtener un conjunto completo de ejemplos de PowerShell en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos es Confluent o Azure Event Hubs. En los ejemplos siguientes se muestra un desencadenador de Kafka para una función que lee y registra un mensaje de Kafka.
El siguiente archivo function.json define el desencadenador para el proveedor específico:
{
"scriptFile": "main.py",
"bindings": [
{
"type": "kafkaTrigger",
"name": "kevent",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"consumerGroup" : "functions",
"protocol": "saslSsl",
"authenticationMode": "plain"
}
]
}
Después, se ejecuta el código siguiente cuando se desencadena la función:
import logging
from azure.functions import KafkaEvent
def main(kevent : KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
Para recibir eventos en un lote, establezca el valor de cardinality
en many
en el archivo function.json, como se muestra en los ejemplos siguientes:
{
"scriptFile": "main.py",
"bindings": [
{
"type" : "kafkaTrigger",
"direction": "in",
"name" : "kevents",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"topic" : "message_python",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"dataType": "string",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"BrokerList" : "%BrokerList%"
}
]
}
Después, el código siguiente analiza la matriz de eventos y registra los datos del evento:
import logging
import typing
from azure.functions import KafkaEvent
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
El código siguiente también registra los datos del encabezado:
import logging
import typing
from azure.functions import KafkaEvent
import json
import base64
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
event_dec = event.get_body().decode('utf-8')
event_json = json.loads(event_dec)
logging.info("Python Kafka trigger function called for message " + event_json["Value"])
headers = event_json["Headers"]
for header in headers:
logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))
Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. El siguiente archivo function.json define el desencadenador para el proveedor específico con un esquema de Avro genérico:
{
"scriptFile": "main.py",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaTriggerAvroGeneric",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
Después, se ejecuta el código siguiente cuando se desencadena la función:
import logging
from azure.functions import KafkaEvent
def main(kafkaTriggerAvroGeneric : KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
Para obtener un conjunto completo de ejemplos de Python en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las anotaciones que se usan para configurar el desencadenador dependen del proveedor de eventos específico.
En el ejemplo siguiente se muestra una función de Java que lee y registra el contenido del evento de Kafka:
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
Para recibir eventos en un lote, use una cadena de entrada como una matriz, como se muestra en el ejemplo siguiente:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
La función siguiente registra el mensaje y los encabezados del evento de Kafka:
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. La función siguiente define un desencadenador para el proveedor específico con un esquema de Avro genérico:
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
Para obtener un conjunto completo de ejemplos de Java en funcionamiento para Confluent, consulte el repositorio de extensiones de Kafka.
Atributos
Las bibliotecas de C# de procesos de trabajo aislados y en proceso usan KafkaTriggerAttribute
para definir el desencadenador de la función.
En la tabla siguiente se explican las propiedades que se pueden establecer mediante este atributo de desencadenador:
Parámetro | Descripción |
---|---|
BrokerList | (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información. |
Tema. | (Obligatorio) Tema supervisado por el desencadenador. |
ConsumerGroup | (Opcional) Grupo de consumidores de Kafka que usa el desencadenador. |
AvroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo de Avro. |
AuthenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi , Plain (predeterminado), ScramSha256 y ScramSha512 . |
Nombre de usuario | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
Contraseña | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
Protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl , sasl_plaintext y sasl_ssl . |
SslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
SslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
SslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
SslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
anotaciones
La anotación KafkaTrigger
permite crear una función que se ejecuta cuando se recibe un tema. Entre las opciones admitidas se incluyen los elementos siguientes:
Elemento | Descripción |
---|---|
name | (Obligatorio) Nombre de la variable que representa el mensaje de cola o tema en el código de la función. |
brokerList | (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información. |
topic | (Obligatorio) Tema supervisado por el desencadenador. |
cardinalidad | (Opcional) Indica la cardinalidad de la entrada del desencadenador. Los valores admitidos son ONE (predeterminado) y MANY . Se usa ONE cuando la entrada es un único mensaje y MANY cuando la entrada es una matriz de mensajes. Cuando se usa MANY , también se debe establecer un valor dataType . |
dataType | Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string , la entrada se trata como una cadena. Cuando es binary , el mensaje se recibe como datos binarios y Functions intenta deserializarlo en bytes de tipo de parámetro real[]. |
consumerGroup | (Opcional) Grupo de consumidores de Kafka que usa el desencadenador. |
avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo de Avro. |
authenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi , Plain (predeterminado), ScramSha256 y ScramSha512 . |
username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl , sasl_plaintext y sasl_ssl . |
sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
Configuración
En la siguiente tabla se explican las propiedades de configuración de enlace que se establecen en el archivo function.json.
Propiedad de function.json | Descripción |
---|---|
type | (Obligatorio) Se debe establecer en kafkaTrigger . |
direction | (Obligatorio) Se debe establecer en in . |
name | (Obligatorio) Nombre de la variable que representa los datos del agente en el código de la función. |
brokerList | (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información. |
topic | (Obligatorio) Tema supervisado por el desencadenador. |
cardinalidad | (Opcional) Indica la cardinalidad de la entrada del desencadenador. Los valores admitidos son ONE (predeterminado) y MANY . Se usa ONE cuando la entrada es un único mensaje y MANY cuando la entrada es una matriz de mensajes. Cuando se usa MANY , también se debe establecer un valor dataType . |
dataType | Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string , la entrada se trata como una cadena. Cuando es binary , el mensaje se recibe como datos binarios y Functions intenta deserializarlo en bytes de tipo de parámetro real[]. |
consumerGroup | (Opcional) Grupo de consumidores de Kafka que usa el desencadenador. |
avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo de Avro. |
authenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son Gssapi , Plain (predeterminado), ScramSha256 y ScramSha512 . |
username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi . Consulte Conexiones para obtener más información. |
protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son plaintext (predeterminado), ssl , sasl_plaintext y sasl_ssl . |
sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
Uso
Los eventos de Kafka se admiten actualmente como cadenas y matrices de cadenas que son cargas JSON.
Los mensajes de Kafka se pasan a la función como cadenas y matrices de cadenas que son cargas JSON.
En un plan prémium, debe habilitar la supervisión del escalado en tiempo de ejecución para que la salida de Kafka pueda escalarse horizontalmente en varias instancias. Para obtener más información, consulte Habilitación del escalado en tiempo de ejecución.
No puede usar la característica Probar/ejecutar de la página Código y prueba de Azure Portal para trabajar con desencadenadores de Kafka. Debe enviar eventos de prueba directamente al tema supervisado por el desencadenador.
Para obtener un conjunto completo de configuraciones de host.json admitidas en el desencadenador de Kafka, consulte la configuración de host.json.
Conexiones
Toda la información de conexión que necesiten los desencadenadores y enlaces debe mantenerse en la configuración de la aplicación, y no en las definiciones de enlace del código. Lo mismo se aplica las credenciales, que nunca deben almacenarse en el código.
Importante
La configuración de credenciales debe hacer referencia a una configuración de aplicación. No codifique las credenciales de forma rígida en el código ni en los archivos de configuración. Cuando realice la ejecución de forma local, use el archivo local.settings.json para las credenciales y no publique el archivo local.settings.json.
Al conectarse a un clúster de Kafka administrado que haya proporcionado Confluent en Azure, asegúrese de que las siguientes credenciales de autenticación del entorno de Confluent Cloud están establecidas en el desencadenador o enlace:
Setting | Valor recomendado | Descripción |
---|---|---|
BrokerList | BootstrapServer |
La configuración de la aplicación denominada BootstrapServer contiene el valor del servidor de arranque que se encuentra en la página de configuración de Confluent Cloud. El valor es similar a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Nombre de usuario | ConfluentCloudUsername |
La configuración de la aplicación denominada ConfluentCloudUsername contiene la clave de acceso de API del sitio web de Confluent Cloud. |
Contraseña | ConfluentCloudPassword |
La configuración de la aplicación denominada ConfluentCloudPassword contiene el secreto de API obtenido en el sitio web de Confluent Cloud. |
Los valores de la cadena que use en esta configuración deben estar presentes como la configuración de la aplicación en Azure o en la colección Values
en el archivo local.settings.json durante el desarrollo local.
También debe establecer Protocol
, AuthenticationMode
y SslCaLocation
en las definiciones de enlace.