Share via


Azure Functions에 대한 Apache Kafka 트리거

Azure Functions의 Apache Kafka 트리거를 사용하여 Kafka 토픽의 메시지에 대한 응답으로 함수 코드를 실행할 수 있습니다. Kafka 출력 바인딩을 사용하여 함수에서 토픽으로 쓸 수도 있습니다. 설정 및 구성 세부 정보에 대한 내용은 Azure Functions 개요에 대한 Apache Kafka 바인딩을 참조하세요.

Important

Kafka 바인딩은 탄력적 프리미엄 플랜전용(App Service) 플랜의 Functions에만 사용할 수 있습니다. Functions 런타임 버전 3.x 이상에서만 지원됩니다.

예시

트리거의 사용은 함수 앱에서 사용되는 C# 형식에 따라 다르며 다음 중 하나일 수 있습니다.

격리된 작업자 프로세스 클래스 라이브러리 컴파일된 C# 함수는 런타임에서 격리된 프로세스에서 실행됩니다.

사용하는 특성은 이벤트 공급자에 따라 달라집니다.

다음 예제는 Kafka 메시지를 Kafka 이벤트로 읽고 로그하는 C# 함수를 보여 줍니다.

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

일괄 처리로 이벤트를 수신하려면 다음 예제와 같이 문자열 배열을 입력으로 사용합니다.

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

다음 함수는 Kafka 이벤트에 대한 메시지 및 헤더를 기록합니다.

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

작동하는 .NET 전체 예제는 Kafka 확장 리포지토리를 참조하세요.

참고 항목

상응하는 TypeScript 예제 세트는 Kafka 확장 리포지토리를 참조하세요.

function.json 파일의 특정 속성은 이벤트 공급자에 따라 달라지며, 다음 예제에서는 Confluent 또는 Azure Event Hubs입니다. 다음 예제에서는 Kafka 메시지를 읽고 기록하는 함수에 대한 Kafka 트리거를 보여 줍니다.

다음 function.json은 특정 공급자에 대한 트리거를 정의합니다.

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

다음 코드는 함수가 트리거될 때 실행됩니다.

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

일괄 처리에서 이벤트를 수신하려면 다음 예제에 표시된 대로 function.json 파일에서 cardinality 값을 many로 설정합니다.

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

다음 코드는 이벤트 배열을 구문 분석하고 이벤트 데이터를 기록합니다.

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

다음 코드는 헤더 데이터도 기록합니다.

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

트리거에 전달된 이벤트에 대한 제네릭 Avro 스키마를 정의할 수 있습니다. 다음 function.json은 제네릭 Avro 스키마를 사용하여 특정 공급자에 대한 트리거를 정의합니다.

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

다음 코드는 함수가 트리거될 때 실행됩니다.

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

작동하는 JavaScript 전체 예제는 Kafka 확장 리포지토리를 참조하세요.

function.json 파일의 특정 속성은 이벤트 공급자에 따라 달라지며, 다음 예제에서는 Confluent 또는 Azure Event Hubs입니다. 다음 예제에서는 Kafka 메시지를 읽고 기록하는 함수에 대한 Kafka 트리거를 보여 줍니다.

다음 function.json은 특정 공급자에 대한 트리거를 정의합니다.

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

다음 코드는 함수가 트리거될 때 실행됩니다.

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

일괄 처리에서 이벤트를 수신하려면 다음 예제에 표시된 대로 function.json 파일에서 cardinality 값을 many로 설정합니다.

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

다음 코드는 이벤트 배열을 구문 분석하고 이벤트 데이터를 기록합니다.

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

다음 코드는 헤더 데이터도 기록합니다.

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

트리거에 전달된 이벤트에 대한 제네릭 Avro 스키마를 정의할 수 있습니다. 다음 function.json은 제네릭 Avro 스키마를 사용하여 특정 공급자에 대한 트리거를 정의합니다.

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

다음 코드는 함수가 트리거될 때 실행됩니다.

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

