Bagikan melalui


Pemicu Apache Kafka untuk Azure Functions

Anda dapat menggunakan pemicu Apache Kafka di Azure Functions untuk menjalankan kode fungsi Anda sebagai respons terhadap pesan dalam topik Kafka. Anda juga dapat menggunakan pengikatan output Kafka untuk menulis dari fungsi Anda ke topik. Untuk informasi tentang detail penyiapan dan konfigurasi, lihat Pengikatan Apache Kafka untuk ringkasan Azure Functions.

Penting

Pengikatan Kafka hanya tersedia untuk Functions pada Paket Elastic Premium dan paket Dedicated (App Service). Paket tersebut hanya didukung pada runtime Functions versi 3.x dan versi yang lebih baru.

Contoh

Penggunaan pemicu bergantung pada modalitas C# yang digunakan di aplikasi fungsi Anda, yang dapat berupa salah satu mode berikut:

Pustaka kelas proses pekerja terisolasi yang dikompilasi fungsi C# berjalan dalam proses yang diisolasi dari runtime.

Atribut yang Anda gunakan bergantung pada penyedia kejadian tertentu.

Contoh berikut menunjukkan fungsi C# yang membaca dan mencatat pesan Kafka sebagai peristiwa Kafka:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Untuk menerima peristiwa dalam batch, gunakan array string sebagai input, seperti yang ditunjukkan dalam contoh berikut:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

Fungsi berikut mencatat pesan dan header untuk Peristiwa Kafka:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

Untuk set contoh .NET yang berfungsi lengkap, lihat Repositori ekstensi Kafka.

Catatan

Untuk set contoh TypeScript yang setara, lihat Repositori ekstensi Kafka

Properti spesifik file function.json bergantung pada penyedia kejadian Anda, yang dalam contoh ini adalah Confluent atau Azure Event Hubs. Contoh berikut menunjukkan pemicu Kafka untuk fungsi yang membaca dan mencatat pesan Kafka.

Function.json berikut mendefinisikan pemicu untuk penyedia tertentu:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

Kode berikut kemudian berjalan ketika fungsi dipicu:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Untuk menerima acara dalam batch, atur cardinality ke many di file function.js, seperti yang diperlihatkan dalam contoh berikut.

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

Kode berikut kemudian mengurai array peristiwa dan mencatat data peristiwa:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

Kode berikut juga mencatat data header:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

Anda dapat menentukan skema Avro generik untuk peristiwa yang diteruskan ke pemicu. Function.json berikut mendefinisikan pemicu untuk penyedia tertentu dengan skema Avro generik:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Kode berikut kemudian berjalan ketika fungsi dipicu:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

Untuk set contoh JavaScript yang berfungsi lengkap, lihat Repositori ekstensi Kafka.

Properti spesifik file function.json bergantung pada penyedia kejadian Anda, yang dalam contoh ini adalah Confluent atau Azure Event Hubs. Contoh berikut menunjukkan pemicu Kafka untuk fungsi yang membaca dan mencatat pesan Kafka.

Function.json berikut mendefinisikan pemicu untuk penyedia tertentu:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Kode berikut kemudian berjalan ketika fungsi dipicu:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Untuk menerima acara dalam batch, atur cardinality ke many di file function.js, seperti yang diperlihatkan dalam contoh berikut.

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Kode berikut kemudian mengurai array peristiwa dan mencatat data peristiwa:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

Kode berikut juga mencatat data header:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Anda dapat menentukan skema Avro generik untuk peristiwa yang diteruskan ke pemicu. Function.json berikut mendefinisikan pemicu untuk penyedia tertentu dengan skema Avro generik:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Kode berikut kemudian berjalan ketika fungsi dipicu:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Untuk set contoh PowerShell yang berfungsi lengkap, lihat Repositori ekstensi Kafka.

Properti spesifik file function.json bergantung pada penyedia kejadian Anda, yang dalam contoh ini adalah Confluent atau Azure Event Hubs. Contoh berikut menunjukkan pemicu Kafka untuk fungsi yang membaca dan mencatat pesan Kafka.

Function.json berikut mendefinisikan pemicu untuk penyedia tertentu:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

Kode berikut kemudian berjalan ketika fungsi dipicu:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Untuk menerima acara dalam batch, atur cardinality ke many di file function.js, seperti yang diperlihatkan dalam contoh berikut.

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

Kode berikut kemudian mengurai array peristiwa dan mencatat data peristiwa:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

Kode berikut juga mencatat data header:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

Anda dapat menentukan skema Avro generik untuk peristiwa yang diteruskan ke pemicu. Function.json berikut mendefinisikan pemicu untuk penyedia tertentu dengan skema Avro generik:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Kode berikut kemudian berjalan ketika fungsi dipicu:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

Untuk set contoh Python yang berfungsi lengkap, lihat Repositori ekstensi Kafka.

