Megosztás a következőn keresztül:


Apache Kafka-eseményindító az Azure Functionshez

Az Azure Functions Apache Kafka-eseményindítójának használatával futtathatja a függvénykódot a Kafka-témakörökben szereplő üzenetekre válaszul. Kafka kimeneti kötéssel is írhat a függvényből egy témakörbe. A beállítási és konfigurációs részletekről az Azure Functions Apache Kafka-kötéseinek áttekintésében olvashat.

Fontos

A Kafka-kötések csak az Elastic Premium Csomag és a Dedikált (App Service) csomaghoz tartozó Functions esetében érhetők el. Ezek csak a Functions-futtatókörnyezet 3.x és újabb verziójában támogatottak.

Példa

Az eseményindító használata a függvényalkalmazásban használt C#-modalitástól függ, amely az alábbi módok egyike lehet:

A C# függvényt lefordított izolált feldolgozói folyamatosztály-kódtár a futtatókörnyezettől elkülönített folyamatban fut.

A használt attribútumok az adott eseményszolgáltatótól függenek.

Az alábbi példa egy C# függvényt mutat be, amely Kafka-eseményként olvassa és naplózza a Kafka-üzenetet:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Ha eseményeket szeretne fogadni egy kötegben, használjon sztringtömböt bemenetként, ahogyan az a következő példában is látható:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

Az alábbi függvény naplózza a Kafka-esemény üzenetét és fejléceit:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

A működő .NET-példák teljes készletéért tekintse meg a Kafka-bővítménytárat.

Feljegyzés

A TypeScript-példák egyenértékű készletét lásd a Kafka-bővítménytárban

A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Kafka-eseményindítót mutatnak be egy Olyan függvényhez, amely egy Kafka-üzenetet olvas és naplóz.

Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

A függvény aktiválásakor a következő kód fut:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Ha eseményeket szeretne fogadni egy kötegben, állítsa az cardinality értéket many a function.json fájlban, ahogyan az alábbi példákban látható:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

A következő kód ezután elemzi az események tömbét, és naplózza az eseményadatokat:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

A következő kód a fejlécadatokat is naplózza:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi function.json definiálja az adott szolgáltató eseményindítóját egy általános Avro-sémával:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

A függvény aktiválásakor a következő kód fut:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

A működő JavaScript-példák teljes készletét a Kafka-bővítménytárban tekinti meg.

A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Kafka-eseményindítót mutatnak be egy Olyan függvényhez, amely egy Kafka-üzenetet olvas és naplóz.

Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

A függvény aktiválásakor a következő kód fut:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Ha eseményeket szeretne fogadni egy kötegben, állítsa az cardinality értéket many a function.json fájlban, ahogyan az alábbi példákban látható:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

A következő kód ezután elemzi az események tömbét, és naplózza az eseményadatokat:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

A következő kód a fejlécadatokat is naplózza:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi function.json definiálja az adott szolgáltató eseményindítóját egy általános Avro-sémával:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

A függvény aktiválásakor a következő kód fut:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

A működő PowerShell-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.

A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Kafka-eseményindítót mutatnak be egy Olyan függvényhez, amely egy Kafka-üzenetet olvas és naplóz.

Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

A függvény aktiválásakor a következő kód fut:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Ha eseményeket szeretne fogadni egy kötegben, állítsa az cardinality értéket many a function.json fájlban, ahogyan az alábbi példákban látható:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

A következő kód ezután elemzi az események tömbét, és naplózza az eseményadatokat:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

A következő kód a fejlécadatokat is naplózza:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi function.json definiálja az adott szolgáltató eseményindítóját egy általános Avro-sémával:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

A függvény aktiválásakor a következő kód fut:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

A működő Python-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.

Az eseményindító konfigurálásához használt széljegyzetek az adott eseményszolgáltatótól függenek.

Az alábbi példa egy Java-függvényt mutat be, amely beolvassa és naplózza a Kafka-esemény tartalmát:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Ha eseményeket szeretne fogadni egy kötegben, használjon egy bemeneti sztringet tömbként az alábbi példában látható módon:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

Az alábbi függvény naplózza a Kafka-esemény üzenetét és fejléceit:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Definiálhat egy általános Avro-sémát az eseményindítónak átadott eseményhez. Az alábbi függvény egy általános Avro-sémával rendelkező eseményindítót határoz meg az adott szolgáltatóhoz:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

