Apache Kafka-utdatabindning för Azure Functions

Med utdatabindningen kan en Azure Functions-app skriva meddelanden till ett Kafka-ämne.

Viktigt!

Kafka-bindningar är endast tillgängliga för Functions i planen Elastic Premium Plan and Dedicated (App Service). De stöds endast på version 3.x och senare version av Functions-körningen.

Exempel

Användningen av bindningen beror på vilken C#-modalitet som används i funktionsappen, vilket kan vara något av följande:

Ett isolerat arbetsprocessklassbibliotek kompilerade C#-funktioner körs i en process som är isolerad från körningen.

Vilka attribut du använder beror på den specifika händelseprovidern.

I följande exempel finns en anpassad returtyp som är MultipleOutputType, som består av ett HTTP-svar och en Kafka-utdata.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

I klassen MultipleOutputTypeär Kevent är utdatabindningsvariabeln för Kafka-bindningen.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Skicka en batch med händelser genom att skicka en strängmatris till utdatatypen enligt följande exempel:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

Strängmatrisen definieras som Kevents egenskap för klassen, där utdatabindningen definieras:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Följande funktion lägger till rubriker i Kafka-utdata:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

En fullständig uppsättning fungerande .NET-exempel finns i Kafka-tilläggets lagringsplats.

Kommentar

En motsvarande uppsättning TypeScript-exempel finns i Kafka-tilläggslagringsplatsen

De specifika egenskaperna för function.json-filen beror på din händelseprovider, som i dessa exempel antingen är Confluent eller Azure Event Hubs. I följande exempel visas en Kafka-utdatabindning för en funktion som utlöses av en HTTP-begäran och skickar data från begäran till Kafka-ämnet.

Följande function.json definierar utlösaren för den specifika providern i följande exempel:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Följande kod skickar sedan ett meddelande till ämnet:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Följande kod skickar flera meddelanden som en matris till samma ämne:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

I följande exempel visas hur du skickar ett händelsemeddelande med rubriker till samma Kafka-ämne:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

En fullständig uppsättning fungerande JavaScript-exempel finns i Kafka-tilläggets lagringsplats.

De specifika egenskaperna för function.json-filen beror på din händelseprovider, som i dessa exempel antingen är Confluent eller Azure Event Hubs. I följande exempel visas en Kafka-utdatabindning för en funktion som utlöses av en HTTP-begäran och skickar data från begäran till Kafka-ämnet.

Följande function.json definierar utlösaren för den specifika providern i följande exempel:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Följande kod skickar sedan ett meddelande till ämnet:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Följande kod skickar flera meddelanden som en matris till samma ämne:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

I följande exempel visas hur du skickar ett händelsemeddelande med rubriker till samma Kafka-ämne:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

En fullständig uppsättning fungerande PowerShell-exempel finns i Kafka-tilläggets lagringsplats.

De specifika egenskaperna för function.json-filen beror på din händelseprovider, som i dessa exempel antingen är Confluent eller Azure Event Hubs. I följande exempel visas en Kafka-utdatabindning för en funktion som utlöses av en HTTP-begäran och skickar data från begäran till Kafka-ämnet.

Följande function.json definierar utlösaren för den specifika providern i följande exempel:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Följande kod skickar sedan ett meddelande till ämnet:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Följande kod skickar flera meddelanden som en matris till samma ämne:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

I följande exempel visas hur du skickar ett händelsemeddelande med rubriker till samma Kafka-ämne:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

En fullständig uppsättning fungerande Python-exempel finns i Kafka-tilläggslagringsplatsen.

De anteckningar som du använder för att konfigurera utdatabindningen beror på den specifika händelseprovidern.

Följande funktion skickar ett meddelande till Kafka-ämnet.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

I följande exempel visas hur du skickar flera meddelanden till ett Kafka-ämne.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

I det här exemplet ändras utdatabindningsparametern till strängmatris.

