Sdílet prostřednictvím


Výstupní vazba Apache Kafka pro Azure Functions

Výstupní vazba umožňuje aplikaci Azure Functions psát zprávy do tématu Kafka.

Důležité

Vazby Kafka jsou dostupné jenom pro funkce v plánu Elastic Premium a vyhrazeném plánu (App Service). Podporují se pouze ve verzi 3.x a novější verzi modulu runtime Functions.

Příklad

Použití vazby závisí na způsobu použití jazyka C# používaném ve vaší aplikaci funkcí, což může být jedna z následujících možností:

Kompilovaná funkce C# v izolované knihovně tříd pracovních procesů běží v procesu izolovaném od modulu runtime.

Atributy, které použijete, závisí na konkrétním poskytovateli událostí.

Následující příklad obsahuje vlastní návratový typ, který se MultipleOutputTypeskládá z odpovědi HTTP a výstupu Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

Ve třídě MultipleOutputTypeKevent je výstupní proměnná vazby pro vazbu Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Pokud chcete odeslat dávku událostí, předejte řetězcové pole do výstupního typu, jak je znázorněno v následujícím příkladu:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

Řetězcové pole je definováno jako Kevents vlastnost třídy, na které je definována výstupní vazba:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Následující funkce přidá hlavičky do výstupních dat Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Kompletní sadu funkčních příkladů .NET najdete v úložišti rozšíření Kafka.

Poznámka:

Ekvivalentní sadu příkladů TypeScriptu najdete v úložišti rozšíření Kafka.

Konkrétní vlastnosti souboru function.json závisí na poskytovateli událostí, který v těchto příkladech jsou Confluent nebo Azure Event Hubs. Následující příklady ukazují výstupní vazbu Kafka pro funkci aktivovanou požadavkem HTTP a odesílají data z požadavku do tématu Kafka.

Následující function.json definuje trigger pro konkrétního poskytovatele v těchto příkladech:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Následující kód pak odešle zprávu do tématu:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Následující kód odešle více zpráv jako pole do stejného tématu:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Následující příklad ukazuje, jak odeslat zprávu události se záhlavími do stejného tématu Kafka:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Kompletní sadu funkčních příkladů JavaScriptu najdete v úložišti rozšíření Kafka.

Konkrétní vlastnosti souboru function.json závisí na poskytovateli událostí, který v těchto příkladech jsou Confluent nebo Azure Event Hubs. Následující příklady ukazují výstupní vazbu Kafka pro funkci aktivovanou požadavkem HTTP a odesílají data z požadavku do tématu Kafka.

Následující function.json definuje trigger pro konkrétního poskytovatele v těchto příkladech:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Následující kód pak odešle zprávu do tématu:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Následující kód odešle více zpráv jako pole do stejného tématu:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Následující příklad ukazuje, jak odeslat zprávu události se záhlavími do stejného tématu Kafka:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Kompletní sadu funkčních příkladů PowerShellu najdete v úložišti rozšíření Kafka.

Konkrétní vlastnosti souboru function.json závisí na poskytovateli událostí, který v těchto příkladech jsou Confluent nebo Azure Event Hubs. Následující příklady ukazují výstupní vazbu Kafka pro funkci aktivovanou požadavkem HTTP a odesílají data z požadavku do tématu Kafka.

Následující function.json definuje trigger pro konkrétního poskytovatele v těchto příkladech:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Následující kód pak odešle zprávu do tématu:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Následující kód odešle více zpráv jako pole do stejného tématu:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

Následující příklad ukazuje, jak odeslat zprávu události se záhlavími do stejného tématu Kafka:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Kompletní sadu funkčních příkladů Pythonu najdete v úložišti rozšíření Kafka.

Poznámky, které použijete ke konfiguraci výstupní vazby, závisí na konkrétním poskytovateli událostí.

Následující funkce odešle zprávu do tématu Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

Následující příklad ukazuje, jak odeslat více zpráv do tématu Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

