Aracılığıyla paylaş


Azure İşlevleri için Apache Kafka tetikleyicisi

kafka konularındaki iletilere yanıt olarak işlev kodunuzu çalıştırmak için Azure İşlevleri Apache Kafka tetikleyicisini kullanabilirsiniz. Ayrıca, işlevinizden bir konuya yazmak için Kafka çıkış bağlaması da kullanabilirsiniz. Kurulum ve yapılandırma ayrıntıları hakkında bilgi için bkz. Azure İşlevleri genel bakış için Apache Kafka bağlamaları.

Önemli

Kafka bağlamaları yalnızca Elastik Premium Plan ve Ayrılmış (App Service) planındaki İşlevler için kullanılabilir. Bunlar yalnızca İşlevler çalışma zamanının 3.x ve sonraki sürümlerinde desteklenir.

Örnek

Tetikleyicinin kullanımı, işlev uygulamanızda kullanılan C# modalitesine bağlıdır ve bu modlar aşağıdaki modlardan biri olabilir:

Yalıtılmış çalışan işlem sınıfı kitaplığı derlenmiş C# işlevi çalışma zamanından yalıtılmış bir işlemde çalışır.

Kullandığınız öznitelikler belirli olay sağlayıcısına bağlıdır.

Aşağıdaki örnekte, Kafka iletisini Kafka olayı olarak okuyan ve günlüğe kaydeden bir C# işlevi gösterilmektedir:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Olayları toplu olarak almak için, aşağıdaki örnekte gösterildiği gibi giriş olarak bir dize dizisi kullanın:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

Aşağıdaki işlev, Kafka Olayı için iletiyi ve üst bilgileri günlüğe kaydeder:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

Çalışan .NET örneklerinin tamamı için kafka uzantısı deposuna bakın.

Not

Eşdeğer bir TypeScript örnekleri kümesi için bkz . Kafka uzantısı deposu

function.json dosyasının belirli özellikleri, bu örneklerde Confluent veya Azure Event Hubs olan olay sağlayıcınıza bağlıdır. Aşağıdaki örneklerde, Kafka iletisini okuyan ve günlüğe kaydeden bir işlev için Kafka tetikleyicisi gösterilmektedir.

Aşağıdaki function.json, belirli bir sağlayıcı için tetikleyiciyi tanımlar:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

Ardından işlev tetiklendiğinde aşağıdaki kod çalıştırılır:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Olayları toplu olarak almak için, aşağıdaki örneklerde gösterildiği gibi değeri many function.json dosyasında olarak ayarlayıncardinality:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

Aşağıdaki kod daha sonra olay dizisini ayrıştırıp olay verilerini günlüğe kaydeder:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

Aşağıdaki kod ayrıca üst bilgi verilerini günlüğe kaydeder:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

Tetikleyiciye geçirilen olay için genel bir Avro şeması tanımlayabilirsiniz. Aşağıdaki function.json, genel bir Avro şemasına sahip belirli sağlayıcının tetikleyicisini tanımlar:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Ardından işlev tetiklendiğinde aşağıdaki kod çalıştırılır:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

Çalışan JavaScript örneklerinin tamamı için kafka uzantısı deposuna bakın.

function.json dosyasının belirli özellikleri, bu örneklerde Confluent veya Azure Event Hubs olan olay sağlayıcınıza bağlıdır. Aşağıdaki örneklerde, Kafka iletisini okuyan ve günlüğe kaydeden bir işlev için Kafka tetikleyicisi gösterilmektedir.

Aşağıdaki function.json, belirli bir sağlayıcı için tetikleyiciyi tanımlar:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Ardından işlev tetiklendiğinde aşağıdaki kod çalıştırılır:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Olayları toplu olarak almak için, aşağıdaki örneklerde gösterildiği gibi değeri many function.json dosyasında olarak ayarlayıncardinality:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Aşağıdaki kod daha sonra olay dizisini ayrıştırıp olay verilerini günlüğe kaydeder:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

Aşağıdaki kod ayrıca üst bilgi verilerini günlüğe kaydeder:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Tetikleyiciye geçirilen olay için genel bir Avro şeması tanımlayabilirsiniz. Aşağıdaki function.json, genel bir Avro şemasına sahip belirli sağlayıcının tetikleyicisini tanımlar:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Ardından işlev tetiklendiğinde aşağıdaki kod çalıştırılır:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Çalışan PowerShell örneklerinin tamamı için kafka uzantısı deposuna bakın.

