Apache Kafka-uitvoerbinding voor Azure Functions
Met de uitvoerbinding kan een Azure Functions-app berichten schrijven naar een Kafka-onderwerp.
Belangrijk
Kafka-bindingen zijn alleen beschikbaar voor Functions in het Elastic Premium-abonnement en het Toegewezen (App Service)-abonnement. Ze worden alleen ondersteund op versie 3.x en latere versie van de Functions-runtime.
Opmerking
Het gebruik van de binding is afhankelijk van de C#-modaliteit die wordt gebruikt in uw functie-app. Dit kan een van de volgende zijn:
Een geïsoleerde werkprocesklassebibliotheek gecompileerde C#-functie wordt uitgevoerd in een proces dat is geïsoleerd van de runtime.
De kenmerken die u gebruikt, zijn afhankelijk van de specifieke gebeurtenisprovider.
In het volgende voorbeeld ziet u een aangepast retourtype, dat MultipleOutputType
bestaat uit een HTTP-antwoord en een Kafka-uitvoer.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
In de klasse MultipleOutputType
Kevent
is de uitvoerbindingsvariabele voor de Kafka-binding.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Als u een batch gebeurtenissen wilt verzenden, geeft u een tekenreeksmatrix door aan het uitvoertype, zoals wordt weergegeven in het volgende voorbeeld:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
De tekenreeksmatrix wordt gedefinieerd als Kevents
eigenschap in de klasse, waarop de uitvoerbinding is gedefinieerd:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Met de volgende functie worden headers toegevoegd aan de Kafka-uitvoergegevens:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Zie de Kafka-extensieopslagplaats voor een volledige set werkende .NET-voorbeelden.
De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-uitvoerbinding voor een functie die wordt geactiveerd door een HTTP-aanvraag en gegevens van de aanvraag naar het Kafka-onderwerp verzendt.
De volgende function.json definieert de trigger voor de specifieke provider in deze voorbeelden:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
Met de volgende code wordt vervolgens een bericht naar het onderwerp verzonden:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
Met de volgende code worden meerdere berichten als een matrix naar hetzelfde onderwerp verzonden:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
In het volgende voorbeeld ziet u hoe u een gebeurtenisbericht met kopteksten naar hetzelfde Kafka-onderwerp verzendt:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Zie de Kafka-extensieopslagplaats voor een volledige set werkende JavaScript-voorbeelden.
De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-uitvoerbinding voor een functie die wordt geactiveerd door een HTTP-aanvraag en gegevens van de aanvraag naar het Kafka-onderwerp verzendt.
De volgende function.json definieert de trigger voor de specifieke provider in deze voorbeelden:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
Met de volgende code wordt vervolgens een bericht naar het onderwerp verzonden:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Met de volgende code worden meerdere berichten als een matrix naar hetzelfde onderwerp verzonden:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
In het volgende voorbeeld ziet u hoe u een gebeurtenisbericht met kopteksten naar hetzelfde Kafka-onderwerp verzendt:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Zie de Kafka-extensieopslagplaats voor een volledige set werkende PowerShell-voorbeelden.
De specifieke eigenschappen van het function.json-bestand zijn afhankelijk van uw gebeurtenisprovider, die in deze voorbeelden Confluent of Azure Event Hubs zijn. In de volgende voorbeelden ziet u een Kafka-uitvoerbinding voor een functie die wordt geactiveerd door een HTTP-aanvraag en gegevens van de aanvraag naar het Kafka-onderwerp verzendt.
De volgende function.json definieert de trigger voor de specifieke provider in deze voorbeelden:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Met de volgende code wordt vervolgens een bericht naar het onderwerp verzonden:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
Met de volgende code worden meerdere berichten als een matrix naar hetzelfde onderwerp verzonden:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
In het volgende voorbeeld ziet u hoe u een gebeurtenisbericht met kopteksten naar hetzelfde Kafka-onderwerp verzendt:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
Zie de Kafka-extensieopslagplaats voor een volledige set werkende Python-voorbeelden.
De aantekeningen die u gebruikt om de uitvoerbinding te configureren, zijn afhankelijk van de specifieke gebeurtenisprovider.
Met de volgende functie wordt een bericht verzonden naar het Kafka-onderwerp.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
In het volgende voorbeeld ziet u hoe u meerdere berichten naar een Kafka-onderwerp verzendt.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
In dit voorbeeld wordt de parameter uitvoerbinding gewijzigd in tekenreeksmatrix.
In het laatste voorbeeld worden deze KafkaEntity
en KafkaHeader
klassen gebruikt:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
Met de volgende voorbeeldfunctie wordt een bericht met kopteksten naar een Kafka-onderwerp verzonden.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Zie de Kafka-extensieopslagplaats voor een volledige set werkende Java-voorbeelden voor Confluent.
Kenmerken
Zowel in-process als geïsoleerde werkproces C#-bibliotheken gebruiken het Kafka
kenmerk om de functietrigger te definiëren.
In de volgende tabel worden de eigenschappen uitgelegd die u kunt instellen met behulp van dit kenmerk:
Parameter | Description |
---|---|
BrokerList | (Vereist) De lijst met Kafka-brokers waarnaar de uitvoer wordt verzonden. Zie Verbindingen voor meer informatie. |
Onderwerp | (Vereist) Het onderwerp waarnaar de uitvoer wordt verzonden. |
AvroSchema | (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. |
MaxMessageBytes | (Optioneel) De maximale grootte van het uitvoerbericht dat wordt verzonden (in MB), met een standaardwaarde van 1 . |
BatchSize | (Optioneel) Maximum aantal berichten dat is gebatcheerd in één berichtenset, met een standaardwaarde van 10000 . |
EnableIdempotentie | (Optioneel) Wanneer deze is ingesteld true op , garandeert u dat berichten exact eenmaal en in de oorspronkelijke productievolgorde worden geproduceerd, met een standaardwaarde van false |
MessageTimeoutMs | (Optioneel) De time-out van het lokale bericht, in milliseconden. Deze waarde wordt alleen lokaal afgedwongen en beperkt de tijd die een geproduceerd bericht wacht op een geslaagde bezorging, met een standaardwaarde 300000 . Een tijd van 0 is oneindig. Deze waarde is de maximale tijd die wordt gebruikt voor het bezorgen van een bericht (inclusief nieuwe pogingen). Bezorgingsfout treedt op wanneer het aantal nieuwe pogingen of de time-out van het bericht wordt overschreden. |
RequestTimeoutMs | (Optioneel) De bevestigingstime-out van de uitvoeraanvraag, in milliseconden, met een standaardwaarde van 5000 . |
MaxRetries | (Optioneel) Het aantal keren dat een mislukt bericht opnieuw moet worden verzonden, met een standaardwaarde van 2 . Opnieuw proberen kan leiden tot opnieuw ordenen, tenzij EnableIdempotence is ingesteld op true . |
AuthenticationMode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi , Plain (standaard), ScramSha256 . ScramSha512 |
Gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
Wachtwoord | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
Protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl , . sasl_ssl sasl_plaintext |
SslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
SslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
SslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
SslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
Aantekeningen
Met de KafkaOutput
aantekening kunt u een functie maken die naar een specifiek onderwerp schrijft. Ondersteunde opties omvatten de volgende elementen:
Element | Description |
---|---|
name | De naam van de variabele die de brokergegevens in functiecode vertegenwoordigt. |
brokerList | (Vereist) De lijst met Kafka-brokers waarnaar de uitvoer wordt verzonden. Zie Verbindingen voor meer informatie. |
onderwerp | (Vereist) Het onderwerp waarnaar de uitvoer wordt verzonden. |
Datatype | Definieert hoe Functions de parameterwaarde verwerkt. De waarde wordt standaard verkregen als een tekenreeks en Functions probeert de tekenreeks te deserialiseren naar een echt normaal oud Java-object (POJO). Wanneer string , wordt de invoer behandeld als alleen een tekenreeks. Wanneer binary , het bericht wordt ontvangen als binaire gegevens en Functions probeert het te deserialiseren naar een werkelijke parametertype byte[]. |
avroSchema | (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. (Momenteel niet ondersteund voor Java.) |
maxMessageBytes | (Optioneel) De maximale grootte van het uitvoerbericht dat wordt verzonden (in MB), met een standaardwaarde van 1 . |
batchSize | (Optioneel) Maximum aantal berichten dat is gebatcheerd in één berichtenset, met een standaardwaarde van 10000 . |
enableIdempotence | (Optioneel) Wanneer deze is ingesteld true op , garandeert u dat berichten exact eenmaal en in de oorspronkelijke productievolgorde worden geproduceerd, met een standaardwaarde van false |
messageTimeoutMs | (Optioneel) De time-out van het lokale bericht, in milliseconden. Deze waarde wordt alleen lokaal afgedwongen en beperkt de tijd die een geproduceerd bericht wacht op een geslaagde bezorging, met een standaardwaarde 300000 . Een tijd van 0 is oneindig. Dit is de maximale tijd die wordt gebruikt voor het bezorgen van een bericht (inclusief nieuwe pogingen). Bezorgingsfout treedt op wanneer het aantal nieuwe pogingen of de time-out van het bericht wordt overschreden. |
requestTimeoutMs | (Optioneel) De bevestigingstime-out van de uitvoeraanvraag, in milliseconden, met een standaardwaarde van 5000 . |
maxRetries | (Optioneel) Het aantal keren dat een mislukt bericht opnieuw moet worden verzonden, met een standaardwaarde van 2 . Opnieuw proberen kan leiden tot opnieuw ordenen, tenzij EnableIdempotence is ingesteld op true . |
authenticationMode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi , Plain (standaard), ScramSha256 . ScramSha512 |
gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
password | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl , . sasl_ssl sasl_plaintext |
sslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
sslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
sslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
sslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
Configuratie
In de volgende tabel worden de bindingsconfiguratie-eigenschappen uitgelegd die u in het function.json-bestand hebt ingesteld.
eigenschap function.json | Beschrijving |
---|---|
type | Moet worden ingesteld op kafka . |
direction | Moet worden ingesteld op out . |
name | De naam van de variabele die de brokergegevens in functiecode vertegenwoordigt. |
brokerList | (Vereist) De lijst met Kafka-brokers waarnaar de uitvoer wordt verzonden. Zie Verbindingen voor meer informatie. |
onderwerp | (Vereist) Het onderwerp waarnaar de uitvoer wordt verzonden. |
avroSchema | (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. |
maxMessageBytes | (Optioneel) De maximale grootte van het uitvoerbericht dat wordt verzonden (in MB), met een standaardwaarde van 1 . |
batchSize | (Optioneel) Maximum aantal berichten dat is gebatcheerd in één berichtenset, met een standaardwaarde van 10000 . |
enableIdempotence | (Optioneel) Wanneer deze is ingesteld true op , garandeert u dat berichten exact eenmaal en in de oorspronkelijke productievolgorde worden geproduceerd, met een standaardwaarde van false |
messageTimeoutMs | (Optioneel) De time-out van het lokale bericht, in milliseconden. Deze waarde wordt alleen lokaal afgedwongen en beperkt de tijd die een geproduceerd bericht wacht op een geslaagde bezorging, met een standaardwaarde 300000 . Een tijd van 0 is oneindig. Dit is de maximale tijd die wordt gebruikt voor het bezorgen van een bericht (inclusief nieuwe pogingen). Bezorgingsfout treedt op wanneer het aantal nieuwe pogingen of de time-out van het bericht wordt overschreden. |
requestTimeoutMs | (Optioneel) De bevestigingstime-out van de uitvoeraanvraag, in milliseconden, met een standaardwaarde van 5000 . |
maxRetries | (Optioneel) Het aantal keren dat een mislukt bericht opnieuw moet worden verzonden, met een standaardwaarde van 2 . Opnieuw proberen kan leiden tot opnieuw ordenen, tenzij EnableIdempotence is ingesteld op true . |
authenticationMode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn Gssapi , Plain (standaard), ScramSha256 . ScramSha512 |
gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
password | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn plaintext (standaard), ssl , . sasl_ssl sasl_plaintext |
sslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
sslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
sslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
sslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
Gebruik
Zowel sleutels als waardentypen worden ondersteund met ingebouwde Avro - en Protobuf-serialisatie .
De offset, partitie en tijdstempel voor de gebeurtenis worden tijdens runtime gegenereerd. Alleen waarde en headers kunnen in de functie worden ingesteld. Het onderwerp is ingesteld in de function.json.
Zorg ervoor dat u toegang hebt tot het Kafka-onderwerp waarnaar u wilt schrijven. U configureert de binding met toegangs- en verbindingsreferenties voor het Kafka-onderwerp.
In een Premium-abonnement moet u runtimeschaalbewaking inschakelen voor de Kafka-uitvoer om uit te kunnen schalen naar meerdere exemplaren. Zie Schalen van runtime inschakelen voor meer informatie.
Zie host.json instellingen voor een volledige set ondersteunde host.json instellingen voor de Kafka-trigger.
Connecties
Alle verbindingsgegevens die vereist zijn voor uw triggers en bindingen, moeten worden onderhouden in toepassingsinstellingen en niet in de bindingsdefinities in uw code. Dit geldt voor referenties, die nooit in uw code moeten worden opgeslagen.
Belangrijk
Referentie-instellingen moeten verwijzen naar een toepassingsinstelling. Gebruik geen codereferenties in uw code- of configuratiebestanden. Wanneer u lokaal werkt, gebruikt u het local.settings.json-bestand voor uw referenties en publiceert u het local.settings.json bestand niet.
Wanneer u verbinding maakt met een beheerd Kafka-cluster dat wordt geleverd door Confluent in Azure, moet u ervoor zorgen dat de volgende verificatiereferenties voor uw Confluent Cloud-omgeving zijn ingesteld in uw trigger of binding:
Instelling | Aanbevolen waarde | Beschrijving |
---|---|---|
BrokerList | BootstrapServer |
De app-instelling met de naam BootstrapServer bevat de waarde van de bootstrap-server die is gevonden op de pagina met Confluent Cloud-instellingen. De waarde lijkt op xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Gebruikersnaam | ConfluentCloudUsername |
App-instelling met de naam ConfluentCloudUsername bevat de API-toegangssleutel van de Confluent Cloud-website. |
Wachtwoord | ConfluentCloudPassword |
App-instelling met de naam ConfluentCloudPassword bevat het API-geheim dat is verkregen van de Confluent Cloud-website. |
De tekenreekswaarden die u voor deze instellingen gebruikt, moeten aanwezig zijn als toepassingsinstellingen in Azure of in de Values
verzameling in het local.settings.json-bestand tijdens lokale ontwikkeling.
U moet ook de Protocol
, AuthenticationMode
en SslCaLocation
in uw bindingsdefinities instellen.