Det sista exemplet används för dessa KafkaEntity och KafkaHeader klasser:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

Följande exempelfunktion skickar ett meddelande med rubriker till ett Kafka-ämne.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

En fullständig uppsättning fungerande Java-exempel för Confluent finns i Kafka-tilläggets lagringsplats.

Attribut

C#-bibliotek för både process- och isolerad arbetsprocess använder attributet för att definiera funktionsutlösaren Kafka .

I följande tabell förklaras de egenskaper som du kan ange med det här attributet:

Parameter Description
BrokerList (Krävs) Listan över Kafka-koordinatorer som utdata skickas till. Mer information finns i Anslut ions.
Avsnitt (Krävs) Det ämne som utdata skickas till.
AvroSchema (Valfritt) Schema för en allmän post när du använder Avro-protokollet.
MaxMessageBytes (Valfritt) Den maximala storleken på utdatameddelandet som skickas (i MB), med standardvärdet 1.
BatchSize (Valfritt) Maximalt antal meddelanden som batchats i en enda meddelandeuppsättning, med standardvärdet 10000.
EnableIdempotence (Valfritt) När värdet är inställt truepå garanterar du att meddelanden skapas exakt en gång och i den ursprungliga beställningen, med ett standardvärde på false
MessageTimeoutMs (Valfritt) Tidsgränsen för det lokala meddelandet, i millisekunder. Det här värdet framtvingas endast lokalt och begränsar den tid ett genererat meddelande väntar på en lyckad leverans med standardvärdet 300000. En tid av 0 är oändlig. Det här värdet är den maximala tid som används för att leverera ett meddelande (inklusive återförsök). Leveransfel uppstår när antalet återförsök eller tidsgränsen för meddelanden överskrids.
RequestTimeoutMs (Valfritt) Tidsgränsen för bekräftelse av utdatabegäran, i millisekunder, med standardvärdet 5000.
MaxRetries (Valfritt) Antalet gånger som ett meddelande skickas igen, med standardvärdet 2. Omförsök kan orsaka omordning, såvida inte EnableIdempotence är inställt på true.
AuthenticationMode (Valfritt) Autentiseringsläget när du använder SASL-autentisering (Simple Authentication and Security Layer). De värden som stöds är Gssapi, Plain (standard), ScramSha256, . ScramSha512
Användarnamn (Valfritt) Användarnamnet för SASL-autentisering. Stöds inte när AuthenticationMode är Gssapi. Mer information finns i Anslut ions.
Lösenord (Valfritt) Lösenordet för SASL-autentisering. Stöds inte när AuthenticationMode är Gssapi. Mer information finns i Anslut ions.
Protokoll (Valfritt) Säkerhetsprotokollet som används vid kommunikation med asynkrona meddelandeköer. De värden som stöds är plaintext (standard), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Valfritt) Sökväg till CA-certifikatfilen för att verifiera asynkron meddelandekös certifikat.
SslCertificateLocation (Valfritt) Sökväg till klientens certifikat.
SslKeyLocation (Valfritt) Sökväg till klientens privata nyckel (PEM) som används för autentisering.
SslKeyPassword (Valfritt) Lösenord för klientens certifikat.

Kommentarer

Med anteckningen KafkaOutput kan du skapa en funktion som skriver till ett visst ämne. Alternativen som stöds omfattar följande element:

Element Description
Namn Namnet på variabeln som representerar de asynkrona data i funktionskoden.
brokerList (Krävs) Listan över Kafka-koordinatorer som utdata skickas till. Mer information finns i Anslut ions.
Avsnitt (Krävs) Det ämne som utdata skickas till.
Datatyp Definierar hur Functions hanterar parametervärdet. Som standard hämtas värdet som en sträng och Functions försöker deserialisera strängen till det faktiska oformaterade Java-objektet (POJO). När stringbehandlas indata som bara en sträng. När binarytas meddelandet emot som binära data och Functions försöker deserialisera det till en faktisk parametertyp byte[].
avroSchema (Valfritt) Schema för en allmän post när du använder Avro-protokollet.
maxMessageBytes (Valfritt) Den maximala storleken på utdatameddelandet som skickas (i MB), med standardvärdet 1.
batchSize (Valfritt) Maximalt antal meddelanden som batchats i en enda meddelandeuppsättning, med standardvärdet 10000.
enableIdempotence (Valfritt) När värdet är inställt truepå garanterar du att meddelanden skapas exakt en gång och i den ursprungliga beställningen, med ett standardvärde på false
messageTimeoutMs (Valfritt) Tidsgränsen för det lokala meddelandet, i millisekunder. Det här värdet framtvingas endast lokalt och begränsar den tid ett genererat meddelande väntar på en lyckad leverans med standardvärdet 300000. En tid av 0 är oändlig. Det här är den maximala tid som används för att leverera ett meddelande (inklusive återförsök). Leveransfel uppstår när antalet återförsök eller tidsgränsen för meddelanden överskrids.
requestTimeoutMs (Valfritt) Tidsgränsen för bekräftelse av utdatabegäran, i millisekunder, med standardvärdet 5000.
maxRetries (Valfritt) Antalet gånger som ett meddelande skickas igen, med standardvärdet 2. Omförsök kan orsaka omordning, såvida inte EnableIdempotence är inställt på true.
authenticationMode (Valfritt) Autentiseringsläget när du använder SASL-autentisering (Simple Authentication and Security Layer). De värden som stöds är Gssapi, Plain (standard), ScramSha256, . ScramSha512
Användarnamn (Valfritt) Användarnamnet för SASL-autentisering. Stöds inte när AuthenticationMode är Gssapi. Mer information finns i Anslut ions.
Lösenord (Valfritt) Lösenordet för SASL-autentisering. Stöds inte när AuthenticationMode är Gssapi. Mer information finns i Anslut ions.
Protokollet (Valfritt) Säkerhetsprotokollet som används vid kommunikation med asynkrona meddelandeköer. De värden som stöds är plaintext (standard), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Valfritt) Sökväg till CA-certifikatfilen för att verifiera asynkron meddelandekös certifikat.
sslCertificateLocation (Valfritt) Sökväg till klientens certifikat.
sslKeyLocation (Valfritt) Sökväg till klientens privata nyckel (PEM) som används för autentisering.
sslKeyPassword (Valfritt) Lösenord för klientens certifikat.

Konfiguration

I följande tabell förklaras de bindningskonfigurationsegenskaper som du anger i filen function.json .