function.json dosyasının belirli özellikleri, bu örneklerde Confluent veya Azure Event Hubs olan olay sağlayıcınıza bağlıdır. Aşağıdaki örneklerde, Kafka iletisini okuyan ve günlüğe kaydeden bir işlev için Kafka tetikleyicisi gösterilmektedir.

Aşağıdaki function.json, belirli bir sağlayıcı için tetikleyiciyi tanımlar:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

Ardından işlev tetiklendiğinde aşağıdaki kod çalıştırılır:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Olayları toplu olarak almak için, aşağıdaki örneklerde gösterildiği gibi değeri many function.json dosyasında olarak ayarlayıncardinality:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

Aşağıdaki kod daha sonra olay dizisini ayrıştırıp olay verilerini günlüğe kaydeder:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

Aşağıdaki kod ayrıca üst bilgi verilerini günlüğe kaydeder:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

Tetikleyiciye geçirilen olay için genel bir Avro şeması tanımlayabilirsiniz. Aşağıdaki function.json, genel bir Avro şemasına sahip belirli sağlayıcının tetikleyicisini tanımlar:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Ardından işlev tetiklendiğinde aşağıdaki kod çalıştırılır:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

Çalışan Python örneklerinin tamamı için kafka uzantısı deposuna bakın.

Tetikleyicinizi yapılandırmak için kullandığınız ek açıklamalar belirli olay sağlayıcısına bağlıdır.

Aşağıdaki örnekte, Kafka olayının içeriğini okuyan ve günlüğe kaydeden bir Java işlevi gösterilmektedir:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Bir toplu iş içindeki olayları almak için, aşağıdaki örnekte gösterildiği gibi bir giriş dizesini dizi olarak kullanın:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

Aşağıdaki işlev, Kafka Olayı için iletiyi ve üst bilgileri günlüğe kaydeder:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Tetikleyiciye geçirilen olay için genel bir Avro şeması tanımlayabilirsiniz. Aşağıdaki işlev, genel avro şemasına sahip belirli sağlayıcı için bir tetikleyici tanımlar:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Confluent için çalışan java örneklerinin tamamı için Kafka uzantısı deposuna bakın.

Özellikler

hem işlem içi hem de yalıtılmış çalışan işlemi C# kitaplıkları, işlev tetikleyicisini tanımlamak için öğesini KafkaTriggerAttribute kullanır.

Aşağıdaki tabloda, bu tetikleyici özniteliğini kullanarak ayarlayabileceğiniz özellikler açıklanmaktadır:

Parametre Açıklama
BrokerList (Gerekli) Tetikleyici tarafından izlenen Kafka aracılarının listesi. Daha fazla bilgi için bkz . Bağlantılar .
Konu (Gerekli) Tetikleyici tarafından izlenen konu.
ConsumerGroup (İsteğe bağlı) Tetikleyici tarafından kullanılan Kafka tüketici grubu.
AvroSchema (İsteğe bağlı) Avro protokolü kullanılırken genel bir kaydın şeması.
AuthenticationMode (İsteğe bağlı) Basit Kimlik Doğrulaması ve Güvenlik Katmanı (SASL) kimlik doğrulaması kullanılırken kullanılan kimlik doğrulama modu. Desteklenen değerler şunlardır: Gssapi, Plain (varsayılan), ScramSha256, ScramSha512.
Kullanıcı adı (İsteğe bağlı) SASL kimlik doğrulaması için kullanıcı adı. olduğunda AuthenticationMode Gssapidesteklenmez. Daha fazla bilgi için bkz . Bağlantılar .
Parola (İsteğe bağlı) SASL kimlik doğrulamasının parolası. olduğunda AuthenticationMode Gssapidesteklenmez. Daha fazla bilgi için bkz . Bağlantılar .
Protokol (İsteğe bağlı) Aracılarla iletişim kurarken kullanılan güvenlik protokolü. Desteklenen değerler şunlardır plaintext : (varsayılan), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (İsteğe bağlı) Aracının sertifikasını doğrulamak için CA sertifika dosyasının yolu.
SslCertificateLocation (İsteğe bağlı) İstemci sertifikasının yolu.
SslKeyLocation (İsteğe bağlı) Kimlik doğrulaması için kullanılan istemcinin özel anahtarının (PEM) yolu.
SslKeyPassword (İsteğe bağlı) İstemci sertifikasının parolası.

Ek Açıklamalar

