Триггер Apache Kafka для Функций Azure
Триггер Apache Kafka можно использовать в Функциях Azure для запуска кода функции в ответ на сообщения в разделах Kafka. Вы также можете использовать выходную привязку Kafka для записи из функции в раздел. Сведения о настройке и конфигурации см. в статье Обзор привязок Apache Kafka для Функций Azure.
Внимание
Привязки Kafka доступны только для Функций в составе эластичного плана "Премиум" и плана "Выделенный (Служба приложений)". Они поддерживаются только в среде выполнения Функций версии 3.x и выше.
Пример
Использование триггера зависит от модальности C#, используемой в приложении-функции. Это может быть один из следующих вариантов:
Изолированная библиотека классов рабочих процессов, скомпилированная функция C# выполняется в процессе, изолированном от среды выполнения.
Используемые атрибуты зависят от конкретного поставщика событий.
В следующем примере показана функция C#, которая считывает и регистрирует сообщение Kafka как событие Kafka.
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Чтобы получить события в пакете, используйте в качестве входных данных массив строк, как показано в следующем примере:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
Следующая функция регистрирует сообщение и заголовки для события Kafka.
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
Полный набор рабочих примеров .NET см. в репозитории расширений Kafka.
Примечание.
Эквивалентный набор примеров TypeScript см. в репозитории расширений Kafka
Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показан триггер Kafka для функции, которая считывает и регистрирует сообщение Kafka.
Следующий файл function.json определяет триггер для конкретного поставщика.
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "saslSsl",
"authenticationMode": "plain",
"consumerGroup" : "$Default",
"dataType": "string"
}
]
}
Затем при активации функции выполняется следующий код:
module.exports = async function (context, event) {
// context.log.info(event)
context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};
Чтобы получить события в пакете, задайте для параметра cardinality
в файле function.json значение many
, как показано в приведенных ниже примерах.
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "event",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%"
}
]
}
Затем следующий код анализирует массив событий и регистрирует данные события:
module.exports = async function (context, events) {
function print(event) {
var eventJson = JSON.parse(event)
context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
}
events.map(print);
};
Следующий код также регистрирует данные заголовка:
module.exports = async function (context, event) {
function print(kevent) {
var keventJson = JSON.parse(kevent)
context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
context.log.info(`Headers for this message:`)
let headers = keventJson.Headers;
headers.forEach(element => {
context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`)
});
}
event.map(print);
};
Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующий файл function.json определяет триггер для конкретного поставщика, используя универсальную схему Avro.
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaAvroGenericSingle",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
Затем при активации функции выполняется следующий код:
module.exports = async function (context, event) {
context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};
Полный набор рабочих примеров JavaScript см. в репозитории расширений Kafka.
Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показан триггер Kafka для функции, которая считывает и регистрирует сообщение Kafka.
Следующий файл function.json определяет триггер для конкретного поставщика.
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
Затем при активации функции выполняется следующий код:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Чтобы получить события в пакете, задайте для параметра cardinality
в файле function.json значение many
, как показано в приведенных ниже примерах.
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
Затем следующий код анализирует массив событий и регистрирует данные события:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
Следующий код также регистрирует данные заголовка:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующий файл function.json определяет триггер для конкретного поставщика, используя универсальную схему Avro.
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
Затем при активации функции выполняется следующий код:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Полный набор рабочих примеров PowerShell см. в репозитории расширений Kafka.
Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показан триггер Kafka для функции, которая считывает и регистрирует сообщение Kafka.
Следующий файл function.json определяет триггер для конкретного поставщика.
{
"scriptFile": "main.py",
"bindings": [
{
"type": "kafkaTrigger",
"name": "kevent",
"topic": "topic",
"brokerList": "%BrokerList%",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"consumerGroup" : "functions",
"protocol": "saslSsl",
"authenticationMode": "plain"
}
]
}
Затем при активации функции выполняется следующий код:
import logging
from azure.functions import KafkaEvent
def main(kevent : KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
Чтобы получить события в пакете, задайте для параметра cardinality
в файле function.json значение many
, как показано в приведенных ниже примерах.
{
"scriptFile": "main.py",
"bindings": [
{
"type" : "kafkaTrigger",
"direction": "in",
"name" : "kevents",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"topic" : "message_python",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"dataType": "string",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"BrokerList" : "%BrokerList%"
}
]
}
Затем следующий код анализирует массив событий и регистрирует данные события:
import logging
import typing
from azure.functions import KafkaEvent
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
Следующий код также регистрирует данные заголовка:
import logging
import typing
from azure.functions import KafkaEvent
import json
import base64
def main(kevents : typing.List[KafkaEvent]):
for event in kevents:
event_dec = event.get_body().decode('utf-8')
event_json = json.loads(event_dec)
logging.info("Python Kafka trigger function called for message " + event_json["Value"])
headers = event_json["Headers"]
for header in headers:
logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))
Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующий файл function.json определяет триггер для конкретного поставщика, используя универсальную схему Avro.
{
"scriptFile": "main.py",
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaTriggerAvroGeneric",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
Затем при активации функции выполняется следующий код:
import logging
from azure.functions import KafkaEvent
def main(kafkaTriggerAvroGeneric : KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
Полный набор рабочих примеров Python см. в репозитории расширений Kafka.
Заметки, используемые для настройки триггера, зависят от конкретного поставщика событий.
В следующем примере показана функция Java, которая считывает и регистрирует содержимое события Kafka.
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
Чтобы получить события в пакете, используйте входную строку в качестве массива, как показано в следующем примере:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
Следующая функция регистрирует сообщение и заголовки для события Kafka.
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующая функция определяет триггер для конкретного поставщика, используя универсальную схему Avro.
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
Полный набор рабочих примеров Java для Confluent см. в репозитории расширений Kafka.
Атрибуты
Библиотеки C# в процессе и изолированном рабочем процессе используются KafkaTriggerAttribute
для определения триггера функции.
В следующей таблице описаны свойства, которые можно задать с помощью этого атрибута триггера:
Параметр | Описание |
---|---|
BrokerList | (Обязательно) Список брокеров Kafka, отслеживаемых триггером. Дополнительные сведения см. в разделе Подключения. |
Раздел | (Обязательно) Раздел, отслеживаемый триггером. |
ConsumerGroup | (Необязательно) Группа потребителей Kafka, используемая триггером. |
AvroSchema | (Необязательно) Схема универсальной записи при использовании протокола Avro. |
AuthenticationMode | (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi , Plain (по умолчанию), ScramSha256 , ScramSha512 . |
Username | (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
Пароль | (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
Протокол | (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl , sasl_plaintext , sasl_ssl . |
SslCaLocation | (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера. |
SslCertificateLocation | (Необязательно) Путь к сертификату клиента. |
SslKeyLocation | (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности. |
SslKeyPassword | (Необязательно) Пароль для сертификата клиента. |
Заметки
Заметка KafkaTrigger
позволяет создать функцию, которая выполняется при получении раздела. Поддерживаемые варианты включают следующие элементы:
Элемент | Description |
---|---|
name | (Обязательно) Имя переменной, представляющей сообщение очереди или раздела в коде функции. |
brokerList | (Обязательно) Список брокеров Kafka, отслеживаемых триггером. Дополнительные сведения см. в разделе Подключения. |
topic | (Обязательно) Раздел, отслеживаемый триггером. |
кратность | (Необязательно) Указывает кратность входных данных триггера. Поддерживаемые значения: ONE (по умолчанию) и MANY . Используйте ONE , если входные данные являются одним сообщением, и MANY , если входные данные являются массивом сообщений. При использовании MANY необходимо также задать параметр dataType . |
dataType | Определяет, как Функции обрабатывают значение параметра. По умолчанию полученное значение представляет собой строку, и Функции пытаются десериализовать ее до объекта POJO. При string входные данные обрабатываются просто как строка. При binary сообщение приходит в формате двоичных данных и Функции пытаются десериализовать его до фактического типа параметра byte[]. |
consumerGroup | (Необязательно) Группа потребителей Kafka, используемая триггером. |
avroSchema | (Необязательно) Схема универсальной записи при использовании протокола Avro. |
authenticationMode | (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi , Plain (по умолчанию), ScramSha256 , ScramSha512 . |
username | (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
пароль | (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
protocol | (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера. |
sslCertificateLocation | (Необязательно) Путь к сертификату клиента. |
sslKeyLocation | (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности. |
sslKeyPassword | (Необязательно) Пароль для сертификата клиента. |
Настройка
В следующей таблице описываются свойства конфигурации привязки, которые задаются в файле function.json.
Свойство в function.json | Описание |
---|---|
type | Обязательное. Необходимо задать значение kafkaTrigger . |
direction | Обязательное. Необходимо задать значение in . |
name | (Обязательно) Имя переменной, представляющей данные, полученные через брокер, в коде функции. |
brokerList | (Обязательно) Список брокеров Kafka, отслеживаемых триггером. Дополнительные сведения см. в разделе Подключения. |
topic | (Обязательно) Раздел, отслеживаемый триггером. |
кратность | (Необязательно) Указывает кратность входных данных триггера. Поддерживаемые значения: ONE (по умолчанию) и MANY . Используйте ONE , если входные данные являются одним сообщением, и MANY , если входные данные являются массивом сообщений. При использовании MANY необходимо также задать параметр dataType . |
dataType | Определяет, как Функции обрабатывают значение параметра. По умолчанию полученное значение представляет собой строку, и Функции пытаются десериализовать ее до объекта POJO. При string входные данные обрабатываются просто как строка. При binary сообщение приходит в формате двоичных данных и Функции пытаются десериализовать его до фактического типа параметра byte[]. |
consumerGroup | (Необязательно) Группа потребителей Kafka, используемая триггером. |
avroSchema | (Необязательно) Схема универсальной записи при использовании протокола Avro. |
authenticationMode | (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi , Plain (по умолчанию), ScramSha256 , ScramSha512 . |
username | (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
пароль | (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi . Дополнительные сведения см. в разделе Подключения. |
protocol | (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl , sasl_plaintext , sasl_ssl . |
sslCaLocation | (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера. |
sslCertificateLocation | (Необязательно) Путь к сертификату клиента. |
sslKeyLocation | (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности. |
sslKeyPassword | (Необязательно) Пароль для сертификата клиента. |
Использование
События Kafka в настоящее время поддерживаются как строки и массивы строк, которые являются полезными данными JSON.
Сообщения Kafka передаются функции в виде строк и массивов строк, которые являются полезными данными JSON.
В плане "Премиум" необходимо включить мониторинг масштабирования среды выполнения для выходных данных Kafka, чтобы иметь возможность горизонтально увеличить масштаб до нескольких экземпляров. Дополнительные сведения см. в статье Включение масштабирования среды выполнения.
Для работы с триггерами Kafka нельзя использовать функцию "Тест и запуск" страницы "Код и тест" на портале Azure. Вместо этого необходимо отправлять тестовые события непосредственно в раздел, отслеживаемый триггером.
Полный набор поддерживаемых параметров host.json для триггера Kafka см. в статье Параметры host.json.
Связи
Все сведения о подключении, необходимые триггерам и привязкам, должны храниться в параметрах приложения, а не в определениях привязок в коде. Это верно для учетных данных, которые никогда не должны храниться в коде.
Внимание
Параметры учетных данных должны ссылаться на параметр приложения. Не прописывайте учетные данные в файлах кода или конфигурации. При локальном запуске используйте файл local.settings.json для учетных данных и не публикуйте его.
При подключении к управляемому кластеру Kafka, предоставляемому Confluent в Azure, убедитесь, что в триггере или привязке заданы следующие учетные данные проверки подлинности для среды Confluent Cloud:
Параметр | Рекомендуемое значение | Description |
---|---|---|
BrokerList | BootstrapServer |
Параметр приложения с именем BootstrapServer содержит значение сервера начальной загрузки, найденное на странице параметров Confluent Cloud. Значение напоминает xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Username | ConfluentCloudUsername |
Параметр приложения с именем ConfluentCloudUsername содержит ключ доступа API, полученный от веб-сайта Confluent Cloud. |
Пароль | ConfluentCloudPassword |
Параметр приложения с именем ConfluentCloudPassword содержит секрет API, полученный от веб-сайта Confluent Cloud. |
Строковые значения, используемые для этих параметров, должны присутствовать в качестве параметров приложения в Azure или коллекции Values
в файле local.settings.json во время локальной разработки.
Необходимо также задать Protocol
, AuthenticationMode
и SslCaLocation
в определениях привязок.