Apache Kafka-trigger voor Azure Functions
U kunt de Apache Kafka-trigger in Azure Functions gebruiken om uw functiecode uit te voeren als reactie op berichten in Kafka-onderwerpen. U kunt ook een Kafka-uitvoerbinding gebruiken om vanuit uw functie naar een onderwerp te schrijven. Zie het overzicht van Apache Kafka-bindingen voor Azure Functions voor meer informatie over de installatie- en configuratiedetails.
Belangrijk
Kafka-bindingen zijn alleen beschikbaar voor Functions in het Elastic Premium-abonnement en het Toegewezen (App Service)-abonnement. Ze worden alleen ondersteund op versie 3.x en latere versie van de Functions-runtime.
Opmerking
Het gebruik van de trigger is afhankelijk van de C#-modaliteit die wordt gebruikt in uw functie-app. Dit kan een van de volgende modi zijn:
Een geïsoleerde werkprocesklassebibliotheek gecompileerde C#-functie wordt uitgevoerd in een proces dat is geïsoleerd van de runtime.
De kenmerken die u gebruikt, zijn afhankelijk van de specifieke gebeurtenisprovider.
In het volgende voorbeeld ziet u een C#-functie die het Kafka-bericht leest en registreert als een Kafka-gebeurtenis:
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Als u gebeurtenissen in een batch wilt ontvangen, gebruikt u een tekenreeksmatrix als invoer, zoals wordt weergegeven in het volgende voorbeeld:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
De volgende functie registreert het bericht en de headers voor de Kafka-gebeurtenis:
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
Zie de Kafka-extensieopslagplaats voor een volledige set werkende .NET-voorbeelden.
De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-trigger voor een functie die een Kafka-bericht leest en registreert.
De volgende function.json definieert de trigger voor de specifieke provider:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup" : "$Default",
"dataType": "string"
}
]
}
De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:
module.exports = async function (context, event) {
// context.log.info(event)
context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};
Als u gebeurtenissen in een batch wilt ontvangen, stelt u de cardinality
waarde many
in op het function.json-bestand, zoals wordt weergegeven in de volgende voorbeelden:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%"
}
]
}
De volgende code parseert vervolgens de matrix met gebeurtenissen en registreert de gebeurtenisgegevens:
module.exports = async function (context, events) {
function print(event) {
var eventJson = JSON.parse(event)
context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
}
events.map(print);
};
Met de volgende code worden ook de headergegevens vastgelegd:
module.exports = async function (context, event) {
function print(kevent) {
var keventJson = JSON.parse(kevent)
context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
context.log.info(`Headers for this message:`)
let headers = keventJson.Headers;
headers.forEach(element => {
context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`)
});
}
event.map(print);
};
U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende function.json definieert de trigger voor de specifieke provider met een algemeen Avro-schema:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaAvroGenericSingle",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:
module.exports = async function (context, event) {
context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};
Zie de Kafka-extensieopslagplaats voor een volledige set werkende JavaScript-voorbeelden.
De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-trigger voor een functie die een Kafka-bericht leest en registreert.
De volgende function.json definieert de trigger voor de specifieke provider:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Als u gebeurtenissen in een batch wilt ontvangen, stelt u de cardinality
waarde many
in op het function.json-bestand, zoals wordt weergegeven in de volgende voorbeelden:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
De volgende code parseert vervolgens de matrix met gebeurtenissen en registreert de gebeurtenisgegevens:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
Met de volgende code worden ook de headergegevens vastgelegd:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende function.json definieert de trigger voor de specifieke provider met een algemeen Avro-schema:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Zie de Kafka-extensieopslagplaats voor een volledige set werkende PowerShell-voorbeelden.
De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-trigger voor een functie die een Kafka-bericht leest en registreert.
De volgende function.json definieert de trigger voor de specifieke provider:
{
"scriptFile": "main.py",
"bindings": [
{
"type": "kafkaTrigger",
"name": "kevent",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"consumerGroup" : "functions",
"protocol": "saslSsl",
"authenticationMode": "plain"
}
]
}
De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:
import logging
from azure.functions import KafkaEvent
def main(kevent : KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
Als u gebeurtenissen in een batch wilt ontvangen, stelt u de cardinality
waarde many
in op het function.json-bestand, zoals wordt weergegeven in de volgende voorbeelden:
{
"scriptFile": "main.py",
"bindings": [
{
"type" : "kafkaTrigger",
"direction": "in",
"name" : "kevents",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"topic" : "message_python",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"dataType": "string",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"BrokerList" : "%BrokerList%"
}
]
}
De volgende code parseert vervolgens de matrix met gebeurtenissen en registreert de gebeurtenisgegevens:
import logging
import typing
from azure.functions import KafkaEvent
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
Met de volgende code worden ook de headergegevens vastgelegd:
import logging
import typing
from azure.functions import KafkaEvent
import json
import base64
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
event_dec = event.get_body().decode('utf-8')
event_json = json.loads(event_dec)
logging.info("Python Kafka trigger function called for message " + event_json["Value"])
headers = event_json["Headers"]
for header in headers:
logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))
U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende function.json definieert de trigger voor de specifieke provider met een algemeen Avro-schema:
{
"scriptFile": "main.py",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaTriggerAvroGeneric",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
De volgende code wordt vervolgens uitgevoerd wanneer de functie wordt geactiveerd:
import logging
from azure.functions import KafkaEvent
def main(kafkaTriggerAvroGeneric : KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
Zie de Kafka-extensieopslagplaats voor een volledige set werkende Python-voorbeelden.
De aantekeningen die u gebruikt om uw trigger te configureren, zijn afhankelijk van de specifieke gebeurtenisprovider.
In het volgende voorbeeld ziet u een Java-functie die de inhoud van de Kafka-gebeurtenis leest en registreert:
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
Als u gebeurtenissen in een batch wilt ontvangen, gebruikt u een invoertekenreeks als een matrix, zoals wordt weergegeven in het volgende voorbeeld:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
De volgende functie registreert het bericht en de headers voor de Kafka-gebeurtenis:
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende functie definieert een trigger voor de specifieke provider met een algemeen Avro-schema:
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
Zie de Kafka-extensieopslagplaats voor een volledige set werkende Java-voorbeelden voor Confluent.
Kenmerken
C #-bibliotheken voor zowel in-process - als geïsoleerde werkprocessen maken gebruik van de KafkaTriggerAttribute
functietrigger om de functietrigger te definiëren.
In de volgende tabel worden de eigenschappen uitgelegd die u kunt instellen met behulp van dit triggerkenmerk:
Parameter | Description |
---|---|
BrokerList | (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie. |
Onderwerp | (Vereist) Het onderwerp dat wordt bewaakt door de trigger. |
ConsumerGroup | (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt. |
AvroSchema | (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. |
AuthenticationMode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi , Plain (standaard), ScramSha256 . ScramSha512 |
Gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
Wachtwoord | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
Protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl , . sasl_ssl sasl_plaintext |
SslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
SslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
SslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
SslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
Aantekeningen
Met de KafkaTrigger
aantekening kunt u een functie maken die wordt uitgevoerd wanneer een onderwerp wordt ontvangen. Ondersteunde opties omvatten de volgende elementen:
Element | Description |
---|---|
name | (Vereist) De naam van de variabele die de wachtrij of het onderwerpbericht in functiecode vertegenwoordigt. |
brokerList | (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie. |
onderwerp | (Vereist) Het onderwerp dat wordt bewaakt door de trigger. |
cardinality | (Optioneel) Geeft de kardinaliteit van de triggerinvoer aan. De ondersteunde waarden zijn ONE (standaard) en MANY . Gebruik ONE wanneer de invoer één bericht is en MANY wanneer de invoer een matrix van berichten is. Wanneer u gebruikt MANY , moet u ook een dataType . |
Datatype | Definieert hoe Functions de parameterwaarde verwerkt. De waarde wordt standaard verkregen als een tekenreeks en Functions probeert de tekenreeks te deserialiseren naar een echt normaal oud Java-object (POJO). Wanneer string , wordt de invoer behandeld als alleen een tekenreeks. Wanneer binary , het bericht wordt ontvangen als binaire gegevens en Functions probeert het te deserialiseren naar een werkelijke parametertype byte[]. |
consumerGroup | (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt. |
avroSchema | (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. |
authenticationMode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi , Plain (standaard), ScramSha256 . ScramSha512 |
gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
password | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl , . sasl_ssl sasl_plaintext |
sslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
sslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
sslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
sslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
Configuratie
In de volgende tabel worden de bindingsconfiguratie-eigenschappen uitgelegd die u in het function.json-bestand hebt ingesteld.
eigenschap function.json | Beschrijving |
---|---|
type | (Vereist) Moet worden ingesteld op kafkaTrigger . |
direction | (Vereist) Moet worden ingesteld op in . |
name | (Vereist) De naam van de variabele die de brokergegevens in functiecode vertegenwoordigt. |
brokerList | (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie. |
onderwerp | (Vereist) Het onderwerp dat wordt bewaakt door de trigger. |
cardinality | (Optioneel) Geeft de kardinaliteit van de triggerinvoer aan. De ondersteunde waarden zijn ONE (standaard) en MANY . Gebruik ONE wanneer de invoer één bericht is en MANY wanneer de invoer een matrix van berichten is. Wanneer u gebruikt MANY , moet u ook een dataType . |
Datatype | Definieert hoe Functions de parameterwaarde verwerkt. De waarde wordt standaard verkregen als een tekenreeks en Functions probeert de tekenreeks te deserialiseren naar een echt normaal oud Java-object (POJO). Wanneer string , wordt de invoer behandeld als alleen een tekenreeks. Wanneer binary , het bericht wordt ontvangen als binaire gegevens en Functions probeert het te deserialiseren naar een werkelijke parametertype byte[]. |
consumerGroup | (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt. |
avroSchema | (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. |
authenticationMode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi , Plain (standaard), ScramSha256 . ScramSha512 |
gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
password | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl , . sasl_ssl sasl_plaintext |
sslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
sslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
sslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
sslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
Gebruik
Kafka-gebeurtenissen worden momenteel ondersteund als tekenreeksen en tekenreeksmatrices die JSON-nettoladingen zijn.
Kafka-berichten worden doorgegeven aan de functie als tekenreeksen en tekenreeksmatrices die JSON-nettoladingen zijn.
In een Premium-abonnement moet u runtimeschaalbewaking inschakelen voor de Kafka-uitvoer om uit te kunnen schalen naar meerdere exemplaren. Zie Schalen van runtime inschakelen voor meer informatie.
U kunt de functie Testen/uitvoeren van de pagina Code en test in Azure Portal niet gebruiken om te werken met Kafka-triggers. U moet in plaats daarvan testgebeurtenissen rechtstreeks verzenden naar het onderwerp dat wordt bewaakt door de trigger.
Zie host.json instellingen voor een volledige set ondersteunde host.json instellingen voor de Kafka-trigger.
Connecties
Alle verbindingsgegevens die vereist zijn voor uw triggers en bindingen, moeten worden onderhouden in toepassingsinstellingen en niet in de bindingsdefinities in uw code. Dit geldt voor referenties, die nooit in uw code moeten worden opgeslagen.
Belangrijk
Referentie-instellingen moeten verwijzen naar een toepassingsinstelling. Gebruik geen codereferenties in uw code- of configuratiebestanden. Wanneer u lokaal werkt, gebruikt u het local.settings.json-bestand voor uw referenties en publiceert u het local.settings.json bestand niet.
Wanneer u verbinding maakt met een beheerd Kafka-cluster dat wordt geleverd door Confluent in Azure, moet u ervoor zorgen dat de volgende verificatiereferenties voor uw Confluent Cloud-omgeving zijn ingesteld in uw trigger of binding:
Instelling | Aanbevolen waarde | Beschrijving |
---|---|---|
BrokerList | BootstrapServer |
De app-instelling met de naam BootstrapServer bevat de waarde van de bootstrap-server die is gevonden op de pagina met Confluent Cloud-instellingen. De waarde lijkt op xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Gebruikersnaam | ConfluentCloudUsername |
App-instelling met de naam ConfluentCloudUsername bevat de API-toegangssleutel van de Confluent Cloud-website. |
Wachtwoord | ConfluentCloudPassword |
App-instelling met de naam ConfluentCloudPassword bevat het API-geheim dat is verkregen van de Confluent Cloud-website. |
De tekenreekswaarden die u voor deze instellingen gebruikt, moeten aanwezig zijn als toepassingsinstellingen in Azure of in de Values
verzameling in het local.settings.json-bestand tijdens lokale ontwikkeling.
U moet ook de Protocol
, AuthenticationMode
en SslCaLocation
in uw bindingsdefinities instellen.