Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Powiązanie wyjściowe umożliwia aplikacji usługi Azure Functions zapisywanie komunikatów w temacie platformy Kafka.
Ważne
Powiązania platformy Kafka są dostępne tylko dla funkcji w ramach planu Elastic Premium i dedykowanego (App Service). Są one obsługiwane tylko w wersji 3.x i nowszej środowiska uruchomieniowego usługi Functions.
Przykład
Użycie powiązania zależy od modalności języka C# używanej w aplikacji funkcji, co może być jednym z następujących elementów:
Izolowana biblioteka klas procesów roboczych skompilowana funkcja języka C# jest uruchamiana w procesie odizolowanym od środowiska uruchomieniowego.
Używane atrybuty zależą od określonego dostawcy zdarzeń.
Poniższy przykład ma niestandardowy typ zwracany, czyli MultipleOutputType
, który składa się z odpowiedzi HTTP i danych wyjściowych platformy Kafka.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
W klasie MultipleOutputType
Kevent
jest zmienną powiązania wyjściowego dla powiązania platformy Kafka.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Aby wysłać partię zdarzeń, przekaż tablicę ciągów do typu danych wyjściowych, jak pokazano w poniższym przykładzie:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
Tablica ciągów jest definiowana jako Kevents
właściwość klasy, na której zdefiniowano powiązanie wyjściowe:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Następująca funkcja dodaje nagłówki do danych wyjściowych platformy Kafka:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Pełny zestaw działających przykładów platformy .NET można znaleźć w repozytorium rozszerzeń platformy Kafka.
Uwaga
Aby zapoznać się z równoważnym zestawem przykładów języka TypeScript, zobacz repozytorium rozszerzeń platformy Kafka
Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłanie danych z żądania do tematu platformy Kafka.
W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
Poniższy kod wysyła następnie komunikat do tematu:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Pełny zestaw działających przykładów języka JavaScript można znaleźć w repozytorium rozszerzeń platformy Kafka.
Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłanie danych z żądania do tematu platformy Kafka.
W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
Poniższy kod wysyła następnie komunikat do tematu:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Pełny zestaw działających przykładów programu PowerShell można znaleźć w repozytorium rozszerzeń platformy Kafka.
Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłanie danych z żądania do tematu platformy Kafka.
W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Poniższy kod wysyła następnie komunikat do tematu:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
Pełny zestaw działających przykładów języka Python można znaleźć w repozytorium rozszerzeń platformy Kafka.
Adnotacje używane do konfigurowania powiązania wyjściowego zależą od określonego dostawcy zdarzeń.
Poniższa funkcja wysyła komunikat do tematu platformy Kafka.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
W poniższym przykładzie pokazano, jak wysyłać wiele komunikatów do tematu platformy Kafka.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
W tym przykładzie parametr powiązania wyjściowego został zmieniony na tablicę ciągów.
W ostatnim przykładzie użyto następujących KafkaEntity
klas i KafkaHeader
:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
Poniższa przykładowa funkcja wysyła komunikat z nagłówkami do tematu platformy Kafka.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Pełny zestaw działających przykładów języka Java dla platformy Confluent można znaleźć w repozytorium rozszerzeń platformy Kafka.
Atrybuty
Biblioteki języka C# procesu roboczego w procesie przetwarzania procesów przetwarzania procesów procesów przetwarzania w procesie przetwarzania izolowanego używają atrybutu Kafka
do zdefiniowania wyzwalacza funkcji.
W poniższej tabeli opisano właściwości, które można ustawić przy użyciu tego atrybutu:
Parametr | Opis |
---|---|
Lista brokerów | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
Temat | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
AvroSchema | (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. |
MaxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1 domyślną . |
BatchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000 domyślną . |
EnableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji true gwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false |
MessageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000 . Czas 0 jest nieskończony. Ta wartość to maksymalny czas używany do dostarczenia komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
RequestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000 . |
Maksymalna liczba ponownych prób | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2 . Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true . |
AuthenticationMode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi , Plain (wartość domyślna), ScramSha256 , ScramSha512 . |
Nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
Hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
Protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl , sasl_plaintext , sasl_ssl . |
SslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
SslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
SslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
SslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
Adnotacje
Adnotacja KafkaOutput
umożliwia utworzenie funkcji, która zapisuje w określonym temacie. Obsługiwane opcje obejmują następujące elementy:
Element | opis |
---|---|
name | Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji. |
brokerList | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
topic | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
Datatype | Definiuje sposób obsługi wartości parametru przez funkcję Functions. Domyślnie wartość jest uzyskiwana jako ciąg, a usługa Functions próbuje wykonać deserializacji ciągu do rzeczywistego zwykłego obiektu Java (POJO). Gdy string element wejściowy jest traktowany jako tylko ciąg. Gdy binary komunikat zostanie odebrany jako dane binarne, a usługa Functions próbuje wykonać deserializacji go do rzeczywistego bajtu typu parametru[]. |
avroSchema | (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. (Obecnie nieobsługiwane dla języka Java). |
maxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1 domyślną . |
batchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000 domyślną . |
enableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji true gwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false |
messageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000 . Czas 0 jest nieskończony. Jest to maksymalny czas używany do dostarczania komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
requestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000 . |
maxRetries | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2 . Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true . |
authenticationMode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi , Plain (wartość domyślna), ScramSha256 , ScramSha512 . |
nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
sslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
sslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
sslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
Konfigurowanie
W poniższej tabeli opisano właściwości konfiguracji powiązania ustawione w pliku function.json .
właściwość function.json | opis |
---|---|
type | Musi być ustawiona wartość kafka . |
direction | Musi być ustawiona wartość out . |
name | Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji. |
brokerList | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
topic | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
avroSchema | (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. |
maxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1 domyślną . |
batchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000 domyślną . |
enableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji true gwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false |
messageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000 . Czas 0 jest nieskończony. Jest to maksymalny czas używany do dostarczania komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
requestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000 . |
maxRetries | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2 . Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true . |
authenticationMode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to Gssapi , Plain (wartość domyślna), ScramSha256 , ScramSha512 . |
nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi . Aby uzyskać więcej informacji, zobacz Połączenia . |
protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to plaintext (wartość domyślna), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
sslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
sslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
sslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
Użycie
Zarówno klucze, jak i typy wartości są obsługiwane z wbudowaną serializacji Avro i Protobuf .
Przesunięcie, partycja i sygnatura czasowa zdarzenia są generowane w czasie wykonywania. W funkcji można ustawić tylko wartości i nagłówki. Temat jest ustawiony w function.json.
Upewnij się, że masz dostęp do tematu platformy Kafka, do którego próbujesz napisać. Powiązanie należy skonfigurować przy użyciu poświadczeń dostępu i połączenia do tematu platformy Kafka.
W planie Premium należy włączyć monitorowanie skalowania w czasie wykonywania dla danych wyjściowych platformy Kafka, aby móc skalować w poziomie do wielu wystąpień. Aby dowiedzieć się więcej, zobacz Włączanie skalowania środowiska uruchomieniowego.
Aby uzyskać pełny zestaw obsługiwanych ustawień host.json wyzwalacza platformy Kafka, zobacz host.json ustawienia.
Połączenia
Wszystkie informacje o połączeniu wymagane przez wyzwalacze i powiązania powinny być przechowywane w ustawieniach aplikacji, a nie w definicjach powiązań w kodzie. Dotyczy to poświadczeń, które nigdy nie powinny być przechowywane w kodzie.
Ważne
Ustawienia poświadczeń muszą odwoływać się do ustawienia aplikacji. Nie należy zapisywać poświadczeń w kodzie ani plikach konfiguracji. W przypadku uruchamiania lokalnego użyj pliku local.settings.json dla poświadczeń i nie publikuj pliku local.settings.json.
Podczas nawiązywania połączenia z zarządzanym klastrem Platformy Kafka udostępnianym przez platformę Confluent na platformie Azure upewnij się, że następujące poświadczenia uwierzytelniania dla środowiska platformy Confluent Cloud zostały ustawione w wyzwalaczu lub powiązaniu:
Ustawienie | Zalecana wartość | opis |
---|---|---|
Lista brokerów | BootstrapServer |
Ustawienie aplikacji o nazwie BootstrapServer zawiera wartość serwera bootstrap znalezionego na stronie ustawień chmury Confluent. Wartość przypomina xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Nazwa użytkownika | ConfluentCloudUsername |
Ustawienie aplikacji o nazwie ConfluentCloudUsername zawiera klucz dostępu interfejsu API z witryny internetowej Confluent Cloud. |
Hasło | ConfluentCloudPassword |
Ustawienie aplikacji o nazwie ConfluentCloudPassword zawiera wpis tajny interfejsu API uzyskany z witryny internetowej platformy Confluent Cloud. |
Wartości ciągu używane dla tych ustawień muszą być obecne jako ustawienia aplikacji na platformie Azure lub w Values
kolekcji w pliku local.settings.json podczas programowania lokalnego.
Należy również ustawić Protocol
definicje powiązań , AuthenticationMode
i SslCaLocation
.