Apache Kafka-eseményindító az Azure Functionshez

Az Azure Functions Apache Kafka-eseményindítójának használatával futtathatja a függvénykódot a Kafka-témakörökben szereplő üzenetekre válaszul. Kafka kimeneti kötéssel is írhat a függvényből egy témakörbe. A beállítási és konfigurációs részletekről az Azure Functions Apache Kafka-kötéseinek áttekintésében olvashat.

Fontos

A Kafka-kötések csak az Elastic Premium Csomag és a Dedikált (App Service) csomaghoz tartozó Functions esetében érhetők el. Ezek csak a Functions-futtatókörnyezet 3.x és újabb verziójában támogatottak.

Példa

Az eseményindító használata a függvényalkalmazásban használt C#-modalitástól függ, amely az alábbi módok egyike lehet:

A C# függvényt lefordított izolált feldolgozói folyamatosztály-kódtár a futtatókörnyezettől elkülönített folyamatban fut.

A használt attribútumok az adott eseményszolgáltatótól függenek.

Az alábbi példa egy C# függvényt mutat be, amely Kafka-eseményként olvassa és naplózza a Kafka-üzenetet:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Ha eseményeket szeretne fogadni egy kötegben, használjon sztringtömböt bemenetként, ahogyan az a következő példában is látható:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

Az alábbi függvény naplózza a Kafka-esemény üzenetét és fejléceit:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

A működő .NET-példák teljes készletéért tekintse meg a Kafka-bővítménytárat.

Feljegyzés

A TypeScript-példák egyenértékű készletét lásd a Kafka-bővítménytárban

A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Kafka-eseményindítót mutatnak be egy Olyan függvényhez, amely egy Kafka-üzenetet olvas és naplóz.

Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

A függvény aktiválásakor a következő kód fut:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Ha eseményeket szeretne fogadni egy kötegben, állítsa az cardinality értéket many a function.json fájlban, ahogyan az alábbi példákban látható:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

A következő kód ezután elemzi az események tömbét, és naplózza az eseményadatokat:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

A következő kód a fejlécadatokat is naplózza:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi function.json definiálja az adott szolgáltató eseményindítóját egy általános Avro-sémával:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

A függvény aktiválásakor a következő kód fut:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

A működő JavaScript-példák teljes készletét a Kafka-bővítménytárban tekinti meg.

A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Kafka-eseményindítót mutatnak be egy Olyan függvényhez, amely egy Kafka-üzenetet olvas és naplóz.

Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

A függvény aktiválásakor a következő kód fut:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Ha eseményeket szeretne fogadni egy kötegben, állítsa az cardinality értéket many a function.json fájlban, ahogyan az alábbi példákban látható:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

A következő kód ezután elemzi az események tömbét, és naplózza az eseményadatokat:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

A következő kód a fejlécadatokat is naplózza:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi function.json definiálja az adott szolgáltató eseményindítóját egy általános Avro-sémával:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

A függvény aktiválásakor a következő kód fut:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

A működő PowerShell-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.

A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Kafka-eseményindítót mutatnak be egy Olyan függvényhez, amely egy Kafka-üzenetet olvas és naplóz.

Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

A függvény aktiválásakor a következő kód fut:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Ha eseményeket szeretne fogadni egy kötegben, állítsa az cardinality értéket many a function.json fájlban, ahogyan az alábbi példákban látható:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

A következő kód ezután elemzi az események tömbét, és naplózza az eseményadatokat:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

A következő kód a fejlécadatokat is naplózza:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi function.json definiálja az adott szolgáltató eseményindítóját egy általános Avro-sémával:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

A függvény aktiválásakor a következő kód fut:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

A működő Python-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.

Az eseményindító konfigurálásához használt széljegyzetek az adott eseményszolgáltatótól függenek.

Az alábbi példa egy Java-függvényt mutat be, amely beolvassa és naplózza a Kafka-esemény tartalmát:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Ha eseményeket szeretne fogadni egy kötegben, használjon egy bemeneti sztringet tömbként az alábbi példában látható módon:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