Ek KafkaTrigger açıklama, konu alındığında çalışan bir işlev oluşturmanıza olanak tanır. Desteklenen seçenekler aşağıdaki öğeleri içerir:

Öğe Açıklama
ad (Gerekli) İşlev kodundaki kuyruk veya konu iletisini temsil eden değişkenin adı.
brokerList (Gerekli) Tetikleyici tarafından izlenen Kafka aracılarının listesi. Daha fazla bilgi için bkz . Bağlantılar .
topic (Gerekli) Tetikleyici tarafından izlenen konu.
Önem düzeyi (İsteğe bağlı) Tetikleyici girişinin kardinalitesini gösterir. Desteklenen değerler (varsayılan) ve MANY'tir ONE . Giriş tek bir ileti olduğunda ve MANY giriş bir ileti dizisi olduğunda kullanınONE. kullanırken MANYbir de ayarlamanız dataTypegerekir.
Datatype İşlevler'in parametre değerini nasıl işlediğini tanımlar. Varsayılan olarak, değer bir dize olarak elde edilir ve İşlevler dizeyi gerçek düz eski Java nesnesine (POJO) seri durumdan çıkarmaya çalışır. olduğunda string, giriş yalnızca bir dize olarak değerlendirilir. olduğunda binary, ileti ikili veri olarak alınır ve İşlevler bunu gerçek parametre türü bayt[] olarak seri durumdan çıkarmaya çalışır.
consumerGroup (İsteğe bağlı) Tetikleyici tarafından kullanılan Kafka tüketici grubu.
avroSchema (İsteğe bağlı) Avro protokolü kullanılırken genel bir kaydın şeması.
authenticationMode (İsteğe bağlı) Basit Kimlik Doğrulaması ve Güvenlik Katmanı (SASL) kimlik doğrulaması kullanılırken kullanılan kimlik doğrulama modu. Desteklenen değerler şunlardır: Gssapi, Plain (varsayılan), ScramSha256, ScramSha512.
kullanıcı adı (İsteğe bağlı) SASL kimlik doğrulaması için kullanıcı adı. olduğunda AuthenticationMode Gssapidesteklenmez. Daha fazla bilgi için bkz . Bağlantılar .
parola (İsteğe bağlı) SASL kimlik doğrulamasının parolası. olduğunda AuthenticationMode Gssapidesteklenmez. Daha fazla bilgi için bkz . Bağlantılar .
protokol (İsteğe bağlı) Aracılarla iletişim kurarken kullanılan güvenlik protokolü. Desteklenen değerler şunlardır plaintext : (varsayılan), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (İsteğe bağlı) Aracının sertifikasını doğrulamak için CA sertifika dosyasının yolu.
sslCertificateLocation (İsteğe bağlı) İstemci sertifikasının yolu.
sslKeyLocation (İsteğe bağlı) Kimlik doğrulaması için kullanılan istemcinin özel anahtarının (PEM) yolu.
sslKeyPassword (İsteğe bağlı) İstemci sertifikasının parolası.

Yapılandırma

Aşağıdaki tabloda, function.json dosyasında ayarladığınız bağlama yapılandırma özellikleri açıklanmaktadır.

