Wyzwalacz platformy Apache Kafka dla usługi Azure Functions

Wyzwalacz platformy Apache Kafka w usłudze Azure Functions umożliwia uruchamianie kodu funkcji w odpowiedzi na komunikaty w tematach platformy Kafka. Możesz również użyć powiązania danych wyjściowych platformy Kafka do zapisu z funkcji do tematu. Aby uzyskać informacje na temat szczegółów konfiguracji i konfiguracji, zobacz Omówienie powiązań platformy Apache Kafka dla usługi Azure Functions.

Ważne

Powiązania platformy Kafka są dostępne tylko dla funkcji w ramach planu Elastic Premium idedykowanego (App Service). Są one obsługiwane tylko w wersji 3.x i nowszej środowiska uruchomieniowego usługi Functions.

Przykład

Użycie wyzwalacza zależy od modalności języka C# używanej w aplikacji funkcji, co może być jednym z następujących trybów:

Izolowana biblioteka klas procesów roboczych skompilowana funkcja języka C# jest uruchamiana w procesie odizolowanym od środowiska uruchomieniowego.

Używane atrybuty zależą od określonego dostawcy zdarzeń.

W poniższym przykładzie pokazano funkcję języka C#, która odczytuje i rejestruje komunikat platformy Kafka jako zdarzenie platformy Kafka:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Aby odbierać zdarzenia w partii, użyj tablicy ciągów jako danych wejściowych, jak pokazano w poniższym przykładzie:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

Następująca funkcja rejestruje komunikat i nagłówki zdarzenia platformy Kafka:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

Pełny zestaw działających przykładów platformy .NET można znaleźć w repozytorium rozszerzeń platformy Kafka.

Uwaga

Aby zapoznać się z równoważnym zestawem przykładów języka TypeScript, zobacz repozytorium rozszerzeń platformy Kafka

Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano wyzwalacz platformy Kafka dla funkcji, która odczytuje i rejestruje komunikat platformy Kafka.

Następujące function.json definiuje wyzwalacz dla określonego dostawcy:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

Następnie następujący kod jest uruchamiany po wyzwoleniu funkcji:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Aby odbierać zdarzenia w partii, ustaw cardinality wartość na many w pliku function.json, jak pokazano w następujących przykładach:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

Poniższy kod analizuje następnie tablicę zdarzeń i rejestruje dane zdarzenia:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

Poniższy kod rejestruje również dane nagłówka:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

Można zdefiniować ogólny schemat Avro dla zdarzenia przekazanego do wyzwalacza. Następujące function.json definiuje wyzwalacz dla określonego dostawcy z ogólnym schematem Avro:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Następnie następujący kod jest uruchamiany po wyzwoleniu funkcji:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

Pełny zestaw działających przykładów języka JavaScript można znaleźć w repozytorium rozszerzeń platformy Kafka.

Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano wyzwalacz platformy Kafka dla funkcji, która odczytuje i rejestruje komunikat platformy Kafka.

Następujące function.json definiuje wyzwalacz dla określonego dostawcy:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Następnie następujący kod jest uruchamiany po wyzwoleniu funkcji:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Aby odbierać zdarzenia w partii, ustaw cardinality wartość na many w pliku function.json, jak pokazano w następujących przykładach:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Poniższy kod analizuje następnie tablicę zdarzeń i rejestruje dane zdarzenia:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

Poniższy kod rejestruje również dane nagłówka:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Można zdefiniować ogólny schemat Avro dla zdarzenia przekazanego do wyzwalacza. Następujące function.json definiuje wyzwalacz dla określonego dostawcy z ogólnym schematem Avro:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Następnie następujący kod jest uruchamiany po wyzwoleniu funkcji:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Pełny zestaw działających przykładów programu PowerShell można znaleźć w repozytorium rozszerzeń platformy Kafka.

Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano wyzwalacz platformy Kafka dla funkcji, która odczytuje i rejestruje komunikat platformy Kafka.

Następujące function.json definiuje wyzwalacz dla określonego dostawcy:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

Następnie następujący kod jest uruchamiany po wyzwoleniu funkcji:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Aby odbierać zdarzenia w partii, ustaw cardinality wartość na many w pliku function.json, jak pokazano w następujących przykładach:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

Poniższy kod analizuje następnie tablicę zdarzeń i rejestruje dane zdarzenia:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

Poniższy kod rejestruje również dane nagłówka:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

Można zdefiniować ogólny schemat Avro dla zdarzenia przekazanego do wyzwalacza. Następujące function.json definiuje wyzwalacz dla określonego dostawcy z ogólnym schematem Avro:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Następnie następujący kod jest uruchamiany po wyzwoleniu funkcji:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

