Bagikan melalui


Pengikatan output Apache Kafka untuk Azure Functions

Pengikatan output memungkinkan aplikasi Azure Functions menulis pesan ke topik Kafka.

Penting

Pengikatan Kafka hanya tersedia untuk Functions pada Paket Elastic Premium dan paket Dedicated (App Service). Paket tersebut hanya didukung pada runtime Functions versi 3.x dan versi yang lebih baru.

Contoh

Penggunaan pengikatan bergantung pada modalitas C# yang digunakan dalam aplikasi fungsi Anda, yang dapat berupa salah satu hal berikut:

Pustaka kelas proses pekerja terisolasi yang dikompilasi fungsi C# berjalan dalam proses yang diisolasi dari runtime.

Atribut yang Anda gunakan bergantung pada penyedia kejadian tertentu.

Contoh berikut memiliki jenis pengembalian kustom yaitu MultipleOutputType, yang terdiri dari respons HTTP dan output Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

Di kelas MultipleOutputType, Kevent adalah variabel pengikatan output untuk pengikatan Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Untuk mengirim batch kejadian, teruskan array string ke jenis output, seperti yang ditunjukkan dalam contoh berikut:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

Array string didefinisikan sebagai properti Kevents pada kelas, di mana pengikatan output didefinisikan:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Fungsi berikut menambahkan header ke data output Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Untuk set contoh .NET yang berfungsi lengkap, lihat Repositori ekstensi Kafka.

Catatan

Untuk set contoh TypeScript yang setara, lihat Repositori ekstensi Kafka

Properti spesifik file function.json bergantung pada penyedia kejadian Anda, yang dalam contoh ini adalah Confluent atau Azure Event Hubs. Contoh berikut menunjukkan pengikatan output Kafka untuk fungsi yang dipicu oleh permintaan HTTP dan mengirimkan data dari permintaan ke topik Kafka.

Function.json berikut menentukan pemicu untuk penyedia tertentu dalam contoh berikut:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Kemudian, kode berikut mengirim pesan ke topik:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Kode berikut mengirim beberapa pesan sebagai array ke topik yang sama:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Contoh berikut menunjukkan cara mengirim pesan kejadian dengan header ke topik Kafka yang sama:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Untuk set contoh JavaScript yang berfungsi lengkap, lihat Repositori ekstensi Kafka.

Properti spesifik file function.json bergantung pada penyedia kejadian Anda, yang dalam contoh ini adalah Confluent atau Azure Event Hubs. Contoh berikut menunjukkan pengikatan output Kafka untuk fungsi yang dipicu oleh permintaan HTTP dan mengirimkan data dari permintaan ke topik Kafka.

Function.json berikut menentukan pemicu untuk penyedia tertentu dalam contoh berikut:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Kemudian, kode berikut mengirim pesan ke topik:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Kode berikut mengirim beberapa pesan sebagai array ke topik yang sama:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Contoh berikut menunjukkan cara mengirim pesan kejadian dengan header ke topik Kafka yang sama:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Untuk set contoh PowerShell yang berfungsi lengkap, lihat Repositori ekstensi Kafka.

Properti spesifik file function.json bergantung pada penyedia kejadian Anda, yang dalam contoh ini adalah Confluent atau Azure Event Hubs. Contoh berikut menunjukkan pengikatan output Kafka untuk fungsi yang dipicu oleh permintaan HTTP dan mengirimkan data dari permintaan ke topik Kafka.

Function.json berikut menentukan pemicu untuk penyedia tertentu dalam contoh berikut:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Kemudian, kode berikut mengirim pesan ke topik:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Kode berikut mengirim beberapa pesan sebagai array ke topik yang sama:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

Contoh berikut menunjukkan cara mengirim pesan kejadian dengan header ke topik Kafka yang sama:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Untuk set contoh Python yang berfungsi lengkap, lihat Repositori ekstensi Kafka.

Anotasi yang Anda gunakan untuk mengonfigurasi pengikatan output bergantung pada penyedia kejadian tertentu.