Anotasi yang Anda gunakan untuk mengonfigurasi pemicu bergantung pada penyedia peristiwa tertentu.

Contoh berikut menunjukkan fungsi Java yang membaca dan mencatat konten peristiwa Kafka:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Untuk menerima peristiwa dalam batch, gunakan string input sebagai array, seperti yang ditunjukkan dalam contoh berikut:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

Fungsi berikut mencatat pesan dan header untuk Peristiwa Kafka:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Anda dapat menentukan skema Avro generik untuk peristiwa yang diteruskan ke pemicu. Fungsi berikut menentukan pemicu untuk penyedia tertentu dengan skema Avro generik:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Untuk set contoh Java yang berfungsi lengkap bagi Confluent, lihat Repositori ekstensi Kafka.

Atribut

Pustaka C# proses dalam proses dan terisolasi menggunakan KafkaTriggerAttribute untuk menentukan pemicu fungsi.

Tabel berikut menjelaskan properti yang dapat Anda atur menggunakan atribut pemicu ini:

Parameter Deskripsi
BrokerList (Diperlukan) Daftar broker Kafka yang dipantau oleh pemicunya. Lihat Koneksi untuk informasi selengkapnya.
Topik (Diperlukan) Topik yang dipantau oleh pemicu.
ConsumerGroup (Opsional) Grup konsumen Kafka yang digunakan oleh pemicu.
AvroSchema (Opsional) Skema rekaman generik saat menggunakan protokol Avro.
AuthenticationMode (Opsional) Mode autentikasi saat menggunakan Autentikasi Sederhana dan Lapisan Keamanan (SASL). Nilai yang didukung adalah Gssapi, Plain (default), ScramSha256, ScramSha512.
Username (Opsional) Nama pengguna untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
Password (Opsional) Kata sandi untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
Protokol (Opsional) Protokol keamanan yang digunakan saat berkomunikasi dengan broker. Nilai yang didukung adalah plaintext (default), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Opsional) Jalur ke file sertifikat CA untuk memverifikasi sertifikat broker.
SslCertificateLocation (Opsional) Jalur ke sertifikat klien.
SslKeyLocation (Opsional) Jalur ke kunci privat (PEM) klien yang digunakan untuk autentikasi.
SslKeyPassword (Opsional) Kata sandi untuk sertifikat klien.

Anotasi

Anotasi KafkaTrigger memungkinkan Anda membuat fungsi yang berjalan saat topik diterima. Opsi yang didukung mencakup elemen berikut:

Elemen Deskripsi
nama (Diperlukan) Nama variabel yang mewakili antrian atau pesan topik dalam kode fungsi.
brokerList (Diperlukan) Daftar broker Kafka yang dipantau oleh pemicunya. Lihat Koneksi untuk informasi selengkapnya.
topik (Diperlukan) Topik yang dipantau oleh pemicu.
kardinalitas (Opsional) Menunjukkan kardinalitas input pemicu. Nilai yang didukung adalah ONE (default), dan MANY. Gunakan ONE saat input adalah satu pesan dan MANY ketika input adalah array pesan. Saat Anda menggunakan MANY, Anda juga harus mengatur dataType.
dataType Menentukan cara Functions menangani nilai parameter. Secara default, nilai diperoleh sebagai string dan Functions mencoba mendeserialisasi string ke objek Java biasa (POJO) aktual. Ketika string, input diperlakukan hanya sebagai string. Ketika binary, pesan diterima sebagai data biner, dan Functions mencoba mendeserialisasinya ke byte jenis parameter aktual[].
consumerGroup (Opsional) Grup konsumen Kafka yang digunakan oleh pemicu.
avroSchema (Opsional) Skema rekaman generik saat menggunakan protokol Avro.
authenticationMode (Opsional) Mode autentikasi saat menggunakan Autentikasi Sederhana dan Lapisan Keamanan (SASL). Nilai yang didukung adalah Gssapi, Plain (default), ScramSha256, ScramSha512.
username (Opsional) Nama pengguna untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
kata sandi (Opsional) Kata sandi untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
protokol (Opsional) Protokol keamanan yang digunakan saat berkomunikasi dengan broker. Nilai yang didukung adalah plaintext (default), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opsional) Jalur ke file sertifikat CA untuk memverifikasi sertifikat broker.
sslCertificateLocation (Opsional) Jalur ke sertifikat klien.
sslKeyLocation (Opsional) Jalur ke kunci privat (PEM) klien yang digunakan untuk autentikasi.
sslKeyPassword (Opsional) Kata sandi untuk sertifikat klien.

Konfigurasi

Tabel berikut menjelaskan properti konfigurasi pengikatan yang Anda atur di file function.json.