function.json özelliği Açıklama
type (Gerekli) olarak ayarlanmalıdır kafkaTrigger.
direction (Gerekli) olarak ayarlanmalıdır in.
ad (Gerekli) İşlev kodunda aracılı verileri temsil eden değişkenin adı.
brokerList (Gerekli) Tetikleyici tarafından izlenen Kafka aracılarının listesi. Daha fazla bilgi için bkz . Bağlantılar .
topic (Gerekli) Tetikleyici tarafından izlenen konu.
Önem düzeyi (İsteğe bağlı) Tetikleyici girişinin kardinalitesini gösterir. Desteklenen değerler (varsayılan) ve MANY'tir ONE . Giriş tek bir ileti olduğunda ve MANY giriş bir ileti dizisi olduğunda kullanınONE. kullanırken MANYbir de ayarlamanız dataTypegerekir.
Datatype İşlevler'in parametre değerini nasıl işlediğini tanımlar. Varsayılan olarak, değer bir dize olarak elde edilir ve İşlevler dizeyi gerçek düz eski Java nesnesine (POJO) seri durumdan çıkarmaya çalışır. olduğunda string, giriş yalnızca bir dize olarak değerlendirilir. olduğunda binary, ileti ikili veri olarak alınır ve İşlevler bunu gerçek parametre türü bayt[] olarak seri durumdan çıkarmaya çalışır.
consumerGroup (İsteğe bağlı) Tetikleyici tarafından kullanılan Kafka tüketici grubu.
avroSchema (İsteğe bağlı) Avro protokolü kullanılırken genel bir kaydın şeması.
authenticationMode (İsteğe bağlı) Basit Kimlik Doğrulaması ve Güvenlik Katmanı (SASL) kimlik doğrulaması kullanılırken kullanılan kimlik doğrulama modu. Desteklenen değerler şunlardır: Gssapi, Plain (varsayılan), ScramSha256, ScramSha512.
kullanıcı adı (İsteğe bağlı) SASL kimlik doğrulaması için kullanıcı adı. olduğunda AuthenticationMode Gssapidesteklenmez. Daha fazla bilgi için bkz . Bağlantılar .
parola (İsteğe bağlı) SASL kimlik doğrulamasının parolası. olduğunda AuthenticationMode Gssapidesteklenmez. Daha fazla bilgi için bkz . Bağlantılar .
protokol (İsteğe bağlı) Aracılarla iletişim kurarken kullanılan güvenlik protokolü. Desteklenen değerler şunlardır plaintext : (varsayılan), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (İsteğe bağlı) Aracının sertifikasını doğrulamak için CA sertifika dosyasının yolu.
sslCertificateLocation (İsteğe bağlı) İstemci sertifikasının yolu.
sslKeyLocation (İsteğe bağlı) Kimlik doğrulaması için kullanılan istemcinin özel anahtarının (PEM) yolu.
sslKeyPassword (İsteğe bağlı) İstemci sertifikasının parolası.

Kullanım

Kafka olayları şu anda JSON yükleri olan dizeler ve dize dizileri olarak desteklenmektedir.

Kafka iletileri, işlevine JSON yükleri olan dizeler ve dize dizileri olarak geçirilir.

Premium planda, Kafka çıkışının birden çok örneğe ölçeği genişletebilmesi için çalışma zamanı ölçeği izlemeyi etkinleştirmeniz gerekir. Daha fazla bilgi edinmek için bkz . Çalışma zamanı ölçeklendirmesini etkinleştirme.

Kafka tetikleyicileriyle çalışmak için Azure Portal'daki Kod + Test sayfasının Test/Çalıştırma özelliğini kullanamazsınız. Bunun yerine, test olaylarını tetikleyici tarafından izlenen konuya doğrudan göndermeniz gerekir.

Kafka tetikleyicisi için desteklenen host.json ayarlarının tamamı için bkz . host.json ayarları.

Bağlantılar

Tetikleyicileriniz ve bağlamalarınız için gereken tüm bağlantı bilgileri, kodunuzdaki bağlama tanımlarında değil uygulama ayarlarında tutulmalıdır. Bu, kodunuzda hiçbir zaman depolanmaması gereken kimlik bilgileri için geçerlidir.

Önemli

Kimlik bilgileri ayarları bir uygulama ayarına başvurmalıdır. Kod veya yapılandırma dosyalarınızda kimlik bilgilerini sabit kodlamayın. Yerel olarak çalışırken, kimlik bilgileriniz için local.settings.json dosyasını kullanın ve local.settings.json dosyasını yayımlamayın.

Azure'da Confluent tarafından sağlanan yönetilen bir Kafka kümesine bağlanırken, Confluent Cloud ortamınız için aşağıdaki kimlik doğrulama kimlik bilgilerinin tetikleyicinizde veya bağlamanızda ayarlandığından emin olun:

Ayar Önerilen değer Açıklama
BrokerList BootstrapServer adlı BootstrapServer uygulama ayarı, Confluent Bulut ayarları sayfasında bulunan bootstrap sunucusunun değerini içerir. değerine benzer.xyz-xyzxzy.westeurope.azure.confluent.cloud:9092
Kullanıcı adı ConfluentCloudUsername adlı ConfluentCloudUsername uygulama ayarı Confluent Cloud web sitesinden API erişim anahtarını içerir.
Parola ConfluentCloudPassword adlı ConfluentCloudPassword uygulama ayarı Confluent Cloud web sitesinden alınan API gizli dizisini içerir.

Bu ayarlar için kullandığınız dize değerleri, yerel geliştirme sırasında Azure'da veya Values local.settings.json dosyasındaki koleksiyonda uygulama ayarları olarak bulunmalıdır.

Bağlama tanımlarınızda , AuthenticationModeve SslCaLocation değerlerini de ayarlamanız Protocolgerekir.

Sonraki adımlar