Fungsi berikut mengirimkan pesan ke topik Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

Contoh berikut menunjukkan cara mengirimkan beberapa pesan ke topik Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

Dalam contoh ini, parameter pengikatan output diubah menjadi array string.

Contoh terakhir yang digunakan untuk kelas KafkaEntity dan KafkaHeader:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

Contoh fungsi berikut mengirim pesan dengan header ke topik Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Untuk set contoh Java yang berfungsi lengkap bagi Confluent, lihat Repositori ekstensi Kafka.

Atribut

Pustaka C# proses dalam proses dan terisolasi menggunakan Kafka atribut untuk menentukan pemicu fungsi.

Tabel berikut menjelaskan properti yang dapat Anda atur menggunakan atribut ini:

Parameter Deskripsi
BrokerList (Wajib) Daftar broker Kafka tempat output dikirim. Lihat Koneksi untuk informasi selengkapnya.
Topik (Wajib) Topik tempat output dikirim.
AvroSchema (Opsional) Skema rekaman generik saat menggunakan protokol Avro.
MaxMessageBytes (Opsional) Ukuran maksimum pesan output yang dikirim (dalam MB), dengan nilai default 1.
BatchSize (Opsional) Jumlah maksimum pesan yang di-batch dalam satu kumpulan pesan, dengan nilai default 10000.
EnableIdempotence (Opsional) Ketika diatur ke true, menjamin bahwa pesan berhasil diproduksi tepat satu kali dan dalam urutan produksi asli, dengan nilai default false
MessageTimeoutMs (Opsional) Batas waktu pesan lokal, dalam milidetik. Nilai ini hanya diberlakukan secara lokal dan membatasi waktu tunggu pesan yang dihasilkan untuk pengiriman yang berhasil, dengan default 300000. Waktu 0 tidak terbatas. Nilai ini adalah waktu maksimum yang digunakan untuk mengirimkan pesan (termasuk percobaan kembali). Kesalahan pengiriman terjadi ketika jumlah upaya coba lagi atau batas waktu pesan terlampaui.
RequestTimeoutMs (Opsional) Batas waktu pengakuan permintaan output, dalam milidetik, dengan default 5000.
MaxRetries (Opsional) Frekuensi percobaan kembali mengirim Pesan yang gagal, dengan default 2. Mencoba kembali dapat menyebabkan penyusunan ulang, kecuali EnableIdempotence diatur ke true.
AuthenticationMode (Opsional) Mode autentikasi saat menggunakan Autentikasi Sederhana dan Lapisan Keamanan (SASL). Nilai yang didukung adalah Gssapi, Plain (default), ScramSha256, ScramSha512.
Username (Opsional) Nama pengguna untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
Password (Opsional) Kata sandi untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
Protokol (Opsional) Protokol keamanan yang digunakan saat berkomunikasi dengan broker. Nilai yang didukung adalah plaintext (default), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Opsional) Jalur ke file sertifikat CA untuk memverifikasi sertifikat broker.
SslCertificateLocation (Opsional) Jalur ke sertifikat klien.
SslKeyLocation (Opsional) Jalur ke kunci privat (PEM) klien yang digunakan untuk autentikasi.
SslKeyPassword (Opsional) Kata sandi untuk sertifikat klien.

Anotasi

Anotasi KafkaOutput memungkinkan Anda membuat fungsi yang menulis ke topik tertentu. Opsi yang didukung mencakup elemen berikut:

Elemen Deskripsi
nama Nama variabel yang mewakili data broker dalam kode fungsi.
brokerList (Wajib) Daftar broker Kafka tempat output dikirim. Lihat Koneksi untuk informasi selengkapnya.
topik (Wajib) Topik tempat output dikirim.
dataType Menentukan cara Functions menangani nilai parameter. Secara default, nilai diperoleh sebagai string dan Functions mencoba mendeserialisasi string ke objek Java biasa (POJO) aktual. Ketika string, input diperlakukan hanya sebagai string. Ketika binary, pesan diterima sebagai data biner, dan Functions mencoba mendeserialisasinya ke byte jenis parameter aktual[].
avroSchema (Opsional) Skema rekaman generik saat menggunakan protokol Avro. (Saat ini tidak didukung untuk Java.)
maxMessageBytes (Opsional) Ukuran maksimum pesan output yang dikirim (dalam MB), dengan nilai default 1.
batchSize (Opsional) Jumlah maksimum pesan yang di-batch dalam satu kumpulan pesan, dengan nilai default 10000.
enableIdempotence (Opsional) Ketika diatur ke true, menjamin bahwa pesan berhasil diproduksi tepat satu kali dan dalam urutan produksi asli, dengan nilai default false
messageTimeoutMs (Opsional) Batas waktu pesan lokal, dalam milidetik. Nilai ini hanya diberlakukan secara lokal dan membatasi waktu tunggu pesan yang dihasilkan untuk pengiriman yang berhasil, dengan default 300000. Waktu 0 tidak terbatas. Ini adalah waktu maksimum yang digunakan untuk mengirimkan pesan (termasuk percobaan kembali). Kesalahan pengiriman terjadi ketika jumlah upaya coba lagi atau batas waktu pesan terlampaui.
requestTimeoutMs (Opsional) Batas waktu pengakuan permintaan output, dalam milidetik, dengan default 5000.
maxRetries (Opsional) Frekuensi percobaan kembali mengirim Pesan yang gagal, dengan default 2. Mencoba kembali dapat menyebabkan penyusunan ulang, kecuali EnableIdempotence diatur ke true.
authenticationMode (Opsional) Mode autentikasi saat menggunakan Autentikasi Sederhana dan Lapisan Keamanan (SASL). Nilai yang didukung adalah Gssapi, Plain (default), ScramSha256, ScramSha512.
username (Opsional) Nama pengguna untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
kata sandi (Opsional) Kata sandi untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
protokol (Opsional) Protokol keamanan yang digunakan saat berkomunikasi dengan broker. Nilai yang didukung adalah plaintext (default), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opsional) Jalur ke file sertifikat CA untuk memverifikasi sertifikat broker.
sslCertificateLocation (Opsional) Jalur ke sertifikat klien.
sslKeyLocation (Opsional) Jalur ke kunci privat (PEM) klien yang digunakan untuk autentikasi.
sslKeyPassword (Opsional) Kata sandi untuk sertifikat klien.

Konfigurasi

Tabel berikut menjelaskan properti konfigurasi pengikatan yang Anda atur di file function.json.

Properti function.json Deskripsi
jenis Harus diatur ke kafka.
arah Harus diatur ke out.
nama Nama variabel yang mewakili data broker dalam kode fungsi.
brokerList (Wajib) Daftar broker Kafka tempat output dikirim. Lihat Koneksi untuk informasi selengkapnya.
topik (Wajib) Topik tempat output dikirim.
avroSchema (Opsional) Skema rekaman generik saat menggunakan protokol Avro.
maxMessageBytes (Opsional) Ukuran maksimum pesan output yang dikirim (dalam MB), dengan nilai default 1.
batchSize (Opsional) Jumlah maksimum pesan yang di-batch dalam satu kumpulan pesan, dengan nilai default 10000.
enableIdempotence (Opsional) Ketika diatur ke true, menjamin bahwa pesan berhasil diproduksi tepat satu kali dan dalam urutan produksi asli, dengan nilai default false
messageTimeoutMs (Opsional) Batas waktu pesan lokal, dalam milidetik. Nilai ini hanya diberlakukan secara lokal dan membatasi waktu tunggu pesan yang dihasilkan untuk pengiriman yang berhasil, dengan default 300000. Waktu 0 tidak terbatas. Ini adalah waktu maksimum yang digunakan untuk mengirimkan pesan (termasuk percobaan kembali). Kesalahan pengiriman terjadi ketika jumlah upaya coba lagi atau batas waktu pesan terlampaui.
requestTimeoutMs (Opsional) Batas waktu pengakuan permintaan output, dalam milidetik, dengan default 5000.
maxRetries (Opsional) Frekuensi percobaan kembali mengirim Pesan yang gagal, dengan default 2. Mencoba kembali dapat menyebabkan penyusunan ulang, kecuali EnableIdempotence diatur ke true.
authenticationMode (Opsional) Mode autentikasi saat menggunakan Autentikasi Sederhana dan Lapisan Keamanan (SASL). Nilai yang didukung adalah Gssapi, Plain (default), ScramSha256, ScramSha512.
username (Opsional) Nama pengguna untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
kata sandi (Opsional) Kata sandi untuk autentikasi SASL. Tidak didukung ketika AuthenticationMode adalah Gssapi. Lihat Koneksi untuk informasi selengkapnya.
protokol (Opsional) Protokol keamanan yang digunakan saat berkomunikasi dengan broker. Nilai yang didukung adalah plaintext (default), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opsional) Jalur ke file sertifikat CA untuk memverifikasi sertifikat broker.
sslCertificateLocation (Opsional) Jalur ke sertifikat klien.
sslKeyLocation (Opsional) Jalur ke kunci privat (PEM) klien yang digunakan untuk autentikasi.
sslKeyPassword (Opsional) Kata sandi untuk sertifikat klien.