Properti function.json Deskripsi
jenis (Diperlukan) - harus diatur ke kafkaTrigger.
arah (Diperlukan) - harus diatur ke in.
nama (Diperlukan) Nama variabel yang mewakili data yang diperantarai dalam kode fungsi.
brokerList (Diperlukan) Daftar broker Kafka yang dipantau oleh pemicunya. Lihat Koneksi untuk informasi selengkapnya.
topik (Diperlukan) Topik yang dipantau oleh pemicu.
kardinalitas (Opsional) Menunjukkan kardinalitas input pemicu. Nilai yang didukung adalah ONE (default), dan MANY. Gunakan ONE saat input adalah satu pesan dan MANY ketika input adalah array pesan. Saat Anda menggunakan MANY, Anda juga harus mengatur dataType.
dataType Menentukan cara Functions menangani nilai parameter. Secara default, nilai diperoleh sebagai string dan Functions mencoba mendeserialisasi string ke objek Java biasa (POJO) aktual. Ketika string, input diperlakukan hanya sebagai string. Ketika binary, pesan diterima sebagai data biner, dan Functions mencoba mendeserialisasinya ke byte jenis parameter aktual[].
consumerGroup (Opsional) Grup konsumen Kafka yang digunakan oleh pemicu.
avroSchema (Opsional) Skema rekaman generik saat menggunakan protokol Avro.
authenticationMode (Opsional) Mode autentikasi saat menggunakan Autentikasi Sederhana dan Lapisan Keamanan (SASL). Nilai yang didukung adalah Gssapi, Plain (default), ScramSha256, ScramSha512.
username (Opsional) Nama pengguna untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
kata sandi (Opsional) Kata sandi untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
protokol (Opsional) Protokol keamanan yang digunakan saat berkomunikasi dengan broker. Nilai yang didukung adalah plaintext (default), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opsional) Jalur ke file sertifikat CA untuk memverifikasi sertifikat broker.
sslCertificateLocation (Opsional) Jalur ke sertifikat klien.
sslKeyLocation (Opsional) Jalur ke kunci privat (PEM) klien yang digunakan untuk autentikasi.
sslKeyPassword (Opsional) Kata sandi untuk sertifikat klien.

Penggunaan

Peristiwa Kafka saat ini didukung sebagai string dan array string yang merupakan payload JSON.

Pesan Kafka diteruskan ke fungsi sebagai string dan array string yang merupakan muatan JSON.

Dalam paket Premium, Anda harus mengaktifkan pemantauan skala runtime agar output Kafka dapat menskalakan ke beberapa instans. Untuk mempelajari selengkapnya, lihat Mengaktifkan penskalakan runtime.

Anda tidak dapat menggunakan fitur Uji/Jalankan dari halaman Kode + Uji di Portal Microsoft Azure untuk bekerja dengan pemicu Kafka. Anda harus mengirim peristiwa pengujian langsung ke topik yang dipantau oleh pemicu.

Untuk set lengkap pengaturan host.json yang didukung untuk pemicu Kafka, lihat pengaturan host.json.

Koneksi

Semua informasi koneksi yang diperlukan oleh pemicu dan pengikatan Anda harus dipertahankan dalam pengaturan aplikasi dan bukan dalam definisi pengikatan dalam kode Anda. Hal ini berlaku untuk kredensial, yang seharusnya tidak pernah disimpan dalam kode Anda.

Penting

Pengaturan kredensial harus mereferensikan pengaturan aplikasi. Jangan mengodekan informasi masuk secara permanen dalam kode atau file konfigurasi Anda. Saat berjalan secara lokal, gunakan file local.settings.json untuk informasi masuk Anda, dan jangan terbitkan file local.settings.json.

Saat menyambungkan ke kluster Kafka terkelola yang disediakan oleh Confluent di Azure, pastikan bahwa informasi masuk autentikasi berikut untuk lingkungan Confluent Cloud Anda diatur dalam pemicu atau pengikatan Anda:

Pengaturan Nilai yang direkomendasikan Deskripsi
BrokerList BootstrapServer Pengaturan aplikasi bernama BootstrapServer berisi nilai server bootstrap yang ditemukan di halaman pengaturan Confluent Cloud. Nilainya menyerupai xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Username ConfluentCloudUsername Pengaturan aplikasi bernama ConfluentCloudUsername berisi kunci akses API dari situs web Confluent Cloud.
Password ConfluentCloudPassword Pengaturan aplikasi bernama ConfluentCloudPassword yang berisi rahasia API dari situs web Confluent Cloud.

Nilai string yang Anda gunakan untuk pengaturan ini harus ada sebagai pengaturan aplikasi di Azure atau dalam kumpulan Values di file local.settings.json selama pengembangan lokal.

Anda juga harus mengatur Protocol, AuthenticationMode, dan SslCaLocation dalam definisi pengikatan Anda.

Langkah berikutnya