A Confluenthez készült működő Java-példák teljes készletét a Kafka-bővítménytárban tekinti meg.

Attribútumok

A C#-kódtárak a folyamaton belüli és az KafkaTriggerAttribute izolált feldolgozói folyamat C#-kódtárait is használják a függvény triggerének meghatározásához.

Az alábbi táblázat az eseményindító attribútummal beállítható tulajdonságokat ismerteti:

Paraméter Leírás
BrokerList (Kötelező) Az eseményindító által figyelt Kafka-közvetítők listája. További információt a Kapcsolatok című témakörben talál.
Téma (Kötelező) Az eseményindító által figyelt témakör.
ConsumerGroup (Nem kötelező) Az eseményindító által használt Kafka fogyasztói csoport.
AvroSchema (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor.
AuthenticationMode (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi: , Plain (alapértelmezett), ScramSha256. ScramSha512
Felhasználónév (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi. További információt a Kapcsolatok című témakörben talál.
Jelszó (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi. További információt a Kapcsolatok című témakörben talál.
Protokoll (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl, sasl_plaintext. sasl_ssl
SslCaLocation (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez.
SslCertificateLocation (Nem kötelező) Az ügyfél tanúsítványának elérési útja.
SslKeyLocation (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja.
SslKeyPassword (Nem kötelező) Az ügyfél tanúsítványának jelszava.

Jegyzetek

A KafkaTrigger széljegyzet lehetővé teszi egy olyan függvény létrehozását, amely egy témakör fogadásakor fut. A támogatott beállítások a következő elemeket tartalmazzák:

Elem Leírás
név (Kötelező) Annak a változónak a neve, amely az üzenetsort vagy a témakörüzenetet jelöli a függvénykódban.
brokerList (Kötelező) Az eseményindító által figyelt Kafka-közvetítők listája. További információt a Kapcsolatok című témakörben talál.
témakör (Kötelező) Az eseményindító által figyelt témakör.
Számossága (Nem kötelező) A trigger bemenetének számosságát jelzi. A támogatott értékek ( ONE alapértelmezett) és MANY. Akkor használható ONE , ha a bemenet egyetlen üzenet, és MANY ha a bemenet egy üzenettömb. Ha használja MANY, be kell állítania egy dataType.
Adattípus Meghatározza, hogy a Functions hogyan kezeli a paraméter értékét. Alapértelmezés szerint az érték sztringként lesz lekérve, és a Functions megpróbálja deszerializálni a sztringet a tényleges egyszerű régi Java-objektumra (POJO). Amikor stringa bemenetet csak sztringként kezeli a rendszer. Amikor binaryaz üzenet bináris adatként érkezik, és a Functions megpróbálja deszerializálni azt egy tényleges paramétertípus-bájtra[].
consumerGroup (Nem kötelező) Az eseményindító által használt Kafka fogyasztói csoport.
avroSchema (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor.
authenticationMode (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi: , Plain (alapértelmezett), ScramSha256. ScramSha512
felhasználónév (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi. További információt a Kapcsolatok című témakörben talál.
jelszó (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi. További információt a Kapcsolatok című témakörben talál.
protokoll (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl, sasl_plaintext. sasl_ssl
sslCaLocation (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez.
sslCertificateLocation (Nem kötelező) Az ügyfél tanúsítványának elérési útja.
sslKeyLocation (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja.
sslKeyPassword (Nem kötelező) Az ügyfél tanúsítványának jelszava.

Konfiguráció

Az alábbi táblázat a function.json fájlban beállított kötéskonfigurációs tulajdonságokat ismerteti.

function.json tulajdonság Leírás
type (Kötelező) A beállításnak a kafkaTriggerkövetkezőnek kell lennie: .
direction (Kötelező) A beállításnak a inkövetkezőnek kell lennie: .
név (Kötelező) Annak a változónak a neve, amely a függvénykódban a közvetített adatokat jelöli.
brokerList (Kötelező) Az eseményindító által figyelt Kafka-közvetítők listája. További információt a Kapcsolatok című témakörben talál.
témakör (Kötelező) Az eseményindító által figyelt témakör.
Számossága (Nem kötelező) A trigger bemenetének számosságát jelzi. A támogatott értékek ( ONE alapértelmezett) és MANY. Akkor használható ONE , ha a bemenet egyetlen üzenet, és MANY ha a bemenet egy üzenettömb. Ha használja MANY, be kell állítania egy dataType.
Adattípus Meghatározza, hogy a Functions hogyan kezeli a paraméter értékét. Alapértelmezés szerint az érték sztringként lesz lekérve, és a Functions megpróbálja deszerializálni a sztringet a tényleges egyszerű régi Java-objektumra (POJO). Amikor stringa bemenetet csak sztringként kezeli a rendszer. Amikor binaryaz üzenet bináris adatként érkezik, és a Functions megpróbálja deszerializálni azt egy tényleges paramétertípus-bájtra[].
consumerGroup (Nem kötelező) Az eseményindító által használt Kafka fogyasztói csoport.
avroSchema (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor.
authenticationMode (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi: , Plain (alapértelmezett), ScramSha256. ScramSha512
felhasználónév (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi. További információt a Kapcsolatok című témakörben talál.
jelszó (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi. További információt a Kapcsolatok című témakörben talál.
protokoll (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl, sasl_plaintext. sasl_ssl
sslCaLocation (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez.
sslCertificateLocation (Nem kötelező) Az ügyfél tanúsítványának elérési útja.
sslKeyLocation (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja.
sslKeyPassword (Nem kötelező) Az ügyfél tanúsítványának jelszava.

Használat

A Kafka-események jelenleg JSON hasznos adatnak minősülő sztringekként és sztringtömbökként támogatottak.

A Kafka-üzenetek JSON-hasznos adatnak minősülő sztringekként és sztringtömbökként kerülnek a függvénybe.

Prémium csomag esetén engedélyeznie kell a futtatókörnyezeti skálázás figyelését ahhoz, hogy a Kafka-kimenet több példányra is felskálázható legyen. További információ: Futtatókörnyezeti skálázás engedélyezése.

A Kafka-eseményindítók használatához nem használhatja az Azure Portal Code + Test lapjának Teszt/Futtatás funkcióját. Ehelyett közvetlenül az eseményindító által figyelt témakörnek kell elküldenie a teszteseményeket.

A Kafka-eseményindító támogatott host.json beállításainak teljes halmazát host.json beállítások között találhatja meg.

Kapcsolatok

Az eseményindítók és kötések által igényelt kapcsolati információkat az alkalmazásbeállításokban, és nem a kód kötésdefinícióiban kell tartani. Ez igaz a hitelesítő adatokra, amelyeket soha nem szabad a kódban tárolni.

Fontos

A hitelesítő adatok beállításainak egy alkalmazásbeállításra kell hivatkoznia. A kódban vagy a konfigurációs fájlokban ne írja be a hitelesítő adatokat. Helyi futtatáskor használja a local.settings.json fájlt a hitelesítő adataihoz, és ne tegye közzé a local.settings.json fájlt.

Amikor az Azure-ban a Confluent által biztosított felügyelt Kafka-fürthöz csatlakozik, győződjön meg arról, hogy a Confluent Cloud-környezet következő hitelesítési hitelesítő adatai be vannak állítva az eseményindítóban vagy kötésben:

Beállítás Javasolt érték Leírás
BrokerList BootstrapServer Az elnevezett BootstrapServer alkalmazásbeállítás a Confluent Cloud beállításai lapon található bootstrap-kiszolgáló értékét tartalmazza. Az érték a következőhöz xyz-xyzxzy.westeurope.azure.confluent.cloud:9092hasonló: .
Felhasználónév ConfluentCloudUsername A névvel ellátott ConfluentCloudUsername alkalmazásbeállítás tartalmazza a Confluent Cloud webhely api-hozzáférési kulcsát.
Jelszó ConfluentCloudPassword A névvel ellátott ConfluentCloudPassword alkalmazásbeállítás tartalmazza a Confluent Cloud webhelyről beszerzett API-titkos kódot.

Az ezekhez a beállításokhoz használt sztringértékeknek alkalmazásbeállításokként kell szerepelniük az Azure-ban vagy a Values local.settings.json fájl gyűjteményében a helyi fejlesztés során.

A , AuthenticationModeés SslCaLocation a Protocolkötésdefiníciókat is be kell állítania.

Következő lépések