Partager via


Liaison de sortie Apache Kafka pour Azure Functions

La liaison de sortie permet à une application Azure Functions d’écrire des messages dans une rubrique Kafka.

Important

Les liaisons Kafka sont disponibles uniquement pour les fonctions sur le plan Elastic Premium et le plan (App Service) dédié. Elles sont uniquement prises en charge sur la version 3.x et les versions ultérieures du runtime Functions.

Exemple

L’utilisation de la liaison dépend de la modalité C# utilisée dans votre application de fonction, qui peut être l’une des suivantes :

Une bibliothèque de classes de processus Worker isolé est une fonction C# compilée exécutée dans un processus Worker isolé du runtime.

Les attributs que vous utilisez dépendent du fournisseur d’événements.

L’exemple suivant contient un type de retour personnalisé, MultipleOutputType, qui se compose d’une réponse HTTP et d’une sortie Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

Dans la classe MultipleOutputType, Kevent correspond à la variable de liaison de sortie de la liaison Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Pour envoyer un lot d’événements, transmettez un tableau de chaînes au type de sortie, comme illustré dans l’exemple suivant :

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

Le tableau de chaînes est défini en tant que propriété Kevents sur la classe sur laquelle la liaison de sortie est définie :

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

La fonction suivante ajoute des en-têtes aux données de sortie Kafka :

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Pour obtenir un ensemble complet d’exemples .NET opérationnels, consultez le référentiel d’extensions Kafka.

Notes

Pour obtenir un ensemble équivalent d’exemples TypeScript, consultez le référentiel d’extensions Kafka.

Les propriétés du fichier function.json dépendent de votre fournisseur d’événements. Dans ces exemples, il s’agit de Confluent ou d’Azure Event Hubs. Les exemples suivants montrent la liaison de sortie Kafka d’une fonction qui est déclenchée par une requête HTTP et envoie les données de la requête à la rubrique Kafka.

Dans les exemples ci-dessous, le fichier function.json suivant définit le déclencheur du fournisseur concerné :

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Le code suivant envoie alors un message à la rubrique :

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Le code suivant envoie plusieurs messages sous forme de tableau à la même rubrique :

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

L’exemple suivant montre comment envoyer un message d’événement avec en-têtes à la même rubrique Kafka :

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Pour obtenir un ensemble complet d’exemples JavaScript opérationnels, consultez le référentiel d’extensions Kafka.

Les propriétés du fichier function.json dépendent de votre fournisseur d’événements. Dans ces exemples, il s’agit de Confluent ou d’Azure Event Hubs. Les exemples suivants montrent la liaison de sortie Kafka d’une fonction qui est déclenchée par une requête HTTP et envoie les données de la requête à la rubrique Kafka.

Dans les exemples ci-dessous, le fichier function.json suivant définit le déclencheur du fournisseur concerné :

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Le code suivant envoie alors un message à la rubrique :

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Le code suivant envoie plusieurs messages sous forme de tableau à la même rubrique :

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

L’exemple suivant montre comment envoyer un message d’événement avec en-têtes à la même rubrique Kafka :

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Pour obtenir un ensemble complet d’exemples PowerShell opérationnels, consultez le référentiel d’extensions Kafka.

Les propriétés du fichier function.json dépendent de votre fournisseur d’événements. Dans ces exemples, il s’agit de Confluent ou d’Azure Event Hubs. Les exemples suivants montrent la liaison de sortie Kafka d’une fonction qui est déclenchée par une requête HTTP et envoie les données de la requête à la rubrique Kafka.

Dans les exemples ci-dessous, le fichier function.json suivant définit le déclencheur du fournisseur concerné :

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Le code suivant envoie alors un message à la rubrique :

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Le code suivant envoie plusieurs messages sous forme de tableau à la même rubrique :

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

L’exemple suivant montre comment envoyer un message d’événement avec en-têtes à la même rubrique Kafka :

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Pour obtenir un ensemble complet d’exemples Python opérationnels, consultez le référentiel d’extensions Kafka.

Les annotations que vous utilisez pour configurer la liaison de sortie dépendent du fournisseur d’événements.

La fonction suivante envoie un message à la rubrique Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

L’exemple suivant montre comment envoyer plusieurs messages à une rubrique Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

Dans cet exemple, le paramètre de liaison de sortie est remplacé par un tableau de chaînes.

Le dernier exemple utilise ces classes KafkaEntity et KafkaHeader :

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

L’exemple de fonction suivant envoie un message avec en-têtes à une rubrique Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Pour obtenir un ensemble complet d’exemples Java opérationnels pour Confluent, consultez le référentiel d’extensions Kafka.

Attributs

Les bibliothèques C# in-process et de processus Worker isolés utilisent l’attribut Kafka pour définir le déclencheur de fonction.

