Apache Kafka kimeneti kötés az Azure Functionshez
A kimeneti kötés lehetővé teszi, hogy az Azure Functions-alkalmazások üzeneteket írjanak egy Kafka-témakörbe.
Fontos
A Kafka-kötések csak az Elastic Premium Csomag és a Dedikált (App Service) csomaghoz tartozó Functions esetében érhetők el. Ezek csak a Functions-futtatókörnyezet 3.x és újabb verziójában támogatottak.
Példa
A kötés használata a függvényalkalmazásban használt C#-modalitástól függ, amely az alábbiak egyike lehet:
A C# függvényt lefordított izolált feldolgozói folyamatosztály-kódtár a futtatókörnyezettől elkülönített folyamatban fut.
A használt attribútumok az adott eseményszolgáltatótól függenek.
Az alábbi példa egy egyéni visszatérési típussal rendelkezik MultipleOutputType
, amely EGY HTTP-válaszból és egy Kafka-kimenetből áll.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
Az osztályban MultipleOutputType
Kevent
a Kafka-kötés kimeneti kötési változója.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Eseményköteg küldéséhez adjon át egy sztringtömböt a kimeneti típusnak, ahogyan az a következő példában látható:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
A sztringtömb az osztály tulajdonságaként Kevents
van definiálva, amelyen a kimeneti kötés definiálva van:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
A következő függvény fejléceket ad hozzá a Kafka kimeneti adataihoz:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
A működő .NET-példák teljes készletéért tekintse meg a Kafka-bővítménytárat.
Feljegyzés
A TypeScript-példák egyenértékű készletét lásd a Kafka-bővítménytárban
A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Olyan függvény Kafka kimeneti kötését mutatják be, amelyet egy HTTP-kérés aktivál, és adatokat küld a kérelemből a Kafka-témakörnek.
Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit az alábbi példákban:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
A következő kód ezután üzenetet küld a témakörnek:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
A következő kód több üzenetet küld tömbként ugyanarra a témakörre:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Az alábbi példa bemutatja, hogyan küldhet egy élőfejet tartalmazó eseményüzenetet ugyanarra a Kafka-témakörre:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
A működő JavaScript-példák teljes készletét a Kafka-bővítménytárban tekinti meg.
A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Olyan függvény Kafka kimeneti kötését mutatják be, amelyet egy HTTP-kérés aktivál, és adatokat küld a kérelemből a Kafka-témakörnek.
Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit az alábbi példákban:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
A következő kód ezután üzenetet küld a témakörnek:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
A következő kód több üzenetet küld tömbként ugyanarra a témakörre:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Az alábbi példa bemutatja, hogyan küldhet egy élőfejet tartalmazó eseményüzenetet ugyanarra a Kafka-témakörre:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
A működő PowerShell-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.
A function.json fájl konkrét tulajdonságai az eseményszolgáltatótól függenek, amelyek ezekben a példákban a Confluent vagy az Azure Event Hubs. Az alábbi példák egy Olyan függvény Kafka kimeneti kötését mutatják be, amelyet egy HTTP-kérés aktivál, és adatokat küld a kérelemből a Kafka-témakörnek.
Az alábbi function.json határozza meg az adott szolgáltató eseményindítóit az alábbi példákban:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
A következő kód ezután üzenetet küld a témakörnek:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
A következő kód több üzenetet küld tömbként ugyanarra a témakörre:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
Az alábbi példa bemutatja, hogyan küldhet egy élőfejet tartalmazó eseményüzenetet ugyanarra a Kafka-témakörre:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
A működő Python-példák teljes halmazát a Kafka-bővítménytárban tekinti meg.
A kimeneti kötés konfigurálásához használt széljegyzetek az adott eseményszolgáltatótól függenek.
Az alábbi függvény üzenetet küld a Kafka-témakörnek.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
Az alábbi példa bemutatja, hogyan küldhet több üzenetet egy Kafka-témakörnek.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Ebben a példában a kimeneti kötés paramétere sztringtömbre változik.
Az utolsó példa ezeket KafkaEntity
és KafkaHeader
osztályokat használja:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
Az alábbi példafüggvény fejléceket tartalmazó üzenetet küld egy Kafka-témakörnek.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
A Confluenthez készült működő Java-példák teljes készletét a Kafka-bővítménytárban tekinti meg.
Attribútumok
A folyamaton belüli és az izolált feldolgozói folyamat C# kódtárai az Kafka
attribútummal határozzák meg a függvény-eseményindítót.
Az alábbi táblázat az attribútum használatával beállítható tulajdonságokat ismerteti:
Paraméter | Leírás |
---|---|
BrokerList | (Kötelező) Azon Kafka-közvetítők listája, amelyeknek a kimenetet elküldik. További információért tekintse meg a Csatlakozás. |
Téma | (Kötelező) Az a témakör, amelyre a kimenetet elküldik. |
AvroSchema | (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor. |
MaxMessageBytes | (Nem kötelező) Az elküldött kimeneti üzenet maximális mérete (MB-ban), alapértelmezett értékével 1 . |
BatchSize | (Nem kötelező) Egyetlen üzenetkészletben kötegelve lévő üzenetek maximális száma, amelynek alapértelmezett értéke .10000 |
EnableIdempotence | (Nem kötelező) Ha be van true állítva, garantálja, hogy az üzeneteket pontosan egyszer és az eredeti előállítási sorrendben, az alapértelmezett érték false |
MessageTimeoutMs | (Nem kötelező) A helyi üzenet időtúllépése ezredmásodpercben. Ezt az értéket csak helyileg kényszeríti ki a rendszer, és korlátozza a létrehozott üzenet sikeres kézbesítésre való várakozási idejét, alapértelmezett 300000 beállítással. Az idő 0 végtelen. Ez az érték az üzenet kézbesítéséhez használt maximális idő (beleértve az újrapróbálkozásokat is). Kézbesítési hiba akkor fordul elő, ha túllépi az újrapróbálkozások számát vagy az üzenet időtúllépését. |
RequestTimeoutMs | (Nem kötelező) A kimeneti kérelem nyugtázási időtúllépése ezredmásodpercben, alapértelmezés szerint 5000 a . |
MaxRetries | (Nem kötelező) A sikertelen üzenetek küldésének újrapróbálkozásának száma az alapértelmezett beállítással 2 . Az újrapróbálkozás átrendezést okozhat, hacsak nincs EnableIdempotence beállítva true . |
AuthenticationMode | (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi : , Plain (alapértelmezett), ScramSha256 . ScramSha512 |
Felhasználónév | (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi . További információért tekintse meg a Csatlakozás. |
Jelszó | (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi . További információért tekintse meg a Csatlakozás. |
Protokoll | (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl , sasl_plaintext . sasl_ssl |
SslCaLocation | (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez. |
SslCertificateLocation | (Nem kötelező) Az ügyfél tanúsítványának elérési útja. |
SslKeyLocation | (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja. |
SslKeyPassword | (Nem kötelező) Az ügyfél tanúsítványának jelszava. |
Jegyzetek
A KafkaOutput
jegyzetekkel létrehozhat egy függvényt, amely egy adott témakörbe ír. A támogatott beállítások a következő elemeket tartalmazzák:
Elem | Leírás |
---|---|
név | Annak a változónak a neve, amely a függvénykódban a közvetített adatokat jelöli. |
brokerList | (Kötelező) Azon Kafka-közvetítők listája, amelyeknek a kimenetet elküldik. További információért tekintse meg a Csatlakozás. |
témakör | (Kötelező) Az a témakör, amelyre a kimenetet elküldik. |
Adattípus | Meghatározza, hogy a Functions hogyan kezeli a paraméter értékét. Alapértelmezés szerint az érték sztringként lesz lekérve, és a Functions megpróbálja deszerializálni a sztringet a tényleges egyszerű régi Java-objektumra (POJO). Amikor string a bemenetet csak sztringként kezeli a rendszer. Amikor binary az üzenet bináris adatként érkezik, és a Functions megpróbálja deszerializálni azt egy tényleges paramétertípus-bájtra[]. |
avroSchema | (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor. |
maxMessageBytes | (Nem kötelező) Az elküldött kimeneti üzenet maximális mérete (MB-ban), alapértelmezett értékével 1 . |
batchSize | (Nem kötelező) Egyetlen üzenetkészletben kötegelve lévő üzenetek maximális száma, amelynek alapértelmezett értéke .10000 |
enableIdempotence | (Nem kötelező) Ha be van true állítva, garantálja, hogy az üzeneteket pontosan egyszer és az eredeti előállítási sorrendben, az alapértelmezett érték false |
messageTimeoutMs | (Nem kötelező) A helyi üzenet időtúllépése ezredmásodpercben. Ezt az értéket csak helyileg kényszeríti ki a rendszer, és korlátozza a létrehozott üzenet sikeres kézbesítésre való várakozási idejét, alapértelmezett 300000 beállítással. Az idő 0 végtelen. Ez az üzenet kézbesítésének maximális időtartama (beleértve az újrapróbálkozásokat is). Kézbesítési hiba akkor fordul elő, ha túllépi az újrapróbálkozások számát vagy az üzenet időtúllépését. |
requestTimeoutMs | (Nem kötelező) A kimeneti kérelem nyugtázási időtúllépése ezredmásodpercben, alapértelmezés szerint 5000 a . |
maxRetries | (Nem kötelező) A sikertelen üzenetek küldésének újrapróbálkozásának száma az alapértelmezett beállítással 2 . Az újrapróbálkozás átrendezést okozhat, hacsak nincs EnableIdempotence beállítva true . |
authenticationMode | (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi : , Plain (alapértelmezett), ScramSha256 . ScramSha512 |
Felhasználónév | (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi . További információért tekintse meg a Csatlakozás. |
Jelszó | (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi . További információért tekintse meg a Csatlakozás. |
Protokoll | (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl , sasl_plaintext . sasl_ssl |
sslCaLocation | (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez. |
sslCertificateLocation | (Nem kötelező) Az ügyfél tanúsítványának elérési útja. |
sslKeyLocation | (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja. |
sslKeyPassword | (Nem kötelező) Az ügyfél tanúsítványának jelszava. |
Konfiguráció
Az alábbi táblázat a function.json fájlban beállított kötéskonfigurációs tulajdonságokat ismerteti.
function.json tulajdonság | Leírás |
---|---|
type | A beállításnak a kafka következőnek kell lennie: . |
direction | A beállításnak a out következőnek kell lennie: . |
név | Annak a változónak a neve, amely a függvénykódban a közvetített adatokat jelöli. |
brokerList | (Kötelező) Azon Kafka-közvetítők listája, amelyeknek a kimenetet elküldik. További információért tekintse meg a Csatlakozás. |
témakör | (Kötelező) Az a témakör, amelyre a kimenetet elküldik. |
avroSchema | (Nem kötelező) Általános rekord sémája az Avro protokoll használatakor. |
maxMessageBytes | (Nem kötelező) Az elküldött kimeneti üzenet maximális mérete (MB-ban), alapértelmezett értékével 1 . |
batchSize | (Nem kötelező) Egyetlen üzenetkészletben kötegelve lévő üzenetek maximális száma, amelynek alapértelmezett értéke .10000 |
enableIdempotence | (Nem kötelező) Ha be van true állítva, garantálja, hogy az üzeneteket pontosan egyszer és az eredeti előállítási sorrendben, az alapértelmezett érték false |
messageTimeoutMs | (Nem kötelező) A helyi üzenet időtúllépése ezredmásodpercben. Ezt az értéket csak helyileg kényszeríti ki a rendszer, és korlátozza a létrehozott üzenet sikeres kézbesítésre való várakozási idejét, alapértelmezett 300000 beállítással. Az idő 0 végtelen. Ez az üzenet kézbesítésének maximális időtartama (beleértve az újrapróbálkozásokat is). Kézbesítési hiba akkor fordul elő, ha túllépi az újrapróbálkozások számát vagy az üzenet időtúllépését. |
requestTimeoutMs | (Nem kötelező) A kimeneti kérelem nyugtázási időtúllépése ezredmásodpercben, alapértelmezés szerint 5000 a . |
maxRetries | (Nem kötelező) A sikertelen üzenetek küldésének újrapróbálkozásának száma az alapértelmezett beállítással 2 . Az újrapróbálkozás átrendezést okozhat, hacsak nincs EnableIdempotence beállítva true . |
authenticationMode | (Nem kötelező) A hitelesítési mód az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítés használatakor. A támogatott értékek a következőkGssapi : , Plain (alapértelmezett), ScramSha256 . ScramSha512 |
Felhasználónév | (Nem kötelező) A SASL-hitelesítés felhasználóneve. Nem támogatott, ha AuthenticationMode az Gssapi . További információért tekintse meg a Csatlakozás. |
Jelszó | (Nem kötelező) Az SASL-hitelesítés jelszava. Nem támogatott, ha AuthenticationMode az Gssapi . További információért tekintse meg a Csatlakozás. |
Protokoll | (Nem kötelező) A közvetítőkkel való kommunikációhoz használt biztonsági protokoll. A támogatott értékek a következők plaintext : (alapértelmezett), ssl , sasl_plaintext . sasl_ssl |
sslCaLocation | (Nem kötelező) A hitelesítésszolgáltató tanúsítványfájljának elérési útja a közvetítő tanúsítványának ellenőrzéséhez. |
sslCertificateLocation | (Nem kötelező) Az ügyfél tanúsítványának elérési útja. |
sslKeyLocation | (Nem kötelező) A hitelesítéshez használt ügyfél titkos kulcsának (PEM) elérési útja. |
sslKeyPassword | (Nem kötelező) Az ügyfél tanúsítványának jelszava. |
Használat
A kulcsok és az értéktípusok beépített Avro- és Protobuf-szerializálással is támogatottak.
Az esemény eltolása, partíciója és időbélyege futásidőben jön létre. Csak érték és fejlécek állíthatók be a függvényen belül. A témakör a function.json van beállítva.
Győződjön meg arról, hogy rendelkezik hozzáféréssel ahhoz a Kafka-témakörhöz, amelyhez írni próbál. A kötést hozzáférési és kapcsolati hitelesítő adatokkal konfigurálja a Kafka-témakörhöz.
Prémium csomag esetén engedélyeznie kell a futtatókörnyezeti skálázás figyelését ahhoz, hogy a Kafka-kimenet több példányra is felskálázható legyen. További információ: Futtatókörnyezeti skálázás engedélyezése.
A Kafka-eseményindító támogatott host.json beállításainak teljes halmazát host.json beállítások között találhatja meg.
Kapcsolatok
Az eseményindítók és kötések által igényelt kapcsolati információkat az alkalmazásbeállításokban, és nem a kód kötésdefinícióiban kell tartani. Ez igaz a hitelesítő adatokra, amelyeket soha nem szabad a kódban tárolni.
Fontos
A hitelesítő adatok beállításainak egy alkalmazásbeállításra kell hivatkoznia. A kódban vagy a konfigurációs fájlokban ne írja be a hitelesítő adatokat. Helyi futtatáskor használja a local.settings.json fájlt a hitelesítő adataihoz, és ne tegye közzé a local.settings.json fájlt.
Amikor az Azure-ban a Confluent által biztosított felügyelt Kafka-fürthöz csatlakozik, győződjön meg arról, hogy a Confluent Cloud-környezet következő hitelesítési hitelesítő adatai be vannak állítva az eseményindítóban vagy kötésben:
Beállítás | Javasolt érték | Leírás |
---|---|---|
BrokerList | BootstrapServer |
Az elnevezett BootstrapServer alkalmazásbeállítás a Confluent Cloud beállításai lapon található bootstrap-kiszolgáló értékét tartalmazza. Az érték a következőhöz xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 hasonló: . |
Felhasználónév | ConfluentCloudUsername |
A névvel ellátott ConfluentCloudUsername alkalmazásbeállítás tartalmazza a Confluent Cloud webhely api-hozzáférési kulcsát. |
Jelszó | ConfluentCloudPassword |
A névvel ellátott ConfluentCloudPassword alkalmazásbeállítás tartalmazza a Confluent Cloud webhelyről beszerzett API-titkos kódot. |
Az ezekhez a beállításokhoz használt sztringértékeknek alkalmazásbeállításokként kell szerepelniük az Azure-ban vagy a Values
local.settings.json fájl gyűjteményében a helyi fejlesztés során.
A , AuthenticationMode
és SslCaLocation
a Protocol
kötésdefiníciókat is be kell állítania.