Az alábbi függvény naplózza a Kafka-esemény üzenetét és fejléceit:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi függvény egy általános Avro-sémával rendelkező eseményindítót határoz meg az adott szolgáltatóhoz:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

A Confluenthez készült működő Java-példák teljes készletét a Kafka-bővítménytárban tekinti meg.

Attribútumok

A C#-kódtárak a folyamaton belüli és az KafkaTriggerAttribute izolált feldolgozói folyamat C#-kódtárait is használják a függvény triggerének meghatározásához.

Az alábbi táblázat az eseményindító attribútummal beállítható tulajdonságokat ismerteti:

Paraméter Leírás
BrokerList (Kötelező) Az eseményindító által figyelt Kafka-közvetítők listája. További információért tekintse meg a Csatlakozás.
Téma (Kötelező) Az eseményindító által figyelt témakör.
ConsumerGroup (Nem kötelező) Az eseményindító által használt Kafka fogyasztói csoport.
AvroSchema (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor.
AuthenticationMode (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi: , Plain (alapértelmezett), ScramSha256. ScramSha512
Felhasználónév (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Jelszó (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Protokoll (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl, sasl_plaintext. sasl_ssl
SslCaLocation (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez.
SslCertificateLocation (Nem kötelező) Az ügyfél tanúsítványának elérési útja.
SslKeyLocation (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja.
SslKeyPassword (Nem kötelező) Az ügyfél tanúsítványának jelszava.

Jegyzetek

A KafkaTrigger széljegyzet lehetővé teszi egy olyan függvény létrehozását, amely egy témakör fogadásakor fut. A támogatott beállítások a következő elemeket tartalmazzák:

Elem Leírás
név (Kötelező) Annak a változónak a neve, amely az üzenetsort vagy a témakörüzenetet jelöli a függvénykódban.
brokerList (Kötelező) Az eseményindító által figyelt Kafka-közvetítők listája. További információért tekintse meg a Csatlakozás.
témakör (Kötelező) Az eseményindító által figyelt témakör.
Számossága (Nem kötelező) A trigger bemenetének számosságát jelzi. A támogatott értékek ( ONE alapértelmezett) és MANY. Akkor használható ONE , ha a bemenet egyetlen üzenet, és MANY ha a bemenet egy üzenettömb. Ha használja MANY, be kell állítania egy dataType.
Adattípus Meghatározza, hogy a Functions hogyan kezeli a paraméter értékét. Alapértelmezés szerint az érték sztringként lesz lekérve, és a Functions megpróbálja deszerializálni a sztringet a tényleges egyszerű régi Java-objektumra (POJO). Amikor stringa bemenetet csak sztringként kezeli a rendszer. Amikor binaryaz üzenet bináris adatként érkezik, és a Functions megpróbálja deszerializálni azt egy tényleges paramétertípus-bájtra[].
consumerGroup (Nem kötelező) Az eseményindító által használt Kafka fogyasztói csoport.
avroSchema (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor.
authenticationMode (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi: , Plain (alapértelmezett), ScramSha256. ScramSha512
Felhasználónév (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Jelszó (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Protokoll (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl, sasl_plaintext. sasl_ssl
sslCaLocation (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez.
sslCertificateLocation (Nem kötelező) Az ügyfél tanúsítványának elérési útja.
sslKeyLocation (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja.
sslKeyPassword (Nem kötelező) Az ügyfél tanúsítványának jelszava.

Konfiguráció

Az alábbi táblázat a function.json fájlban beállított kötéskonfigurációs tulajdonságokat ismerteti.

function.json tulajdonság Leírás
type (Kötelező) A beállításnak a kafkaTriggerkövetkezőnek kell lennie: .
direction (Kötelező) A beállításnak a inkövetkezőnek kell lennie: .
név (Kötelező) Annak a változónak a neve, amely a függvénykódban a közvetített adatokat jelöli.
brokerList (Kötelező) Az eseményindító által figyelt Kafka-közvetítők listája. További információért tekintse meg a Csatlakozás.
témakör (Kötelező) Az eseményindító által figyelt témakör.
Számossága (Nem kötelező) A trigger bemenetének számosságát jelzi. A támogatott értékek ( ONE alapértelmezett) és MANY. Akkor használható ONE , ha a bemenet egyetlen üzenet, és MANY ha a bemenet egy üzenettömb. Ha használja MANY, be kell állítania egy dataType.
Adattípus Meghatározza, hogy a Functions hogyan kezeli a paraméter értékét. Alapértelmezés szerint az érték sztringként lesz lekérve, és a Functions megpróbálja deszerializálni a sztringet a tényleges egyszerű régi Java-objektumra (POJO). Amikor stringa bemenetet csak sztringként kezeli a rendszer. Amikor binaryaz üzenet bináris adatként érkezik, és a Functions megpróbálja deszerializálni azt egy tényleges paramétertípus-bájtra[].
consumerGroup (Nem kötelező) Az eseményindító által használt Kafka fogyasztói csoport.
avroSchema (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor.
authenticationMode (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi: , Plain (alapértelmezett), ScramSha256. ScramSha512
Felhasználónév (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Jelszó (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Protokoll (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl, sasl_plaintext. sasl_ssl
sslCaLocation (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez.
sslCertificateLocation (Nem kötelező) Az ügyfél tanúsítványának elérési útja.
sslKeyLocation (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja.
sslKeyPassword (Nem kötelező) Az ügyfél tanúsítványának jelszava.

Használat

A Kafka-események jelenleg JSON hasznos adatnak minősülő sztringekként és sztringtömbökként támogatottak.

A Kafka-üzenetek JSON-hasznos adatnak minősülő sztringekként és sztringtömbökként kerülnek a függvénybe.

Prémium csomag esetén engedélyeznie kell a futtatókörnyezeti skálázás figyelését ahhoz, hogy a Kafka-kimenet több példányra is felskálázható legyen. További információ: Futtatókörnyezeti skálázás engedélyezése.

A Kafka-eseményindítók használatához nem használhatja az Azure Portal Code + Test lapjának Teszt/Futtatás funkcióját. Ehelyett közvetlenül az eseményindító által figyelt témakörnek kell elküldenie a teszteseményeket.

A Kafka-eseményindító támogatott host.json beállításainak teljes halmazát host.json beállítások között találhatja meg.

Kapcsolatok

Az eseményindítók és kötések által igényelt kapcsolati információkat az alkalmazásbeállításokban, és nem a kód kötésdefinícióiban kell tartani. Ez igaz a hitelesítő adatokra, amelyeket soha nem szabad a kódban tárolni.

Fontos

A hitelesítő adatok beállításainak egy alkalmazásbeállításra kell hivatkoznia. A kódban vagy a konfigurációs fájlokban ne írja be a hitelesítő adatokat. Helyi futtatáskor használja a local.settings.json fájlt a hitelesítő adataihoz, és ne tegye közzé a local.settings.json fájlt.

Amikor az Azure-ban a Confluent által biztosított felügyelt Kafka-fürthöz csatlakozik, győződjön meg arról, hogy a Confluent Cloud-környezet következő hitelesítési hitelesítő adatai be vannak állítva az eseményindítóban vagy kötésben:

Beállítás Javasolt érték Leírás
BrokerList BootstrapServer Az elnevezett BootstrapServer alkalmazásbeállítás a Confluent Cloud beállításai lapon található bootstrap-kiszolgáló értékét tartalmazza. Az érték a következőhöz xyz-xyzxzy.westeurope.azure.confluent.cloud:9092hasonló: .
Felhasználónév ConfluentCloudUsername A névvel ellátott ConfluentCloudUsername alkalmazásbeállítás tartalmazza a Confluent Cloud webhely api-hozzáférési kulcsát.
Jelszó ConfluentCloudPassword A névvel ellátott ConfluentCloudPassword alkalmazásbeállítás tartalmazza a Confluent Cloud webhelyről beszerzett API-titkos kódot.

Az ezekhez a beállításokhoz használt sztringértékeknek alkalmazásbeállításokként kell szerepelniük az Azure-ban vagy a Values local.settings.json fájl gyűjteményében a helyi fejlesztés során.

A , AuthenticationModeés SslCaLocation a Protocolkötésdefiníciókat is be kell állítania.

Következő lépések