작동하는 PowerShell 예제의 전체 집합은 Kafka 확장 리포지토리를 참조하세요.

function.json 파일의 특정 속성은 이벤트 공급자에 따라 달라지며, 다음 예제에서는 Confluent 또는 Azure Event Hubs입니다. 다음 예제에서는 Kafka 메시지를 읽고 기록하는 함수에 대한 Kafka 트리거를 보여 줍니다.

다음 function.json은 특정 공급자에 대한 트리거를 정의합니다.

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

다음 코드는 함수가 트리거될 때 실행됩니다.

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

일괄 처리에서 이벤트를 수신하려면 다음 예제에 표시된 대로 function.json 파일에서 cardinality 값을 many로 설정합니다.

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

다음 코드는 이벤트 배열을 구문 분석하고 이벤트 데이터를 기록합니다.

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

다음 코드는 헤더 데이터도 기록합니다.

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

트리거에 전달된 이벤트에 대한 제네릭 Avro 스키마를 정의할 수 있습니다. 다음 function.json은 제네릭 Avro 스키마를 사용하여 특정 공급자에 대한 트리거를 정의합니다.

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

다음 코드는 함수가 트리거될 때 실행됩니다.

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

작동하는 Python 전체 예제는 Kafka 확장 리포지토리를 참조하세요.

트리거를 구성하는 데 사용하는 주석은 특정 이벤트 공급자에 따라 달라집니다.

다음 예제는 Kafka 이벤트의 콘텐츠를 읽고 기록하는 Java 함수를 보여 줍니다.

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

일괄 처리로 이벤트를 수신하려면 다음 예제와 같이 입력 문자열을 배열로 사용합니다.

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

다음 함수는 Kafka 이벤트에 대한 메시지 및 헤더를 기록합니다.

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

트리거에 전달된 이벤트에 대한 제네릭 Avro 스키마를 정의할 수 있습니다. 다음 함수는 제네릭 Avro 스키마를 사용하여 특정 공급자에 대한 트리거를 정의합니다.

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Confluent에서 작동하는 Java 전체 예제는 Kafka 확장 리포지토리를 참조하세요.

특성

In Process격리된 작업자 프로세스 C# 라이브러리는 모두 KafkaTriggerAttribute를 사용하여 함수 트리거를 정의합니다.

다음 표에서는 이 트리거 특성을 사용하여 설정할 수 있는 특성을 설명합니다.

매개 변수 설명
BrokerList (필수) 트리거에서 모니터링하는 Kafka 브로커 목록입니다. 자세한 내용은 연결을 참조하세요.
항목 (필수) 트리거에서 모니터링하는 항목입니다.
ConsumerGroup (선택 사항) 트리거에서 사용하는 Kafka 소비자 그룹입니다.
AvroSchema (선택 사항) Avro 프로토콜을 사용하는 경우 제네릭 레코드의 스키마입니다.
AuthenticationMode (선택 사항) SASL(단순 인증 및 보안 계층) 인증을 사용할 때의 인증 모드입니다. 지원되는 값은 Gssapi, Plain(기본값), ScramSha256, ScramSha512입니다.
사용자 이름 (선택 사항) SASL 인증의 사용자 이름입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
암호 (선택 사항) SASL 인증의 암호입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
프로토콜 (선택 사항) broker와 통신할 때 사용되는 보안 프로토콜입니다. 지원되는 값은 plaintext(기본값), ssl, sasl_plaintext, sasl_ssl입니다.
SslCaLocation (선택 사항) broker의 인증서를 확인하기 위한 CA 인증서 파일의 경로입니다.
SslCertificateLocation (선택 사항) 클라이언트의 인증서 경로입니다.
SslKeyLocation (선택 사항) 인증에 사용되는 클라이언트의 프라이빗 키(PEM) 경로입니다.
SslKeyPassword (선택 사항) 클라이언트의 인증서 암호입니다.

주석

