Apache Kafka-eseményindító az Azure Functionshez
Az Azure Functions Apache Kafka-eseményindítójának használatával futtathatja a függvénykódot a Kafka-témakörökben szereplő üzenetekre válaszul. Kafka kimeneti kötéssel is írhat a függvényből egy témakörbe. A beállítási és konfigurációs részletekről az Azure Functions Apache Kafka-kötéseinek áttekintésében olvashat.
Fontos
A Kafka-kötések csak az Elastic Premium Csomag és a Dedikált (App Service) csomaghoz tartozó Functions esetében érhetők el. Ezek csak a Functions-futtatókörnyezet 3.x és újabb verziójában támogatottak.
Példa
Az eseményindító használata a függvényalkalmazásban használt C#-modalitástól függ, amely az alábbi módok egyike lehet:
A C# függvényt lefordított izolált feldolgozói folyamatosztály-kódtár a futtatókörnyezettől elkülönített folyamatban fut.
A használt attribútumok az adott eseményszolgáltatótól függenek.
Az alábbi példa egy C# függvényt mutat be, amely Kafka-eseményként olvassa és naplózza a Kafka-üzenetet:
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Ha eseményeket szeretne fogadni egy kötegben, használjon sztringtömböt bemenetként, ahogyan az a következő példában is látható:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
Az alábbi függvény naplózza a Kafka-esemény üzenetét és fejléceit:
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
A működő .NET-példák teljes készletéért tekintse meg a Kafka-bővítménytárat.
Feljegyzés
A TypeScript-példák egyenértékű készletét lásd a Kafka-bővítménytárban
A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Kafka-eseményindítót mutatnak be egy Olyan függvényhez, amely egy Kafka-üzenetet olvas és naplóz.
Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup" : "$Default",
"dataType": "string"
}
]
}
A függvény aktiválásakor a következő kód fut:
module.exports = async function (context, event) {
// context.log.info(event)
context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};
Ha eseményeket szeretne fogadni egy kötegben, állítsa az cardinality
értéket many
a function.json fájlban, ahogyan az alábbi példákban látható:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%"
}
]
}
A következő kód ezután elemzi az események tömbét, és naplózza az eseményadatokat:
module.exports = async function (context, events) {
function print(event) {
var eventJson = JSON.parse(event)
context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
}
events.map(print);
};
A következő kód a fejlécadatokat is naplózza:
module.exports = async function (context, event) {
function print(kevent) {
var keventJson = JSON.parse(kevent)
context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
context.log.info(`Headers for this message:`)
let headers = keventJson.Headers;
headers.forEach(element => {
context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`)
});
}
event.map(print);
};
Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi function.json definiálja az adott szolgáltató eseményindítóját egy általános Avro-sémával:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaAvroGenericSingle",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
A függvény aktiválásakor a következő kód fut:
module.exports = async function (context, event) {
context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};
A működő JavaScript-példák teljes készletét a Kafka-bővítménytárban tekinti meg.
A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Kafka-eseményindítót mutatnak be egy Olyan függvényhez, amely egy Kafka-üzenetet olvas és naplóz.
Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
A függvény aktiválásakor a következő kód fut:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Ha eseményeket szeretne fogadni egy kötegben, állítsa az cardinality
értéket many
a function.json fájlban, ahogyan az alábbi példákban látható:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
A következő kód ezután elemzi az események tömbét, és naplózza az eseményadatokat:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
A következő kód a fejlécadatokat is naplózza:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi function.json definiálja az adott szolgáltató eseményindítóját egy általános Avro-sémával:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
A függvény aktiválásakor a következő kód fut:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
A működő PowerShell-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.
A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Kafka-eseményindítót mutatnak be egy Olyan függvényhez, amely egy Kafka-üzenetet olvas és naplóz.
Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit:
{
"scriptFile": "main.py",
"bindings": [
{
"type": "kafkaTrigger",
"name": "kevent",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"consumerGroup" : "functions",
"protocol": "saslSsl",
"authenticationMode": "plain"
}
]
}
A függvény aktiválásakor a következő kód fut:
import logging
from azure.functions import KafkaEvent
def main(kevent : KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
Ha eseményeket szeretne fogadni egy kötegben, állítsa az cardinality
értéket many
a function.json fájlban, ahogyan az alábbi példákban látható:
{
"scriptFile": "main.py",
"bindings": [
{
"type" : "kafkaTrigger",
"direction": "in",
"name" : "kevents",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"topic" : "message_python",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"dataType": "string",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"BrokerList" : "%BrokerList%"
}
]
}
A következő kód ezután elemzi az események tömbét, és naplózza az eseményadatokat:
import logging
import typing
from azure.functions import KafkaEvent
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
A következő kód a fejlécadatokat is naplózza:
import logging
import typing
from azure.functions import KafkaEvent
import json
import base64
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
event_dec = event.get_body().decode('utf-8')
event_json = json.loads(event_dec)
logging.info("Python Kafka trigger function called for message " + event_json["Value"])
headers = event_json["Headers"]
for header in headers:
logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))
Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi function.json definiálja az adott szolgáltató eseményindítóját egy általános Avro-sémával:
{
"scriptFile": "main.py",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaTriggerAvroGeneric",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
A függvény aktiválásakor a következő kód fut:
import logging
from azure.functions import KafkaEvent
def main(kafkaTriggerAvroGeneric : KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
A működő Python-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.
Az eseményindító konfigurálásához használt széljegyzetek az adott eseményszolgáltatótól függenek.
Az alábbi példa egy Java-függvényt mutat be, amely beolvassa és naplózza a Kafka-esemény tartalmát:
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
Ha eseményeket szeretne fogadni egy kötegben, használjon egy bemeneti sztringet tömbként az alábbi példában látható módon:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
Az alábbi függvény naplózza a Kafka-esemény üzenetét és fejléceit:
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi függvény egy általános Avro-sémával rendelkező eseményindítót határoz meg az adott szolgáltatóhoz:
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
A Confluenthez készült működő Java-példák teljes készletét a Kafka-bővítménytárban tekinti meg.
Attribútumok
A C#-kódtárak a folyamaton belüli és az KafkaTriggerAttribute
izolált feldolgozói folyamat C#-kódtárait is használják a függvény triggerének meghatározásához.
Az alábbi táblázat az eseményindító attribútummal beállítható tulajdonságokat ismerteti:
Paraméter | Leírás |
---|---|
BrokerList | (Kötelező) Az eseményindító által figyelt Kafka-közvetítők listája. További információt a Kapcsolatok című témakörben talál. |
Téma | (Kötelező) Az eseményindító által figyelt témakör. |
ConsumerGroup | (Nem kötelező) Az eseményindító által használt Kafka fogyasztói csoport. |
AvroSchema | (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor. |
AuthenticationMode | (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi : , Plain (alapértelmezett), ScramSha256 . ScramSha512 |
Felhasználónév | (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi . További információt a Kapcsolatok című témakörben talál. |
Jelszó | (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi . További információt a Kapcsolatok című témakörben talál. |
Protokoll | (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl , sasl_plaintext . sasl_ssl |
SslCaLocation | (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez. |
SslCertificateLocation | (Nem kötelező) Az ügyfél tanúsítványának elérési útja. |
SslKeyLocation | (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja. |
SslKeyPassword | (Nem kötelező) Az ügyfél tanúsítványának jelszava. |
Jegyzetek
A KafkaTrigger
széljegyzet lehetővé teszi egy olyan függvény létrehozását, amely egy témakör fogadásakor fut. A támogatott beállítások a következő elemeket tartalmazzák:
Elem | Leírás |
---|---|
név | (Kötelező) Annak a változónak a neve, amely az üzenetsort vagy a témakörüzenetet jelöli a függvénykódban. |
brokerList | (Kötelező) Az eseményindító által figyelt Kafka-közvetítők listája. További információt a Kapcsolatok című témakörben talál. |
témakör | (Kötelező) Az eseményindító által figyelt témakör. |
Számossága | (Nem kötelező) A trigger bemenetének számosságát jelzi. A támogatott értékek ( ONE alapértelmezett) és MANY . Akkor használható ONE , ha a bemenet egyetlen üzenet, és MANY ha a bemenet egy üzenettömb. Ha használja MANY , be kell állítania egy dataType . |
Adattípus | Meghatározza, hogy a Functions hogyan kezeli a paraméter értékét. Alapértelmezés szerint az érték sztringként lesz lekérve, és a Functions megpróbálja deszerializálni a sztringet a tényleges egyszerű régi Java-objektumra (POJO). Amikor string a bemenetet csak sztringként kezeli a rendszer. Amikor binary az üzenet bináris adatként érkezik, és a Functions megpróbálja deszerializálni azt egy tényleges paramétertípus-bájtra[]. |
consumerGroup | (Nem kötelező) Az eseményindító által használt Kafka fogyasztói csoport. |
avroSchema | (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor. |
authenticationMode | (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi : , Plain (alapértelmezett), ScramSha256 . ScramSha512 |
felhasználónév | (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi . További információt a Kapcsolatok című témakörben talál. |
jelszó | (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi . További információt a Kapcsolatok című témakörben talál. |
protokoll | (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl , sasl_plaintext . sasl_ssl |
sslCaLocation | (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez. |
sslCertificateLocation | (Nem kötelező) Az ügyfél tanúsítványának elérési útja. |
sslKeyLocation | (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja. |
sslKeyPassword | (Nem kötelező) Az ügyfél tanúsítványának jelszava. |
Konfiguráció
Az alábbi táblázat a function.json fájlban beállított kötéskonfigurációs tulajdonságokat ismerteti.
function.json tulajdonság | Leírás |
---|---|
type | (Kötelező) A beállításnak a kafkaTrigger következőnek kell lennie: . |
direction | (Kötelező) A beállításnak a in következőnek kell lennie: . |
név | (Kötelező) Annak a változónak a neve, amely a függvénykódban a közvetített adatokat jelöli. |
brokerList | (Kötelező) Az eseményindító által figyelt Kafka-közvetítők listája. További információt a Kapcsolatok című témakörben talál. |
témakör | (Kötelező) Az eseményindító által figyelt témakör. |
Számossága | (Nem kötelező) A trigger bemenetének számosságát jelzi. A támogatott értékek ( ONE alapértelmezett) és MANY . Akkor használható ONE , ha a bemenet egyetlen üzenet, és MANY ha a bemenet egy üzenettömb. Ha használja MANY , be kell állítania egy dataType . |
Adattípus | Meghatározza, hogy a Functions hogyan kezeli a paraméter értékét. Alapértelmezés szerint az érték sztringként lesz lekérve, és a Functions megpróbálja deszerializálni a sztringet a tényleges egyszerű régi Java-objektumra (POJO). Amikor string a bemenetet csak sztringként kezeli a rendszer. Amikor binary az üzenet bináris adatként érkezik, és a Functions megpróbálja deszerializálni azt egy tényleges paramétertípus-bájtra[]. |
consumerGroup | (Nem kötelező) Az eseményindító által használt Kafka fogyasztói csoport. |
avroSchema | (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor. |
authenticationMode | (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi : , Plain (alapértelmezett), ScramSha256 . ScramSha512 |
felhasználónév | (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi . További információt a Kapcsolatok című témakörben talál. |
jelszó | (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi . További információt a Kapcsolatok című témakörben talál. |
protokoll | (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl , sasl_plaintext . sasl_ssl |
sslCaLocation | (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez. |
sslCertificateLocation | (Nem kötelező) Az ügyfél tanúsítványának elérési útja. |
sslKeyLocation | (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja. |
sslKeyPassword | (Nem kötelező) Az ügyfél tanúsítványának jelszava. |
Használat
A Kafka-események jelenleg JSON hasznos adatnak minősülő sztringekként és sztringtömbökként támogatottak.
A Kafka-üzenetek JSON-hasznos adatnak minősülő sztringekként és sztringtömbökként kerülnek a függvénybe.
Prémium csomag esetén engedélyeznie kell a futtatókörnyezeti skálázás figyelését ahhoz, hogy a Kafka-kimenet több példányra is felskálázható legyen. További információ: Futtatókörnyezeti skálázás engedélyezése.
A Kafka-eseményindítók használatához nem használhatja az Azure Portal Code + Test lapjának Teszt/Futtatás funkcióját. Ehelyett közvetlenül az eseményindító által figyelt témakörnek kell elküldenie a teszteseményeket.
A Kafka-eseményindító támogatott host.json beállításainak teljes halmazát host.json beállítások között találhatja meg.
Kapcsolatok
Az eseményindítók és kötések által igényelt kapcsolati információkat az alkalmazásbeállításokban, és nem a kód kötésdefinícióiban kell tartani. Ez igaz a hitelesítő adatokra, amelyeket soha nem szabad a kódban tárolni.
Fontos
A hitelesítő adatok beállításainak egy alkalmazásbeállításra kell hivatkoznia. A kódban vagy a konfigurációs fájlokban ne írja be a hitelesítő adatokat. Helyi futtatáskor használja a local.settings.json fájlt a hitelesítő adataihoz, és ne tegye közzé a local.settings.json fájlt.
Amikor az Azure-ban a Confluent által biztosított felügyelt Kafka-fürthöz csatlakozik, győződjön meg arról, hogy a Confluent Cloud-környezet következő hitelesítési hitelesítő adatai be vannak állítva az eseményindítóban vagy kötésben:
Beállítás | Javasolt érték | Leírás |
---|---|---|
BrokerList | BootstrapServer |
Az elnevezett BootstrapServer alkalmazásbeállítás a Confluent Cloud beállításai lapon található bootstrap-kiszolgáló értékét tartalmazza. Az érték a következőhöz xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 hasonló: . |
Felhasználónév | ConfluentCloudUsername |
A névvel ellátott ConfluentCloudUsername alkalmazásbeállítás tartalmazza a Confluent Cloud webhely api-hozzáférési kulcsát. |
Jelszó | ConfluentCloudPassword |
A névvel ellátott ConfluentCloudPassword alkalmazásbeállítás tartalmazza a Confluent Cloud webhelyről beszerzett API-titkos kódot. |
Az ezekhez a beállításokhoz használt sztringértékeknek alkalmazásbeállításokként kell szerepelniük az Azure-ban vagy a Values
local.settings.json fájl gyűjteményében a helyi fejlesztés során.
A , AuthenticationMode
és SslCaLocation
a Protocol
kötésdefiníciókat is be kell állítania.