V tomto příkladu se parametr výstupní vazby změní na pole řetězců.

Poslední příklad se používá pro tyto KafkaEntity a KafkaHeader třídy:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

Následující ukázková funkce odešle zprávu se záhlavími do tématu Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Kompletní sadu funkčních příkladů Javy pro Confluent najdete v úložišti rozšíření Kafka.

Atributy

Knihovny C# v procesu i izolovaného pracovního procesu používají Kafka atribut k definování triggeru funkce.

Následující tabulka vysvětluje vlastnosti, které můžete nastavit pomocí tohoto atributu:

Parametr Popis
BrokerList (Povinné) Seznam zprostředkovatelů Kafka, do kterých se odesílá výstup. Další informace najdete v tématu Připojení .
Předmět zájmu (Povinné) Téma, do kterého se výstup odešle.
AvroSchema (Volitelné) Schéma obecného záznamu při použití protokolu Avro
MaxMessageBytes (Volitelné) Maximální velikost odesílané výstupní zprávy (v MB) s výchozí hodnotou 1.
BatchSize (Volitelné) Maximální počet zpráv v jedné sadě zpráv s výchozí hodnotou 10000.
EnableIdempotence (Volitelné) Pokud je nastavena hodnota true, zaručuje, že zprávy jsou úspěšně vytvořeny přesně jednou a v původním pořadí produkce s výchozí hodnotou false
MessageTimeoutMs (Volitelné) Časový limit místní zprávy v milisekundách. Tato hodnota se vynucuje pouze místně a omezuje čas čekání vytvořené zprávy na úspěšné doručení s výchozí 300000hodnotou . Čas 0 je nekonečný. Tato hodnota je maximální doba použitá k doručení zprávy (včetně opakování). K chybě doručení dochází při překročení počtu opakování nebo vypršení časového limitu zprávy.
RequestTimeoutMs (Volitelné) Časový limit potvrzení výstupního požadavku v milisekundách s výchozí hodnotou 5000.
MaxRetries (Volitelné) Počet opakování pokusu o odeslání neúspěšné zprávy s výchozí hodnotou 2. Opakování může způsobit změny pořadí, pokud EnableIdempotence není nastavena na truehodnotu .
AuthenticationMode (Volitelné) Režim ověřování při použití ověřování SASL (Simple Authentication and Security Layer). Podporované hodnoty jsou Gssapi, Plain (výchozí), ScramSha256, ScramSha512.
Uživatelské jméno (Volitelné) Uživatelské jméno pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení .
Heslo (Volitelné) Heslo pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení .
Protokol (Volitelné) Protokol zabezpečení používaný při komunikaci s zprostředkovateli. Podporované hodnoty jsou plaintext (výchozí), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Volitelné) Cesta k souboru certifikátu certifikační autority pro ověření certifikátu zprostředkovatele
SslCertificateLocation (Volitelné) Cesta k certifikátu klienta
SslKeyLocation (Volitelné) Cesta k privátnímu klíči klienta (PEM) používanému k ověřování
SslKeyPassword (Volitelné) Heslo pro certifikát klienta.

Poznámky

Poznámka KafkaOutput umožňuje vytvořit funkci, která zapisuje do konkrétního tématu. Mezi podporované možnosti patří následující prvky:

Element (Prvek) Popis
Jméno Název proměnné, která představuje zprostředkovaná data v kódu funkce.
brokerList (Povinné) Seznam zprostředkovatelů Kafka, do kterých se odesílá výstup. Další informace najdete v tématu Připojení .
topic (Povinné) Téma, do kterého se výstup odešle.
Datatype Definuje, jak služba Functions zpracovává hodnotu parametru. Ve výchozím nastavení je hodnota získána jako řetězec a Functions se pokusí deserializovat řetězec na skutečný prostý starý java objekt (POJO). Když stringse vstup považuje za řetězec. Když binaryse zpráva přijme jako binární data a functions se pokusí deserializovat na skutečný typ parametru bajt[].
avroSchema (Volitelné) Schéma obecného záznamu při použití protokolu Avro (Aktuálně se nepodporuje pro Javu.)
maxMessageBytes (Volitelné) Maximální velikost odesílané výstupní zprávy (v MB) s výchozí hodnotou 1.
batchSize (Volitelné) Maximální počet zpráv v jedné sadě zpráv s výchozí hodnotou 10000.
enableIdempotence (Volitelné) Pokud je nastavena hodnota true, zaručuje, že zprávy jsou úspěšně vytvořeny přesně jednou a v původním pořadí produkce s výchozí hodnotou false
messageTimeoutMs (Volitelné) Časový limit místní zprávy v milisekundách. Tato hodnota se vynucuje pouze místně a omezuje čas čekání vytvořené zprávy na úspěšné doručení s výchozí 300000hodnotou . Čas 0 je nekonečný. Jedná se o maximální dobu použitou k doručení zprávy (včetně opakování). K chybě doručení dochází při překročení počtu opakování nebo vypršení časového limitu zprávy.
requestTimeoutMs (Volitelné) Časový limit potvrzení výstupního požadavku v milisekundách s výchozí hodnotou 5000.
maxRetries (Volitelné) Počet opakování pokusu o odeslání neúspěšné zprávy s výchozí hodnotou 2. Opakování může způsobit změny pořadí, pokud EnableIdempotence není nastavena na truehodnotu .
authenticationMode (Volitelné) Režim ověřování při použití ověřování SASL (Simple Authentication and Security Layer). Podporované hodnoty jsou Gssapi, Plain (výchozí), ScramSha256, ScramSha512.
uživatelské jméno (Volitelné) Uživatelské jméno pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení .
heslo (Volitelné) Heslo pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení .
protokol (Volitelné) Protokol zabezpečení používaný při komunikaci s zprostředkovateli. Podporované hodnoty jsou plaintext (výchozí), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Volitelné) Cesta k souboru certifikátu certifikační autority pro ověření certifikátu zprostředkovatele
sslCertificateLocation (Volitelné) Cesta k certifikátu klienta
sslKeyLocation (Volitelné) Cesta k privátnímu klíči klienta (PEM) používanému k ověřování
sslKeyPassword (Volitelné) Heslo pro certifikát klienta.

Konfigurace

Následující tabulka vysvětluje vlastnosti konfigurace vazby, které jste nastavili v souboru function.json .

vlastnost function.json Popis
type Musí být nastavena na kafkahodnotu .
direction Musí být nastavena na outhodnotu .
Jméno Název proměnné, která představuje zprostředkovaná data v kódu funkce.
brokerList (Povinné) Seznam zprostředkovatelů Kafka, do kterých se odesílá výstup. Další informace najdete v tématu Připojení .
topic (Povinné) Téma, do kterého se výstup odešle.
avroSchema (Volitelné) Schéma obecného záznamu při použití protokolu Avro
maxMessageBytes (Volitelné) Maximální velikost odesílané výstupní zprávy (v MB) s výchozí hodnotou 1.
batchSize (Volitelné) Maximální počet zpráv v jedné sadě zpráv s výchozí hodnotou 10000.
enableIdempotence (Volitelné) Pokud je nastavena hodnota true, zaručuje, že zprávy jsou úspěšně vytvořeny přesně jednou a v původním pořadí produkce s výchozí hodnotou false
messageTimeoutMs (Volitelné) Časový limit místní zprávy v milisekundách. Tato hodnota se vynucuje pouze místně a omezuje čas čekání vytvořené zprávy na úspěšné doručení s výchozí 300000hodnotou . Čas 0 je nekonečný. Jedná se o maximální dobu použitou k doručení zprávy (včetně opakování). K chybě doručení dochází při překročení počtu opakování nebo vypršení časového limitu zprávy.
requestTimeoutMs (Volitelné) Časový limit potvrzení výstupního požadavku v milisekundách s výchozí hodnotou 5000.
maxRetries (Volitelné) Počet opakování pokusu o odeslání neúspěšné zprávy s výchozí hodnotou 2. Opakování může způsobit změny pořadí, pokud EnableIdempotence není nastavena na truehodnotu .
authenticationMode (Volitelné) Režim ověřování při použití ověřování SASL (Simple Authentication and Security Layer). Podporované hodnoty jsou Gssapi, Plain (výchozí), ScramSha256, ScramSha512.
uživatelské jméno (Volitelné) Uživatelské jméno pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení .
heslo (Volitelné) Heslo pro ověřování SASL. Nepodporuje se, pokud AuthenticationMode je .Gssapi Další informace najdete v tématu Připojení .
protokol (Volitelné) Protokol zabezpečení používaný při komunikaci s zprostředkovateli. Podporované hodnoty jsou plaintext (výchozí), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Volitelné) Cesta k souboru certifikátu certifikační autority pro ověření certifikátu zprostředkovatele
sslCertificateLocation (Volitelné) Cesta k certifikátu klienta
sslKeyLocation (Volitelné) Cesta k privátnímu klíči klienta (PEM) používanému k ověřování
sslKeyPassword (Volitelné) Heslo pro certifikát klienta.

