Apache Kafka kimeneti kötés az Azure Functionshez

A kimeneti kötés lehetővé teszi, hogy az Azure Functions-alkalmazások üzeneteket írjanak egy Kafka-témakörbe.

Fontos

A Kafka-kötések csak az Elastic Premium Csomag és a Dedikált (App Service) csomaghoz tartozó Functions esetében érhetők el. Ezek csak a Functions-futtatókörnyezet 3.x és újabb verziójában támogatottak.

Példa

A kötés használata a függvényalkalmazásban használt C#-modalitástól függ, amely az alábbiak egyike lehet:

A C# függvényt lefordított izolált feldolgozói folyamatosztály-kódtár a futtatókörnyezettől elkülönített folyamatban fut.

A használt attribútumok az adott eseményszolgáltatótól függenek.

Az alábbi példa egy egyéni visszatérési típussal rendelkezik MultipleOutputType, amely EGY HTTP-válaszból és egy Kafka-kimenetből áll.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

Az osztályban MultipleOutputTypeKevent a Kafka-kötés kimeneti kötési változója.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Eseményköteg küldéséhez adjon át egy sztringtömböt a kimeneti típusnak, ahogyan az a következő példában látható:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

A sztringtömb az osztály tulajdonságaként Kevents van definiálva, amelyen a kimeneti kötés definiálva van:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

A következő függvény fejléceket ad hozzá a Kafka kimeneti adataihoz:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

A működő .NET-példák teljes készletéért tekintse meg a Kafka-bővítménytárat.

Feljegyzés

A TypeScript-példák egyenértékű készletét lásd a Kafka-bővítménytárban

A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Olyan függvény Kafka kimeneti kötését mutatják be, amelyet egy HTTP-kérés aktivál, és adatokat küld a kérelemből a Kafka-témakörnek.

Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit az alábbi példákban:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

A következő kód ezután üzenetet küld a témakörnek:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

A következő kód több üzenetet küld tömbként ugyanarra a témakörre:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Az alábbi példa bemutatja, hogyan küldhet egy élőfejet tartalmazó eseményüzenetet ugyanarra a Kafka-témakörre:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

A működő JavaScript-példák teljes készletét a Kafka-bővítménytárban tekinti meg.

A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Olyan függvény Kafka kimeneti kötését mutatják be, amelyet egy HTTP-kérés aktivál, és adatokat küld a kérelemből a Kafka-témakörnek.

Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit az alábbi példákban:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

A következő kód ezután üzenetet küld a témakörnek:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

A következő kód több üzenetet küld tömbként ugyanarra a témakörre:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Az alábbi példa bemutatja, hogyan küldhet egy élőfejet tartalmazó eseményüzenetet ugyanarra a Kafka-témakörre:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

A működő PowerShell-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.

A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Olyan függvény Kafka kimeneti kötését mutatják be, amelyet egy HTTP-kérés aktivál, és adatokat küld a kérelemből a Kafka-témakörnek.

Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit az alábbi példákban:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

A következő kód ezután üzenetet küld a témakörnek:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

A következő kód több üzenetet küld tömbként ugyanarra a témakörre:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

Az alábbi példa bemutatja, hogyan küldhet egy élőfejet tartalmazó eseményüzenetet ugyanarra a Kafka-témakörre:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

A működő Python-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.

A kimeneti kötés konfigurálásához használt széljegyzetek az adott eseményszolgáltatótól függenek.

Az alábbi függvény üzenetet küld a Kafka-témakörnek.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

Az alábbi példa bemutatja, hogyan küldhet több üzenetet egy Kafka-témakörnek.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

Ebben a példában a kimeneti kötés paramétere sztringtömbre változik.

Az utolsó példa ezeket KafkaEntity és KafkaHeader osztályokat használja:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

Az alábbi példafüggvény fejléceket tartalmazó üzenetet küld egy Kafka-témakörnek.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

A Confluenthez készült működő Java-példák teljes készletét a Kafka-bővítménytárban tekinti meg.

Attribútumok

A folyamaton belüli és az izolált feldolgozói folyamat C# kódtárai az Kafka attribútummal határozzák meg a függvény-eseményindítót.

Az alábbi táblázat az attribútum használatával beállítható tulajdonságokat ismerteti:

Paraméter Leírás
BrokerList (Kötelező) Azon Kafka-közvetítők listája, amelyeknek a kimenetet elküldik. További információért tekintse meg a Csatlakozás.
Téma (Kötelező) Az a témakör, amelyre a kimenetet elküldik.
AvroSchema (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor.
MaxMessageBytes (Nem kötelező) Az elküldött kimeneti üzenet maximális mérete (MB-ban), alapértelmezett értékével 1.
BatchSize (Nem kötelező) Egyetlen üzenetkészletben kötegelve lévő üzenetek maximális száma, amelynek alapértelmezett értéke .10000
EnableIdempotence (Nem kötelező) Ha be van trueállítva, garantálja, hogy az üzeneteket pontosan egyszer és az eredeti előállítási sorrendben, az alapértelmezett érték false
MessageTimeoutMs (Nem kötelező) A helyi üzenet időtúllépése ezredmásodpercben. Ezt az értéket csak helyileg kényszeríti ki a rendszer, és korlátozza a létrehozott üzenet sikeres kézbesítésre való várakozási idejét, alapértelmezett 300000beállítással. Az idő 0 végtelen. Ez az érték az üzenet kézbesítéséhez használt maximális idő (beleértve az újrapróbálkozásokat is). Kézbesítési hiba akkor fordul elő, ha túllépi az újrapróbálkozások számát vagy az üzenet időtúllépését.
RequestTimeoutMs (Nem kötelező) A kimeneti kérelem nyugtázási időtúllépése ezredmásodpercben, alapértelmezés szerint 5000a .
MaxRetries (Nem kötelező) A sikertelen üzenetek küldésének újrapróbálkozásának száma az alapértelmezett beállítással 2. Az újrapróbálkozás átrendezést okozhat, hacsak nincs EnableIdempotence beállítva true.
AuthenticationMode (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi: , Plain (alapértelmezett), ScramSha256. ScramSha512
Felhasználónév (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Jelszó (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Protokoll (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl, sasl_plaintext. sasl_ssl
SslCaLocation (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez.
SslCertificateLocation (Nem kötelező) Az ügyfél tanúsítványának elérési útja.
SslKeyLocation (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja.
SslKeyPassword (Nem kötelező) Az ügyfél tanúsítványának jelszava.

Jegyzetek

A KafkaOutput jegyzetekkel létrehozhat egy függvényt, amely egy adott témakörbe ír. A támogatott beállítások a következő elemeket tartalmazzák:

Elem Leírás
név Annak a változónak a neve, amely a függvénykódban a közvetített adatokat jelöli.
brokerList (Kötelező) Azon Kafka-közvetítők listája, amelyeknek a kimenetet elküldik. További információért tekintse meg a Csatlakozás.
témakör (Kötelező) Az a témakör, amelyre a kimenetet elküldik.
Adattípus Meghatározza, hogy a Functions hogyan kezeli a paraméter értékét. Alapértelmezés szerint az érték sztringként lesz lekérve, és a Functions megpróbálja deszerializálni a sztringet a tényleges egyszerű régi Java-objektumra (POJO). Amikor stringa bemenetet csak sztringként kezeli a rendszer. Amikor binaryaz üzenet bináris adatként érkezik, és a Functions megpróbálja deszerializálni azt egy tényleges paramétertípus-bájtra[].
avroSchema (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor.
maxMessageBytes (Nem kötelező) Az elküldött kimeneti üzenet maximális mérete (MB-ban), alapértelmezett értékével 1.
batchSize (Nem kötelező) Egyetlen üzenetkészletben kötegelve lévő üzenetek maximális száma, amelynek alapértelmezett értéke .10000
enableIdempotence (Nem kötelező) Ha be van trueállítva, garantálja, hogy az üzeneteket pontosan egyszer és az eredeti előállítási sorrendben, az alapértelmezett érték false
messageTimeoutMs (Nem kötelező) A helyi üzenet időtúllépése ezredmásodpercben. Ezt az értéket csak helyileg kényszeríti ki a rendszer, és korlátozza a létrehozott üzenet sikeres kézbesítésre való várakozási idejét, alapértelmezett 300000beállítással. Az idő 0 végtelen. Ez az üzenet kézbesítésének maximális időtartama (beleértve az újrapróbálkozásokat is). Kézbesítési hiba akkor fordul elő, ha túllépi az újrapróbálkozások számát vagy az üzenet időtúllépését.
requestTimeoutMs (Nem kötelező) A kimeneti kérelem nyugtázási időtúllépése ezredmásodpercben, alapértelmezés szerint 5000a .
maxRetries (Nem kötelező) A sikertelen üzenetek küldésének újrapróbálkozásának száma az alapértelmezett beállítással 2. Az újrapróbálkozás átrendezést okozhat, hacsak nincs EnableIdempotence beállítva true.
authenticationMode (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi: , Plain (alapértelmezett), ScramSha256. ScramSha512
Felhasználónév (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Jelszó (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Protokoll (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl, sasl_plaintext. sasl_ssl
sslCaLocation (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez.
sslCertificateLocation (Nem kötelező) Az ügyfél tanúsítványának elérési útja.
sslKeyLocation (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja.
sslKeyPassword (Nem kötelező) Az ügyfél tanúsítványának jelszava.

Konfiguráció

Az alábbi táblázat a function.json fájlban beállított kötéskonfigurációs tulajdonságokat ismerteti.

function.json tulajdonság Leírás
type A beállításnak a kafkakövetkezőnek kell lennie: .
direction A beállításnak a outkövetkezőnek kell lennie: .
név Annak a változónak a neve, amely a függvénykódban a közvetített adatokat jelöli.
brokerList (Kötelező) Azon Kafka-közvetítők listája, amelyeknek a kimenetet elküldik. További információért tekintse meg a Csatlakozás.
témakör (Kötelező) Az a témakör, amelyre a kimenetet elküldik.
avroSchema (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor.
maxMessageBytes (Nem kötelező) Az elküldött kimeneti üzenet maximális mérete (MB-ban), alapértelmezett értékével 1.
batchSize (Nem kötelező) Egyetlen üzenetkészletben kötegelve lévő üzenetek maximális száma, amelynek alapértelmezett értéke .10000
enableIdempotence (Nem kötelező) Ha be van trueállítva, garantálja, hogy az üzeneteket pontosan egyszer és az eredeti előállítási sorrendben, az alapértelmezett érték false
messageTimeoutMs (Nem kötelező) A helyi üzenet időtúllépése ezredmásodpercben. Ezt az értéket csak helyileg kényszeríti ki a rendszer, és korlátozza a létrehozott üzenet sikeres kézbesítésre való várakozási idejét, alapértelmezett 300000beállítással. Az idő 0 végtelen. Ez az üzenet kézbesítésének maximális időtartama (beleértve az újrapróbálkozásokat is). Kézbesítési hiba akkor fordul elő, ha túllépi az újrapróbálkozások számát vagy az üzenet időtúllépését.
requestTimeoutMs (Nem kötelező) A kimeneti kérelem nyugtázási időtúllépése ezredmásodpercben, alapértelmezés szerint 5000a .
maxRetries (Nem kötelező) A sikertelen üzenetek küldésének újrapróbálkozásának száma az alapértelmezett beállítással 2. Az újrapróbálkozás átrendezést okozhat, hacsak nincs EnableIdempotence beállítva true.
authenticationMode (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi: , Plain (alapértelmezett), ScramSha256. ScramSha512
Felhasználónév (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Jelszó (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi. További információért tekintse meg a Csatlakozás.
Protokoll (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl, sasl_plaintext. sasl_ssl
sslCaLocation (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez.
sslCertificateLocation (Nem kötelező) Az ügyfél tanúsítványának elérési útja.
sslKeyLocation (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja.
sslKeyPassword (Nem kötelező) Az ügyfél tanúsítványának jelszava.

Használat

A kulcsok és az értéktípusok beépített Avro- és Protobuf-szerializálással is támogatottak.

Az esemény eltolása, partíciója és időbélyege futásidőben jön létre. Csak érték és fejlécek állíthatók be a függvényen belül. A témakör a function.json van beállítva.

Győződjön meg arról, hogy rendelkezik hozzáféréssel ahhoz a Kafka-témakörhöz, amelyhez írni próbál. A kötést hozzáférési és kapcsolati hitelesítő adatokkal konfigurálja a Kafka-témakörhöz.

Prémium csomag esetén engedélyeznie kell a futtatókörnyezeti skálázás figyelését ahhoz, hogy a Kafka-kimenet több példányra is felskálázható legyen. További információ: Futtatókörnyezeti skálázás engedélyezése.

A Kafka-eseményindító támogatott host.json beállításainak teljes halmazát host.json beállítások között találhatja meg.

Kapcsolatok

Az eseményindítók és kötések által igényelt kapcsolati információkat az alkalmazásbeállításokban, és nem a kód kötésdefinícióiban kell tartani. Ez igaz a hitelesítő adatokra, amelyeket soha nem szabad a kódban tárolni.

Fontos

A hitelesítő adatok beállításainak egy alkalmazásbeállításra kell hivatkoznia. A kódban vagy a konfigurációs fájlokban ne írja be a hitelesítő adatokat. Helyi futtatáskor használja a local.settings.json fájlt a hitelesítő adataihoz, és ne tegye közzé a local.settings.json fájlt.

Amikor az Azure-ban a Confluent által biztosított felügyelt Kafka-fürthöz csatlakozik, győződjön meg arról, hogy a Confluent Cloud-környezet következő hitelesítési hitelesítő adatai be vannak állítva az eseményindítóban vagy kötésben:

Beállítás Javasolt érték Leírás
BrokerList BootstrapServer Az elnevezett BootstrapServer alkalmazásbeállítás a Confluent Cloud beállításai lapon található bootstrap-kiszolgáló értékét tartalmazza. Az érték a következőhöz xyz-xyzxzy.westeurope.azure.confluent.cloud:9092hasonló: .
Felhasználónév ConfluentCloudUsername A névvel ellátott ConfluentCloudUsername alkalmazásbeállítás tartalmazza a Confluent Cloud webhely api-hozzáférési kulcsát.
Jelszó ConfluentCloudPassword A névvel ellátott ConfluentCloudPassword alkalmazásbeállítás tartalmazza a Confluent Cloud webhelyről beszerzett API-titkos kódot.

Az ezekhez a beállításokhoz használt sztringértékeknek alkalmazásbeállításokként kell szerepelniük az Azure-ban vagy a Values local.settings.json fájl gyűjteményében a helyi fejlesztés során.

A , AuthenticationModeés SslCaLocation a Protocolkötésdefiníciókat is be kell állítania.

Következő lépések