Le tableau suivant décrit les propriétés que vous pouvez définir à l’aide de cet attribut :

Paramètre Description
BrokerList (Obligatoire) Liste des répartiteurs Kafka auxquels la sortie est envoyée. Pour plus d’informations, consultez Connexions.
Rubrique (Obligatoire) Rubrique à laquelle la sortie est envoyée.
AvroSchema (Facultatif) Schéma d’un enregistrement générique lors de l’utilisation du protocole Avro.
MaxMessageBytes (Facultatif) Taille maximale du message de sortie envoyé (en Mo), définie par défaut sur 1.
BatchSize (Facultatif) Nombre maximal de messages regroupés au sein d’un même lot, défini par défaut sur 10000.
EnableIdempotence (Facultatif) Lorsque ce paramètre est défini sur true, les messages sont générés exactement une fois et dans l’ordre d’origine, avec une valeur par défaut de false.
MessageTimeoutMs (Facultatif) Délai d’expiration du message local, en millisecondes. Cette valeur, définie par défaut sur 300000, n’est appliquée que localement et limite le temps d’attente de remise d’un message généré. 0 correspond à un délai infini. Cette valeur représente le délai maximal de remise d’un message (nouvelles tentatives comprises). Une erreur de remise se produit lorsque le nombre de tentatives ou le délai d’expiration du message sont dépassés.
RequestTimeoutMs (Facultatif) Délai d’expiration de l’accusé de réception, en millisecondes, défini par défaut sur 5000.
MaxRetries (Facultatif) Nombre de tentatives d’envoi d’un message en échec, défini par défaut sur 2. Une nouvelle tentative peut entraîner une réorganisation, sauf si EnableIdempotence est défini sur true.
AuthenticationMode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont Gssapi, Plain (par défaut), ScramSha256, ScramSha512.
Nom d’utilisateur (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
Mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
Protocole (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont plaintext, ssl (par défaut), sasl_plaintext, sasl_ssl.
SslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
SslCertificateLocation (Facultatif) Chemin du certificat du client.
SslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
SslKeyPassword (Facultatif) Mot de passe pour le certificat du client.

Annotations

L’annotation KafkaOutput vous permet de créer une fonction qui écrit dans une rubrique spécifique. Les options prises en charge incluent les éléments suivants :

Élément Description
name Nom de la variable qui représente les données réparties dans le code de la fonction.
brokerList (Obligatoire) Liste des répartiteurs Kafka auxquels la sortie est envoyée. Pour plus d’informations, consultez Connexions.
topic (Obligatoire) Rubrique à laquelle la sortie est envoyée.
dataType Définit comment Functions gère la valeur du paramètre. Par défaut, la valeur est obtenue sous la forme d’une chaîne, et Functions tente de désérialiser la chaîne en objet POJO (Plain-Old Java Object) réel. Quand string, l’entrée est traitée comme une simple chaîne. Quand binary, le message est reçu sous forme de données binaires, et Functions tente de le désérialiser en type de paramètre réel byte[].
avroSchema (Facultatif) Schéma d’un enregistrement générique lors de l’utilisation du protocole Avro. (Actuellement non pris en charge pour Java.)
maxMessageBytes (Facultatif) Taille maximale du message de sortie envoyé (en Mo), définie par défaut sur 1.
batchSize (Facultatif) Nombre maximal de messages regroupés au sein d’un même lot, défini par défaut sur 10000.
enableIdempotence (Facultatif) Lorsque ce paramètre est défini sur true, les messages sont générés exactement une fois et dans l’ordre d’origine, avec une valeur par défaut de false.
messageTimeoutMs (Facultatif) Délai d’expiration du message local, en millisecondes. Cette valeur, définie par défaut sur 300000, n’est appliquée que localement et limite le temps d’attente de remise d’un message généré. 0 correspond à un délai infini. Cette valeur représente le délai maximal de remise d’un message (nouvelles tentatives comprises). Une erreur de remise se produit lorsque le nombre de tentatives ou le délai d’expiration du message sont dépassés.
requestTimeoutMs (Facultatif) Délai d’expiration de l’accusé de réception, en millisecondes, défini par défaut sur 5000.
maxRetries (Facultatif) Nombre de tentatives d’envoi d’un message en échec, défini par défaut sur 2. Une nouvelle tentative peut entraîner une réorganisation, sauf si EnableIdempotence est défini sur true.
authenticationMode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont Gssapi, Plain (par défaut), ScramSha256, ScramSha512.
username (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
protocol (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont plaintext, ssl (par défaut), sasl_plaintext, sasl_ssl.
sslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
sslCertificateLocation (Facultatif) Chemin du certificat du client.
sslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
sslKeyPassword (Facultatif) Mot de passe pour le certificat du client.

Configuration

Le tableau suivant décrit les propriétés de configuration de liaison que vous définissez dans le fichier function.json.

Propriété function.json Description
type Cette propriété doit être définie sur kafka.
direction Cette propriété doit être définie sur out.
name Nom de la variable qui représente les données réparties dans le code de la fonction.
brokerList (Obligatoire) Liste des répartiteurs Kafka auxquels la sortie est envoyée. Pour plus d’informations, consultez Connexions.
topic (Obligatoire) Rubrique à laquelle la sortie est envoyée.
avroSchema (Facultatif) Schéma d’un enregistrement générique lors de l’utilisation du protocole Avro.
maxMessageBytes (Facultatif) Taille maximale du message de sortie envoyé (en Mo), définie par défaut sur 1.
batchSize (Facultatif) Nombre maximal de messages regroupés au sein d’un même lot, défini par défaut sur 10000.
enableIdempotence (Facultatif) Lorsque ce paramètre est défini sur true, les messages sont générés exactement une fois et dans l’ordre d’origine, avec une valeur par défaut de false.
messageTimeoutMs (Facultatif) Délai d’expiration du message local, en millisecondes. Cette valeur, définie par défaut sur 300000, n’est appliquée que localement et limite le temps d’attente de remise d’un message généré. 0 correspond à un délai infini. Cette valeur représente le délai maximal de remise d’un message (nouvelles tentatives comprises). Une erreur de remise se produit lorsque le nombre de tentatives ou le délai d’expiration du message sont dépassés.
requestTimeoutMs (Facultatif) Délai d’expiration de l’accusé de réception, en millisecondes, défini par défaut sur 5000.
maxRetries (Facultatif) Nombre de tentatives d’envoi d’un message en échec, défini par défaut sur 2. Une nouvelle tentative peut entraîner une réorganisation, sauf si EnableIdempotence est défini sur true.
authenticationMode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont Gssapi, Plain (par défaut), ScramSha256, ScramSha512.
username (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
protocol (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont plaintext, ssl (par défaut), sasl_plaintext, sasl_ssl.
sslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
sslCertificateLocation (Facultatif) Chemin du certificat du client.
sslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
sslKeyPassword (Facultatif) Mot de passe pour le certificat du client.

Utilisation

Les deux types de clés et de valeurs sont pris en charge avec la sérialisation Avro et Protobuf intégrée.

Le décalage, la partition et l’horodatage de l’événement sont générés au moment de l’exécution. Seuls la valeur et les en-têtes peuvent être définis à l’intérieur de la fonction. La rubrique est définie dans le fichier function.json.

Vérifiez que vous avez accès à la rubrique Kafka sur laquelle vous essayez d’écrire. Pour configurer la liaison, vous devez utiliser les informations d’accès et de connexion à la rubrique Kafka.

Dans un plan Premium, vous devez activer la surveillance de la mise à l’échelle du runtime afin que la sortie Kafka puisse effectuer un scale-out vers plusieurs instances. Pour plus d’informations, consultez Activer la mise à l’échelle du runtime.

Pour obtenir un ensemble complet de paramètres host.json pris en charge pour le déclencheur Kafka, consultez Paramètres host.json.

Connexions

Toutes les informations de connexion requises par vos déclencheurs et liaisons doivent être conservées dans des paramètres d’application, et non dans les définitions de liaison de votre code. Cela est valable pour les informations d’identification, qui ne doivent jamais être stockées dans votre code.

Important

Les paramètres d’informations d’identification doivent référencer un paramètre d’application. Ne codez pas d’informations d’identification en dur dans vos fichiers de code ou de configuration. Lors de l’exécution locale, utilisez le fichier local.settings.json pour vos informations d’identification, et ne publiez pas le fichier local.settings.json.

Lors de la connexion à un cluster Kafka managé fourni par Confluent dans Azure, veillez à ce que les informations d’identification d’authentification suivantes pour votre environnement Confluent Cloud soient définies dans votre déclencheur ou liaison :

Paramètre Valeur recommandée Description
BrokerList BootstrapServer Le paramètre d’application nommé BootstrapServer contient la valeur du serveur de démarrage trouvé sur la page de paramètres Confluent Cloud. La valeur est semblable à xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nom d’utilisateur ConfluentCloudUsername Le paramètre d’application nommé ConfluentCloudUsername contient la clé d’accès à l’API du site web de Confluent Cloud.
Mot de passe ConfluentCloudPassword Le paramètre d’application nommé ConfluentCloudPassword contient le secret de l’API obtenu sur le site web de Confluent Cloud.

Les valeurs de chaîne que vous utilisez pour ces paramètres doivent être présentes en tant que paramètres d’application dans Azure ou dans la collection Values dans le fichier local.settings.json file pendant le développement local.

Vous devez également définir Protocol, AuthenticationMode et SslCaLocation dans vos définitions de liaison.

Étapes suivantes