egenskapen function.json beskrivning
typ Måste anges till kafka.
riktning Måste anges till out.
Namn Namnet på variabeln som representerar de asynkrona data i funktionskoden.
brokerList (Krävs) Listan över Kafka-koordinatorer som utdata skickas till. Mer information finns i Anslut ions.
Avsnitt (Krävs) Det ämne som utdata skickas till.
avroSchema (Valfritt) Schema för en allmän post när du använder Avro-protokollet.
maxMessageBytes (Valfritt) Den maximala storleken på utdatameddelandet som skickas (i MB), med standardvärdet 1.
batchSize (Valfritt) Maximalt antal meddelanden som batchats i en enda meddelandeuppsättning, med standardvärdet 10000.
enableIdempotence (Valfritt) När värdet är inställt truepå garanterar du att meddelanden skapas exakt en gång och i den ursprungliga beställningen, med ett standardvärde på false
messageTimeoutMs (Valfritt) Tidsgränsen för det lokala meddelandet, i millisekunder. Det här värdet framtvingas endast lokalt och begränsar den tid ett genererat meddelande väntar på en lyckad leverans med standardvärdet 300000. En tid av 0 är oändlig. Det här är den maximala tid som används för att leverera ett meddelande (inklusive återförsök). Leveransfel uppstår när antalet återförsök eller tidsgränsen för meddelanden överskrids.
requestTimeoutMs (Valfritt) Tidsgränsen för bekräftelse av utdatabegäran, i millisekunder, med standardvärdet 5000.
maxRetries (Valfritt) Antalet gånger som ett meddelande skickas igen, med standardvärdet 2. Omförsök kan orsaka omordning, såvida inte EnableIdempotence är inställt på true.
authenticationMode (Valfritt) Autentiseringsläget när du använder SASL-autentisering (Simple Authentication and Security Layer). De värden som stöds är Gssapi, Plain (standard), ScramSha256, . ScramSha512
Användarnamn (Valfritt) Användarnamnet för SASL-autentisering. Stöds inte när AuthenticationMode är Gssapi. Mer information finns i Anslut ions.
Lösenord (Valfritt) Lösenordet för SASL-autentisering. Stöds inte när AuthenticationMode är Gssapi. Mer information finns i Anslut ions.
Protokollet (Valfritt) Säkerhetsprotokollet som används vid kommunikation med asynkrona meddelandeköer. De värden som stöds är plaintext (standard), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Valfritt) Sökväg till CA-certifikatfilen för att verifiera asynkron meddelandekös certifikat.
sslCertificateLocation (Valfritt) Sökväg till klientens certifikat.
sslKeyLocation (Valfritt) Sökväg till klientens privata nyckel (PEM) som används för autentisering.
sslKeyPassword (Valfritt) Lösenord för klientens certifikat.

Användning

Både nycklar och värdetyper stöds med inbyggd Avro- och Protobuf-serialisering.

Förskjutningen, partitionen och tidsstämpeln för händelsen genereras vid körning. Endast värde och rubriker kan anges i funktionen. Ämnet anges i function.json.

Se till att ha åtkomst till det Kafka-ämne som du försöker skriva till. Du konfigurerar bindningen med åtkomst- och anslutningsautentiseringsuppgifter till Kafka-ämnet.

I en Premium-plan måste du aktivera körningsskalningsövervakning för att Kafka-utdata ska kunna skalas ut till flera instanser. Mer information finns i Aktivera körningsskalning.

En fullständig uppsättning host.json inställningar som stöds för Kafka-utlösaren finns i host.json inställningar.

anslutningar

All anslutningsinformation som krävs av dina utlösare och bindningar ska bevaras i programinställningarna och inte i bindningsdefinitionerna i koden. Detta gäller för autentiseringsuppgifter, som aldrig ska lagras i koden.

Viktigt!

Inställningar för autentiseringsuppgifter måste referera till en programinställning. Hårdkoda inte autentiseringsuppgifter i koden eller konfigurationsfilerna. När du kör lokalt använder du filen local.settings.json för dina autentiseringsuppgifter och publicerar inte local.settings.json-filen.

När du ansluter till ett hanterat Kafka-kluster som tillhandahålls av Confluent i Azure kontrollerar du att följande autentiseringsuppgifter för Confluent Cloud-miljön anges i utlösaren eller bindningen:

Inställning Rekommenderat värde beskrivning
BrokerList BootstrapServer Appinställningen med namnet BootstrapServer innehåller värdet för bootstrap-servern som finns på sidan Inställningar för Confluent Cloud. Värdet liknar xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Användarnamn ConfluentCloudUsername Appinställningen med namnet ConfluentCloudUsername innehåller API-åtkomstnyckeln från Confluent Cloud-webbplatsen.
Lösenord ConfluentCloudPassword Appinställningen med namnet ConfluentCloudPassword innehåller API-hemligheten som hämtats från Confluent Cloud-webbplatsen.

Strängvärdena som du använder för de här inställningarna måste finnas som programinställningar i Azure eller i Values samlingen i filen local.settings.json under den lokala utvecklingen.

Du bör också ange Protocol, AuthenticationModeoch SslCaLocation i bindningsdefinitionerna.

Nästa steg