Využití

S integrovaným serializací Avro a Protobuf se podporují klíče i typy hodnot.

Posun, oddíl a časové razítko události se generují za běhu. Uvnitř funkce lze nastavit pouze hodnoty a hlavičky. Téma je nastavené v function.json.

Ujistěte se, že máte přístup k tématu Kafka, ke kterému se pokoušíte psát. Vazbu nakonfigurujete pomocí přihlašovacích údajů pro přístup a připojení k tématu Kafka.

V plánu Premium musíte povolit monitorování škálování modulu runtime pro výstup Kafka, aby bylo možné škálovat na více instancí. Další informace najdete v tématu Povolení škálování modulu runtime.

Úplnou sadu podporovaných nastavení host.json pro trigger Kafka najdete v tématu host.json nastavení.

Propojení

Všechny informace o připojení vyžadované triggery a vazbami by se měly udržovat v nastavení aplikace, a ne v definicích vazeb v kódu. To platí pro přihlašovací údaje, které by nikdy neměly být uloženy ve vašem kódu.

Důležité

Nastavení přihlašovacích údajů musí odkazovat na nastavení aplikace. Nezakódujte v kódu ani v konfiguračních souborech pevně zakódujte přihlašovací údaje. Při místním spuštění použijte local.settings.json soubor pro přihlašovací údaje a nepublikujte local.settings.json soubor.

Při připojování ke spravovanému clusteru Kafka poskytovanému Confluentem v Azure se ujistěte, že ve vašem triggeru nebo vazbě jsou nastavené následující ověřovací přihlašovací údaje pro vaše prostředí Confluent Cloud:

Nastavení Doporučená hodnota Popis
BrokerList BootstrapServer Nastavení aplikace s názvem BootstrapServer obsahuje hodnotu serveru bootstrap nalezené na stránce nastavení confluent cloudu. Hodnota se podobá xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Uživatelské jméno ConfluentCloudUsername Nastavení aplikace s názvem ConfluentCloudUsername obsahuje přístupový klíč rozhraní API z webu Confluent Cloud.
Heslo ConfluentCloudPassword Nastavení aplikace s názvem ConfluentCloudPassword obsahuje tajný klíč rozhraní API získaný z webu Confluent Cloud.

Řetězcové hodnoty, které použijete pro tato nastavení, musí být přítomné jako nastavení aplikace v Azure nebo v Values kolekci v souboru local.settings.json během místního vývoje.

Měli byste také nastavit , ProtocolAuthenticationModea SslCaLocation v definicích vazeb.

Další kroky