Pełny zestaw działających przykładów języka Python można znaleźć w repozytorium rozszerzeń platformy Kafka.

Adnotacje używane do konfigurowania wyzwalacza zależą od określonego dostawcy zdarzeń.

W poniższym przykładzie pokazano funkcję Języka Java, która odczytuje i rejestruje zawartość zdarzenia platformy Kafka:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Aby odbierać zdarzenia w partii, użyj ciągu wejściowego jako tablicy, jak pokazano w poniższym przykładzie:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

Następująca funkcja rejestruje komunikat i nagłówki zdarzenia platformy Kafka:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Można zdefiniować ogólny schemat Avro dla zdarzenia przekazanego do wyzwalacza. Poniższa funkcja definiuje wyzwalacz dla określonego dostawcy z ogólnym schematem Avro:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Pełny zestaw działających przykładów języka Java dla platformy Confluent można znaleźć w repozytorium rozszerzeń platformy Kafka.

Atrybuty

Biblioteki języka C# procesu roboczego zarówno w procesie przetwarzania procesów procesowych, jak i izolowanych, używają KafkaTriggerAttribute elementu , aby zdefiniować wyzwalacz funkcji.

W poniższej tabeli opisano właściwości, które można ustawić przy użyciu tego atrybutu wyzwalacza:

Parametr Opis
Lista brokerów (Wymagane) Lista brokerów platformy Kafka monitorowanych przez wyzwalacz. Aby uzyskać więcej informacji, zobacz Połączenie ions.
Temat (Wymagane) Temat monitorowany przez wyzwalacz.
ConsumerGroup (Opcjonalnie) Grupa odbiorców platformy Kafka używana przez wyzwalacz.
AvroSchema (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro.
Authenticationmode (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi, Plain (wartość domyślna), ScramSha256, ScramSha512.
Nazwa użytkownika (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenie ions.
Hasło (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenie ions.
Protokół (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera.
SslCertificateLocation (Opcjonalnie) Ścieżka do certyfikatu klienta.
SslKeyLocation (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania.
SslKeyPassword (Opcjonalnie) Hasło do certyfikatu klienta.

Adnotacje

Adnotacja KafkaTrigger umożliwia utworzenie funkcji uruchamianej po odebraniu tematu. Obsługiwane opcje obejmują następujące elementy:

Element opis
name (Wymagane) Nazwa zmiennej reprezentującej komunikat kolejki lub tematu w kodzie funkcji.
brokerList (Wymagane) Lista brokerów platformy Kafka monitorowanych przez wyzwalacz. Aby uzyskać więcej informacji, zobacz Połączenie ions.
topic (Wymagane) Temat monitorowany przez wyzwalacz.
Kardynalność (Opcjonalnie) Wskazuje kardynalność danych wejściowych wyzwalacza. Obsługiwane wartości to ONE (wartość domyślna) i MANY. Użyj ONE , gdy dane wejściowe są pojedynczym komunikatem, a MANY dane wejściowe są tablicą komunikatów. W przypadku używania MANYdataTypeelementu należy również ustawić wartość .
Datatype Definiuje sposób obsługi wartości parametru przez funkcję Functions. Domyślnie wartość jest uzyskiwana jako ciąg, a usługa Functions próbuje wykonać deserializacji ciągu do rzeczywistego zwykłego obiektu Java (POJO). Gdy stringelement wejściowy jest traktowany jako tylko ciąg. Gdy binarykomunikat zostanie odebrany jako dane binarne, a usługa Functions próbuje wykonać deserializacji go do rzeczywistego bajtu typu parametru[].
consumerGroup (Opcjonalnie) Grupa odbiorców platformy Kafka używana przez wyzwalacz.
avroSchema (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro.
Authenticationmode (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi, Plain (wartość domyślna), ScramSha256, ScramSha512.
Nazwę użytkownika (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenie ions.
Hasło (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenie ions.
Protokół (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera.
sslCertificateLocation (Opcjonalnie) Ścieżka do certyfikatu klienta.
sslKeyLocation (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania.
sslKeyPassword (Opcjonalnie) Hasło do certyfikatu klienta.

Konfigurowanie

W poniższej tabeli opisano właściwości konfiguracji powiązania ustawione w pliku function.json .

właściwość function.json opis
type (Wymagane) Musi być ustawiona wartość kafkaTrigger.
direction (Wymagane) Musi być ustawiona wartość in.
name (Wymagane) Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji.
brokerList (Wymagane) Lista brokerów platformy Kafka monitorowanych przez wyzwalacz. Aby uzyskać więcej informacji, zobacz Połączenie ions.
topic (Wymagane) Temat monitorowany przez wyzwalacz.
Kardynalność (Opcjonalnie) Wskazuje kardynalność danych wejściowych wyzwalacza. Obsługiwane wartości to ONE (wartość domyślna) i MANY. Użyj ONE , gdy dane wejściowe są pojedynczym komunikatem, a MANY dane wejściowe są tablicą komunikatów. W przypadku używania MANYdataTypeelementu należy również ustawić wartość .
Datatype Definiuje sposób obsługi wartości parametru przez funkcję Functions. Domyślnie wartość jest uzyskiwana jako ciąg, a usługa Functions próbuje wykonać deserializacji ciągu do rzeczywistego zwykłego obiektu Java (POJO). Gdy stringelement wejściowy jest traktowany jako tylko ciąg. Gdy binarykomunikat zostanie odebrany jako dane binarne, a usługa Functions próbuje wykonać deserializacji go do rzeczywistego bajtu typu parametru[].
consumerGroup (Opcjonalnie) Grupa odbiorców platformy Kafka używana przez wyzwalacz.
avroSchema (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro.
Authenticationmode (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi, Plain (wartość domyślna), ScramSha256, ScramSha512.
Nazwę użytkownika (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenie ions.
Hasło (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenie ions.
Protokół (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera.
sslCertificateLocation (Opcjonalnie) Ścieżka do certyfikatu klienta.
sslKeyLocation (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania.
sslKeyPassword (Opcjonalnie) Hasło do certyfikatu klienta.

Użycie

Zdarzenia platformy Kafka są obecnie obsługiwane jako ciągi i tablice ciągów, które są ładunkami JSON.

Komunikaty platformy Kafka są przekazywane do funkcji jako ciągi i tablice ciągów, które są ładunkami JSON.

W planie Premium należy włączyć monitorowanie skalowania w czasie wykonywania dla danych wyjściowych platformy Kafka, aby móc skalować w poziomie do wielu wystąpień. Aby dowiedzieć się więcej, zobacz Włączanie skalowania środowiska uruchomieniowego.

Do pracy z wyzwalaczami platformy Kafka nie można użyć funkcji Testowanie/uruchamianie strony Kod i testowanie w witrynie Azure Portal. Zamiast tego należy wysyłać zdarzenia testowe bezpośrednio do tematu monitorowanego przez wyzwalacz.

Aby uzyskać pełny zestaw obsługiwanych ustawień host.json wyzwalacza platformy Kafka, zobacz host.json ustawienia.

Połączenia

Wszystkie informacje o połączeniu wymagane przez wyzwalacze i powiązania powinny być przechowywane w ustawieniach aplikacji, a nie w definicjach powiązań w kodzie. Dotyczy to poświadczeń, które nigdy nie powinny być przechowywane w kodzie.

Ważne

Ustawienia poświadczeń muszą odwoływać się do ustawienia aplikacji. Nie należy zapisywać poświadczeń w kodzie ani plikach konfiguracji. W przypadku uruchamiania lokalnego użyj pliku local.settings.json dla poświadczeń i nie publikuj pliku local.settings.json.

Podczas nawiązywania połączenia z zarządzanym klastrem Platformy Kafka udostępnianym przez platformę Confluent na platformie Azure upewnij się, że następujące poświadczenia uwierzytelniania dla środowiska platformy Confluent Cloud zostały ustawione w wyzwalaczu lub powiązaniu:

Ustawienie Zalecana wartość opis
Lista brokerów BootstrapServer Ustawienie aplikacji o nazwie BootstrapServer zawiera wartość serwera bootstrap znalezionego na stronie ustawień chmury Confluent. Wartość przypomina xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nazwa użytkownika ConfluentCloudUsername Ustawienie aplikacji o nazwie ConfluentCloudUsername zawiera klucz dostępu interfejsu API z witryny internetowej Confluent Cloud.
Hasło ConfluentCloudPassword Ustawienie aplikacji o nazwie ConfluentCloudPassword zawiera wpis tajny interfejsu API uzyskany z witryny internetowej platformy Confluent Cloud.

Wartości ciągu używane dla tych ustawień muszą być obecne jako ustawienia aplikacji na platformie Azure lub w Values kolekcji w pliku local.settings.json podczas programowania lokalnego.

Należy również ustawić Protocoldefinicje powiązań , AuthenticationModei SslCaLocation .

Następne kroki