KafkaTrigger 주석을 사용하면 토픽을 수신할 때 실행되는 함수를 만들 수 있습니다. 지원되는 옵션에는 다음 요소가 포함됩니다.

요소 설명
이름 (필수) 함수 코드에서 큐 또는 토픽 메시지를 나타내는 변수의 이름입니다.
brokerList (필수) 트리거에서 모니터링하는 Kafka 브로커 목록입니다. 자세한 내용은 연결을 참조하세요.
topic (필수) 트리거에서 모니터링하는 항목입니다.
cardinality (선택 사항) 트리거 입력의 카디널리티를 나타냅니다. 지원되는 값은 ONE(기본값) 및 MANY입니다. 입력이 단일 메시지인 경우 ONE을 사용하고 입력이 메시지 배열인 경우 MANY를 사용합니다. MANY를 사용할 경우 dataType도 설정해야 합니다.
dataType Functions에서 매개 변수 값을 처리하는 방법을 정의합니다. 기본적으로 값은 문자열로 가져오며, Functions는 문자열을 실제 POJO(Plain-Old Java Object)로 역직렬화하려고 합니다. string인 경우 입력은 문자열로만 처리됩니다. binary인 경우 메시지가 이진 데이터로 수신되고 Functions에서 실제 매개 변수 형식 byte[]로 역직렬화하려고 시도합니다.
consumerGroup (선택 사항) 트리거에서 사용하는 Kafka 소비자 그룹입니다.
avroSchema (선택 사항) Avro 프로토콜을 사용하는 경우 제네릭 레코드의 스키마입니다.
authenticationMode (선택 사항) SASL(단순 인증 및 보안 계층) 인증을 사용할 때의 인증 모드입니다. 지원되는 값은 Gssapi, Plain(기본값), ScramSha256, ScramSha512입니다.
username (선택 사항) SASL 인증의 사용자 이름입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
password (선택 사항) SASL 인증의 암호입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
protocol (선택 사항) broker와 통신할 때 사용되는 보안 프로토콜입니다. 지원되는 값은 plaintext(기본값), ssl, sasl_plaintext, sasl_ssl입니다.
sslCaLocation (선택 사항) broker의 인증서를 확인하기 위한 CA 인증서 파일의 경로입니다.
sslCertificateLocation (선택 사항) 클라이언트의 인증서 경로입니다.
sslKeyLocation (선택 사항) 인증에 사용되는 클라이언트의 프라이빗 키(PEM) 경로입니다.
sslKeyPassword (선택 사항) 클라이언트의 인증서 암호입니다.

구성

다음 표에서는 function.json 파일에 설정된 바인딩 구성 속성을 설명합니다.