Penggunaan

Jenis kunci dan nilai didukung dengan serialisasi Avro dan Protobuf bawaan.

Offset, partisi, dan stempel waktu untuk peristiwa dihasilkan di runtime. Hanya nilai dan header yang dapat diatur di dalam fungsi. Topik diatur dalam function.json.

Pastikan untuk memiliki akses ke topik Kafka yang ingin Anda tulis. Anda mengonfigurasi pengikatan dengan kredensial akses dan koneksi ke topik Kafka.

Dalam paket Premium, Anda harus mengaktifkan pemantauan skala runtime agar output Kafka dapat menskalakan ke beberapa instans. Untuk mempelajari selengkapnya, lihat Mengaktifkan penskalakan runtime.

Untuk set lengkap pengaturan host.json yang didukung untuk pemicu Kafka, lihat pengaturan host.json.

Koneksi

Semua informasi koneksi yang diperlukan oleh pemicu dan pengikatan Anda harus dipertahankan dalam pengaturan aplikasi dan bukan dalam definisi pengikatan dalam kode Anda. Hal ini berlaku untuk kredensial, yang seharusnya tidak pernah disimpan dalam kode Anda.

Penting

Pengaturan kredensial harus mereferensikan pengaturan aplikasi. Jangan mengodekan informasi masuk secara permanen dalam kode atau file konfigurasi Anda. Saat berjalan secara lokal, gunakan file local.settings.json untuk informasi masuk Anda, dan jangan terbitkan file local.settings.json.

Saat menyambungkan ke kluster Kafka terkelola yang disediakan oleh Confluent di Azure, pastikan bahwa informasi masuk autentikasi berikut untuk lingkungan Confluent Cloud Anda diatur dalam pemicu atau pengikatan Anda:

Pengaturan Nilai yang direkomendasikan Deskripsi
BrokerList BootstrapServer Pengaturan aplikasi bernama BootstrapServer berisi nilai server bootstrap yang ditemukan di halaman pengaturan Confluent Cloud. Nilainya menyerupai xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Username ConfluentCloudUsername Pengaturan aplikasi bernama ConfluentCloudUsername berisi kunci akses API dari situs web Confluent Cloud.
Password ConfluentCloudPassword Pengaturan aplikasi bernama ConfluentCloudPassword yang berisi rahasia API dari situs web Confluent Cloud.

Nilai string yang Anda gunakan untuk pengaturan ini harus ada sebagai pengaturan aplikasi di Azure atau dalam kumpulan Values di file local.settings.json selama pengembangan lokal.

Anda juga harus mengatur Protocol, AuthenticationMode, dan SslCaLocation dalam definisi pengikatan Anda.

Langkah berikutnya