Delen via


Apache Kafka-uitvoerbinding voor Azure Functions

Met de uitvoerbinding kan een Azure Functions-app berichten schrijven naar een Kafka-onderwerp.

Belangrijk

Kafka-bindingen zijn alleen beschikbaar voor Functions in het Elastic Premium-abonnement en het Toegewezen (App Service)-abonnement. Ze worden alleen ondersteund op versie 3.x en latere versie van de Functions-runtime.

Opmerking

Het gebruik van de binding is afhankelijk van de C#-modaliteit die wordt gebruikt in uw functie-app. Dit kan een van de volgende zijn:

Een geïsoleerde werkprocesklassebibliotheek gecompileerde C#-functie wordt uitgevoerd in een proces dat is geïsoleerd van de runtime.

De kenmerken die u gebruikt, zijn afhankelijk van de specifieke gebeurtenisprovider.

In het volgende voorbeeld ziet u een aangepast retourtype, dat MultipleOutputTypebestaat uit een HTTP-antwoord en een Kafka-uitvoer.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

In de klasse MultipleOutputTypeKevent is de uitvoerbindingsvariabele voor de Kafka-binding.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Als u een batch gebeurtenissen wilt verzenden, geeft u een tekenreeksmatrix door aan het uitvoertype, zoals wordt weergegeven in het volgende voorbeeld:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

De tekenreeksmatrix wordt gedefinieerd als Kevents eigenschap in de klasse, waarop de uitvoerbinding is gedefinieerd:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Met de volgende functie worden headers toegevoegd aan de Kafka-uitvoergegevens:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Zie de Kafka-extensieopslagplaats voor een volledige set werkende .NET-voorbeelden.

De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-uitvoerbinding voor een functie die wordt geactiveerd door een HTTP-aanvraag en gegevens van de aanvraag naar het Kafka-onderwerp verzendt.

De volgende function.json definieert de trigger voor de specifieke provider in deze voorbeelden:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Met de volgende code wordt vervolgens een bericht naar het onderwerp verzonden:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Met de volgende code worden meerdere berichten als een matrix naar hetzelfde onderwerp verzonden:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

In het volgende voorbeeld ziet u hoe u een gebeurtenisbericht met kopteksten naar hetzelfde Kafka-onderwerp verzendt:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Zie de Kafka-extensieopslagplaats voor een volledige set werkende JavaScript-voorbeelden.

De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-uitvoerbinding voor een functie die wordt geactiveerd door een HTTP-aanvraag en gegevens van de aanvraag naar het Kafka-onderwerp verzendt.

De volgende function.json definieert de trigger voor de specifieke provider in deze voorbeelden:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Met de volgende code wordt vervolgens een bericht naar het onderwerp verzonden:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Met de volgende code worden meerdere berichten als een matrix naar hetzelfde onderwerp verzonden:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

In het volgende voorbeeld ziet u hoe u een gebeurtenisbericht met kopteksten naar hetzelfde Kafka-onderwerp verzendt:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Zie de Kafka-extensieopslagplaats voor een volledige set werkende PowerShell-voorbeelden.

De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-uitvoerbinding voor een functie die wordt geactiveerd door een HTTP-aanvraag en gegevens van de aanvraag naar het Kafka-onderwerp verzendt.

De volgende function.json definieert de trigger voor de specifieke provider in deze voorbeelden:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Met de volgende code wordt vervolgens een bericht naar het onderwerp verzonden:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Met de volgende code worden meerdere berichten als een matrix naar hetzelfde onderwerp verzonden:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

In het volgende voorbeeld ziet u hoe u een gebeurtenisbericht met kopteksten naar hetzelfde Kafka-onderwerp verzendt:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Zie de Kafka-extensieopslagplaats voor een volledige set werkende Python-voorbeelden.

De aantekeningen die u gebruikt om de uitvoerbinding te configureren, zijn afhankelijk van de specifieke gebeurtenisprovider.

Met de volgende functie wordt een bericht verzonden naar het Kafka-onderwerp.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

In het volgende voorbeeld ziet u hoe u meerdere berichten naar een Kafka-onderwerp verzendt.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

In dit voorbeeld wordt de parameter uitvoerbinding gewijzigd in tekenreeksmatrix.

In het laatste voorbeeld worden deze KafkaEntity en KafkaHeader klassen gebruikt:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

Met de volgende voorbeeldfunctie wordt een bericht met kopteksten naar een Kafka-onderwerp verzonden.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Zie de Kafka-extensieopslagplaats voor een volledige set werkende Java-voorbeelden voor Confluent.

Kenmerken

Zowel in-process als geïsoleerde werkproces C#-bibliotheken gebruiken het Kafka kenmerk om de functietrigger te definiëren.

In de volgende tabel worden de eigenschappen uitgelegd die u kunt instellen met behulp van dit kenmerk:

Parameter Description
BrokerList (Vereist) De lijst met Kafka-brokers waarnaar de uitvoer wordt verzonden. Zie Verbindingen voor meer informatie.
Onderwerp (Vereist) Het onderwerp waarnaar de uitvoer wordt verzonden.
AvroSchema (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol.
MaxMessageBytes (Optioneel) De maximale grootte van het uitvoerbericht dat wordt verzonden (in MB), met een standaardwaarde van 1.
BatchSize (Optioneel) Maximum aantal berichten dat is gebatcheerd in één berichtenset, met een standaardwaarde van 10000.
EnableIdempotentie (Optioneel) Wanneer deze is ingesteld trueop , garandeert u dat berichten exact eenmaal en in de oorspronkelijke productievolgorde worden geproduceerd, met een standaardwaarde van false
MessageTimeoutMs (Optioneel) De time-out van het lokale bericht, in milliseconden. Deze waarde wordt alleen lokaal afgedwongen en beperkt de tijd die een geproduceerd bericht wacht op een geslaagde bezorging, met een standaardwaarde 300000. Een tijd van 0 is oneindig. Deze waarde is de maximale tijd die wordt gebruikt voor het bezorgen van een bericht (inclusief nieuwe pogingen). Bezorgingsfout treedt op wanneer het aantal nieuwe pogingen of de time-out van het bericht wordt overschreden.
RequestTimeoutMs (Optioneel) De bevestigingstime-out van de uitvoeraanvraag, in milliseconden, met een standaardwaarde van 5000.
MaxRetries (Optioneel) Het aantal keren dat een mislukt bericht opnieuw moet worden verzonden, met een standaardwaarde van 2. Opnieuw proberen kan leiden tot opnieuw ordenen, tenzij EnableIdempotence is ingesteld op true.
AuthenticationMode (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi, Plain (standaard), ScramSha256. ScramSha512
Gebruikersnaam (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
Wachtwoord (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
Protocol (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl, . sasl_sslsasl_plaintext
SslCaLocation (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker.
SslCertificateLocation (Optioneel) Pad naar het certificaat van de client.
SslKeyLocation (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie.
SslKeyPassword (Optioneel) Wachtwoord voor het certificaat van de client.

Aantekeningen

Met de KafkaOutput aantekening kunt u een functie maken die naar een specifiek onderwerp schrijft. Ondersteunde opties omvatten de volgende elementen:

Element Description
name De naam van de variabele die de brokergegevens in functiecode vertegenwoordigt.
brokerList (Vereist) De lijst met Kafka-brokers waarnaar de uitvoer wordt verzonden. Zie Verbindingen voor meer informatie.
onderwerp (Vereist) Het onderwerp waarnaar de uitvoer wordt verzonden.
Datatype Definieert hoe Functions de parameterwaarde verwerkt. De waarde wordt standaard verkregen als een tekenreeks en Functions probeert de tekenreeks te deserialiseren naar een echt normaal oud Java-object (POJO). Wanneer string, wordt de invoer behandeld als alleen een tekenreeks. Wanneer binary, het bericht wordt ontvangen als binaire gegevens en Functions probeert het te deserialiseren naar een werkelijke parametertype byte[].
avroSchema (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. (Momenteel niet ondersteund voor Java.)
maxMessageBytes (Optioneel) De maximale grootte van het uitvoerbericht dat wordt verzonden (in MB), met een standaardwaarde van 1.
batchSize (Optioneel) Maximum aantal berichten dat is gebatcheerd in één berichtenset, met een standaardwaarde van 10000.
enableIdempotence (Optioneel) Wanneer deze is ingesteld trueop , garandeert u dat berichten exact eenmaal en in de oorspronkelijke productievolgorde worden geproduceerd, met een standaardwaarde van false
messageTimeoutMs (Optioneel) De time-out van het lokale bericht, in milliseconden. Deze waarde wordt alleen lokaal afgedwongen en beperkt de tijd die een geproduceerd bericht wacht op een geslaagde bezorging, met een standaardwaarde 300000. Een tijd van 0 is oneindig. Dit is de maximale tijd die wordt gebruikt voor het bezorgen van een bericht (inclusief nieuwe pogingen). Bezorgingsfout treedt op wanneer het aantal nieuwe pogingen of de time-out van het bericht wordt overschreden.
requestTimeoutMs (Optioneel) De bevestigingstime-out van de uitvoeraanvraag, in milliseconden, met een standaardwaarde van 5000.
maxRetries (Optioneel) Het aantal keren dat een mislukt bericht opnieuw moet worden verzonden, met een standaardwaarde van 2. Opnieuw proberen kan leiden tot opnieuw ordenen, tenzij EnableIdempotence is ingesteld op true.
authenticationMode (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi, Plain (standaard), ScramSha256. ScramSha512
gebruikersnaam (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
password (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
protocol (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl, . sasl_sslsasl_plaintext
sslCaLocation (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker.
sslCertificateLocation (Optioneel) Pad naar het certificaat van de client.
sslKeyLocation (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie.
sslKeyPassword (Optioneel) Wachtwoord voor het certificaat van de client.

Configuratie

In de volgende tabel worden de bindingsconfiguratie-eigenschappen uitgelegd die u in het function.json-bestand hebt ingesteld.

eigenschap function.json Beschrijving
type Moet worden ingesteld op kafka.
direction Moet worden ingesteld op out.
name De naam van de variabele die de brokergegevens in functiecode vertegenwoordigt.
brokerList (Vereist) De lijst met Kafka-brokers waarnaar de uitvoer wordt verzonden. Zie Verbindingen voor meer informatie.
onderwerp (Vereist) Het onderwerp waarnaar de uitvoer wordt verzonden.
avroSchema (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol.
maxMessageBytes (Optioneel) De maximale grootte van het uitvoerbericht dat wordt verzonden (in MB), met een standaardwaarde van 1.
batchSize (Optioneel) Maximum aantal berichten dat is gebatcheerd in één berichtenset, met een standaardwaarde van 10000.
enableIdempotence (Optioneel) Wanneer deze is ingesteld trueop , garandeert u dat berichten exact eenmaal en in de oorspronkelijke productievolgorde worden geproduceerd, met een standaardwaarde van false
messageTimeoutMs (Optioneel) De time-out van het lokale bericht, in milliseconden. Deze waarde wordt alleen lokaal afgedwongen en beperkt de tijd die een geproduceerd bericht wacht op een geslaagde bezorging, met een standaardwaarde 300000. Een tijd van 0 is oneindig. Dit is de maximale tijd die wordt gebruikt voor het bezorgen van een bericht (inclusief nieuwe pogingen). Bezorgingsfout treedt op wanneer het aantal nieuwe pogingen of de time-out van het bericht wordt overschreden.
requestTimeoutMs (Optioneel) De bevestigingstime-out van de uitvoeraanvraag, in milliseconden, met een standaardwaarde van 5000.
maxRetries (Optioneel) Het aantal keren dat een mislukt bericht opnieuw moet worden verzonden, met een standaardwaarde van 2. Opnieuw proberen kan leiden tot opnieuw ordenen, tenzij EnableIdempotence is ingesteld op true.
authenticationMode (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi, Plain (standaard), ScramSha256. ScramSha512
gebruikersnaam (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
password (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie.
protocol (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl, . sasl_sslsasl_plaintext
sslCaLocation (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker.
sslCertificateLocation (Optioneel) Pad naar het certificaat van de client.
sslKeyLocation (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie.
sslKeyPassword (Optioneel) Wachtwoord voor het certificaat van de client.

Gebruik

Zowel sleutels als waardentypen worden ondersteund met ingebouwde Avro - en Protobuf-serialisatie .

De offset, partitie en tijdstempel voor de gebeurtenis worden tijdens runtime gegenereerd. Alleen waarde en headers kunnen in de functie worden ingesteld. Het onderwerp is ingesteld in de function.json.

Zorg ervoor dat u toegang hebt tot het Kafka-onderwerp waarnaar u wilt schrijven. U configureert de binding met toegangs- en verbindingsreferenties voor het Kafka-onderwerp.

In een Premium-abonnement moet u runtimeschaalbewaking inschakelen voor de Kafka-uitvoer om uit te kunnen schalen naar meerdere exemplaren. Zie Schalen van runtime inschakelen voor meer informatie.

Zie host.json instellingen voor een volledige set ondersteunde host.json instellingen voor de Kafka-trigger.

Connecties

Alle verbindingsgegevens die vereist zijn voor uw triggers en bindingen, moeten worden onderhouden in toepassingsinstellingen en niet in de bindingsdefinities in uw code. Dit geldt voor referenties, die nooit in uw code moeten worden opgeslagen.

Belangrijk

Referentie-instellingen moeten verwijzen naar een toepassingsinstelling. Gebruik geen codereferenties in uw code- of configuratiebestanden. Wanneer u lokaal werkt, gebruikt u het local.settings.json-bestand voor uw referenties en publiceert u het local.settings.json bestand niet.

Wanneer u verbinding maakt met een beheerd Kafka-cluster dat wordt geleverd door Confluent in Azure, moet u ervoor zorgen dat de volgende verificatiereferenties voor uw Confluent Cloud-omgeving zijn ingesteld in uw trigger of binding:

Instelling Aanbevolen waarde Beschrijving
BrokerList BootstrapServer De app-instelling met de naam BootstrapServer bevat de waarde van de bootstrap-server die is gevonden op de pagina met Confluent Cloud-instellingen. De waarde lijkt op xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Gebruikersnaam ConfluentCloudUsername App-instelling met de naam ConfluentCloudUsername bevat de API-toegangssleutel van de Confluent Cloud-website.
Wachtwoord ConfluentCloudPassword App-instelling met de naam ConfluentCloudPassword bevat het API-geheim dat is verkregen van de Confluent Cloud-website.

De tekenreekswaarden die u voor deze instellingen gebruikt, moeten aanwezig zijn als toepassingsinstellingen in Azure of in de Values verzameling in het local.settings.json-bestand tijdens lokale ontwikkeling.

U moet ook de Protocol, AuthenticationModeen SslCaLocation in uw bindingsdefinities instellen.

Volgende stappen