function.json 속성 설명
type (필수) kafkaTrigger로 설정해야 합니다.
direction (필수) in로 설정해야 합니다.
이름 (필수) 함수 코드에서 조정된 데이터를 나타내는 변수의 이름입니다.
brokerList (필수) 트리거에서 모니터링하는 Kafka 브로커 목록입니다. 자세한 내용은 연결을 참조하세요.
topic (필수) 트리거에서 모니터링하는 항목입니다.
cardinality (선택 사항) 트리거 입력의 카디널리티를 나타냅니다. 지원되는 값은 ONE(기본값) 및 MANY입니다. 입력이 단일 메시지인 경우 ONE을 사용하고 입력이 메시지 배열인 경우 MANY를 사용합니다. MANY를 사용할 경우 dataType도 설정해야 합니다.
dataType Functions에서 매개 변수 값을 처리하는 방법을 정의합니다. 기본적으로 값은 문자열로 가져오며, Functions는 문자열을 실제 POJO(Plain-Old Java Object)로 역직렬화하려고 합니다. string인 경우 입력은 문자열로만 처리됩니다. binary인 경우 메시지가 이진 데이터로 수신되고 Functions에서 실제 매개 변수 형식 byte[]로 역직렬화하려고 시도합니다.
consumerGroup (선택 사항) 트리거에서 사용하는 Kafka 소비자 그룹입니다.
avroSchema (선택 사항) Avro 프로토콜을 사용하는 경우 제네릭 레코드의 스키마입니다.
authenticationMode (선택 사항) SASL(단순 인증 및 보안 계층) 인증을 사용할 때의 인증 모드입니다. 지원되는 값은 Gssapi, Plain(기본값), ScramSha256, ScramSha512입니다.
username (선택 사항) SASL 인증의 사용자 이름입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
password (선택 사항) SASL 인증의 암호입니다. AuthenticationModeGssapi일 때는 지원되지 않습니다. 자세한 내용은 연결을 참조하세요.
protocol (선택 사항) broker와 통신할 때 사용되는 보안 프로토콜입니다. 지원되는 값은 plaintext(기본값), ssl, sasl_plaintext, sasl_ssl입니다.
sslCaLocation (선택 사항) broker의 인증서를 확인하기 위한 CA 인증서 파일의 경로입니다.
sslCertificateLocation (선택 사항) 클라이언트의 인증서 경로입니다.
sslKeyLocation (선택 사항) 인증에 사용되는 클라이언트의 프라이빗 키(PEM) 경로입니다.
sslKeyPassword (선택 사항) 클라이언트의 인증서 암호입니다.

사용

Kafka 이벤트는 현재 JSON 페이로드인 문자열 및 문자열 배열로 지원됩니다.

Kafka 메시지는 JSON 페이로드인 문자열 및 문자열 배열로 함수에 전달됩니다.

프리미엄 플랜에서는 Kafka 출력이 여러 인스턴스로 스케일 아웃될 수 있도록 런타임 스케일링 모니터링을 사용하도록 설정해야 합니다. 자세한 내용은 런타임 스케일링 사용을 참조하세요.

Azure Portal에서 코드 + 테스트 페이지의 테스트/실행 기능을 사용하여 Kafka 트리거를 사용할 수 없습니다. 대신 테스트 이벤트를 트리거에 의해 모니터링되는 토픽으로 직접 보내야 합니다.

Kafka 트리거를 지원하는 host.json 전체 설정은 host.json 설정을 참조하세요.

연결

트리거 및 바인딩에 필요한 모든 연결 정보는 코드의 바인딩 정의가 아닌 애플리케이션 설정에서 유지 관리되어야 합니다. 이는 절대로 코드에 저장하면 안 되는 자격 증명에 적용됩니다.

Important

자격 증명 설정은 애플리케이션 설정을 참조해야 합니다. 코드 또는 구성 파일에서 자격 증명을 하드 코딩하지 마세요. 로컬로 실행하는 경우 자격 증명에 local.settings.json 파일을 사용하고 local.settings.json 파일을 게시하지 마세요.

Azure의 Confluent에서 제공하는 관리형 Kafka 클러스터에 연결할 때 Confluent Cloud 환경에 대한 다음 인증 자격 증명이 트리거 또는 바인딩에 설정되어 있는지 확인합니다.

설정 권장 값 설명
BrokerList BootstrapServer BootstrapServer라는 앱 설정에는 Confluent Cloud 설정 페이지에 있는 부트스트랩 서버의 값이 포함됩니다. 값은 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 형식입니다.
사용자 이름 ConfluentCloudUsername ConfluentCloudUsername이라는 앱 설정에는 Confluent Cloud 웹 사이트의 API 액세스 키가 포함됩니다.
암호 ConfluentCloudPassword ConfluentCloudPassword라는 앱 설정에는 Confluent Cloud 웹 사이트의 API 비밀이 포함됩니다.

로컬 개발 중에 이러한 설정에 사용하는 문자열 값은 Azure의 애플리케이션 설정으로 또는 local.settings.json 파일Values 컬렉션에 있어야 합니다.

또한 바인딩 정의에서 Protocol, AuthenticationModeSslCaLocation을 설정해야 합니다.

다음 단계