你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Functions 的 Apache Kafka 输出绑定

输出绑定允许 Azure Functions 应用将消息写入 Kafka 主题。

重要

Kafka 绑定仅适用于弹性高级计划和专用(应用服务)计划中的 Functions。 它们仅在 Functions 运行时的版本 3.x 及更高版本上受支持。

示例

绑定的用法取决于函数应用中使用的 C# 模式,可以是以下模式之一:

进程内类库是编译的 C# 函数,该函数在与 Functions 运行时相同的进程中运行。

要使用的属性取决于特定的事件提供程序。

以下示例演示了一个 C# 函数,该函数使用 HTTP GET 请求中提供的数据将单个消息发送到 Kafka 主题。

[FunctionName("KafkaOutput")]
public static IActionResult Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
    [Kafka("BrokerList",
            "topic",
            Username = "ConfluentCloudUserName",
            Password = "ConfluentCloudPassword",
            Protocol = BrokerProtocol.SaslSsl,
           AuthenticationMode = BrokerAuthenticationMode.Plain
    )] out string eventData,
    ILogger log)
{
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.Query["message"];

    string responseMessage = "Ok";            
    eventData = message;

    return new OkObjectResult(responseMessage);
}

若要批量发送事件,请使用 KafkaEventData 对象数组,如以下示例所示:

    [FunctionName("KafkaOutputMany")]
    public static IActionResult Output(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
        [Kafka("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
                Protocol = BrokerProtocol.SaslSsl,
               AuthenticationMode = BrokerAuthenticationMode.Plain
        )] out KafkaEventData<string>[] eventDataArr,
        ILogger log)
    {
        log.LogInformation("C# HTTP trigger function processed a request.");
        eventDataArr = new KafkaEventData<string>[2];
        eventDataArr[0] = new KafkaEventData<string>("one");
        eventDataArr[1] = new KafkaEventData<string>("two");
        return new OkObjectResult("Ok");
    }
}

以下函数将标头添加到 Kafka 输出数据:

{
    [FunctionName("KafkaOutputWithHeaders")]
    public static IActionResult Output(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
        [Kafka("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
                Protocol = BrokerProtocol.SaslSsl,
               AuthenticationMode = BrokerAuthenticationMode.Plain
        )] out KafkaEventData<string> eventData,
        ILogger log)
    {
        log.LogInformation("C# HTTP trigger function processed a request.");

        string message = req.Query["message"];        
        eventData = new KafkaEventData<string>(message);
        eventData.Headers.Add("test", System.Text.Encoding.UTF8.GetBytes("dotnet")); 

        return new OkObjectResult("Ok");
    }

如需整套可用 .NET 示例,请参阅 Kafka 扩展存储库

注意

如需一组等效的 TypeScript 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于事件提供程序,在这些示例中,事件提供程序为 Confluent 或 Azure 事件中心。 以下示例演示由 HTTP 请求触发的函数的 Kafka 输出绑定,并将来自请求的数据发送到 Kafka 主题。

以下 function.json 定义了这些示例中特定提供程序的触发器:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

然后,以下代码向主题发送消息:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

以下代码将多个消息作为数组发送到同一主题:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

以下示例演示如何将包含标头的事件消息发送到同一 Kafka 主题:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

如需整套可用 JavaScript 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于事件提供程序,在这些示例中,事件提供程序为 Confluent 或 Azure 事件中心。 以下示例演示由 HTTP 请求触发的函数的 Kafka 输出绑定,并将来自请求的数据发送到 Kafka 主题。

以下 function.json 定义了这些示例中特定提供程序的触发器:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

然后,以下代码向主题发送消息:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

以下代码将多个消息作为数组发送到同一主题:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

以下示例演示如何将包含标头的事件消息发送到同一 Kafka 主题:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

如需整套可用 PowerShell 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于事件提供程序,在这些示例中,事件提供程序为 Confluent 或 Azure 事件中心。 以下示例演示由 HTTP 请求触发的函数的 Kafka 输出绑定,并将来自请求的数据发送到 Kafka 主题。

以下 function.json 定义了这些示例中特定提供程序的触发器:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

然后,以下代码向主题发送消息:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

以下代码将多个消息作为数组发送到同一主题:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

以下示例演示如何将包含标头的事件消息发送到同一 Kafka 主题:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

如需整套可用 Python 示例,请参阅 Kafka 扩展存储库

用于配置输出绑定的注释取决于特定的事件提供程序。

以下函数向 Kafka 主题发送消息。

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

以下示例演示如何将多个消息发送到 Kafka 主题。

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

在此示例中,输出绑定参数更改为字符串数组。

最后一个示例用于这些 KafkaEntity 类和 KafkaHeader 类:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

以下示例函数向 Kafka 主题发送包含标头的消息。

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

如需适用于 Confluent 的整套可用 Java 示例,请参阅 Kafka 扩展存储库

属性

进程内独立进程 C# 库都使用 Kafka 特性来定义函数触发器。

下表说明了可使用特性设置的属性:

参数 说明
BrokerList (必需)输出发送到的 Kafka 中转站列表。 有关详细信息,请参阅连接
主题 (必需)输出发送到主题。
AvroSchema (可选)使用 Avro 协议时的通用记录的架构。
MaxMessageBytes (可选)发送的输出消息的最大大小 (MB),默认值为 1
BatchSize (可选)在单个消息集中批处理的最大消息数,默认值为 10000
EnableIdempotence (可选)设置为 true 可保证消息按原始生成顺序成功仅生成一次,默认值为 false
MessageTimeoutMs (可选)本地消息超时(毫秒)。 此值仅在本地强制执行,并限制生成的消息等待成功发送的时间,默认值为 300000。 时间为 0 表示无限时间。 此值是用于发送消息(包括重试)的最大时间。 超过重试次数或消息超时后,会产生发送错误。
RequestTimeoutMs (可选)输出请求的确认超时(毫秒),默认值为 5000
MaxRetries (可选)发送失败消息的重试次数,默认值为 2。 除非 EnableIdempotence 设置为 true,否则重试可能会导致重新排序。
AuthenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证模式。 支持的值为 GssapiPlain(默认值)、ScramSha256ScramSha512
用户名 (可选)用于 SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
密码 (可选)用于 SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
协议 (可选)与代理通信时使用的安全协议。 支持的值为 plaintext(默认值)、sslsasl_plaintextsasl_ssl
SslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
SslCertificateLocation (可选)客户端证书的路径。
SslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
SslKeyPassword (可选)客户端证书的密码。

批注

通过 KafkaOutput 批注,可以创建写入特定主题的函数。 支持的选项包括以下元素:

元素 说明
name 表示函数代码中代理数据的变量的名称。
brokerList (必需)输出发送到的 Kafka 中转站列表。 有关详细信息,请参阅连接
topic (必需)输出发送到主题。
dataType 定义 Functions 如何处理参数值。 默认情况下,该值将作为字符串获取,Functions 会尝试将该字符串反序列化为实际的普通 Java 对象 (POJO)。 当输入为 string 时,输入按原样被视为字符串。 当输入为 binary 时,消息将作为二进制数据接收,Functions 会尝试将其反序列化为实际的参数类型 byte[]。
avroSchema (可选)使用 Avro 协议时的通用记录的架构。
maxMessageBytes (可选)发送的输出消息的最大大小 (MB),默认值为 1
batchSize (可选)在单个消息集中批处理的最大消息数,默认值为 10000
enableIdempotence (可选)设置为 true 可保证消息按原始生成顺序成功仅生成一次,默认值为 false
messageTimeoutMs (可选)本地消息超时(毫秒)。 此值仅在本地强制执行,并限制生成的消息等待成功发送的时间,默认值为 300000。 时间为 0 表示无限时间。 这是用于发送消息(包括重试)的最大时间。 超过重试次数或消息超时后,会产生发送错误。
requestTimeoutMs (可选)输出请求的确认超时(毫秒),默认值为 5000
maxRetries (可选)发送失败消息的重试次数,默认值为 2。 除非 EnableIdempotence 设置为 true,否则重试可能会导致重新排序。
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证模式。 支持的值为 GssapiPlain(默认值)、ScramSha256ScramSha512
username (可选)用于 SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
password (可选)用于 SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
protocol (可选)与代理通信时使用的安全协议。 支持的值为 plaintext(默认值)、sslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
sslKeyPassword (可选)客户端证书的密码。

配置

下表解释了在 function.json 文件中设置的绑定配置属性。

“function.json”属性 说明
type 必须设置为 kafka
direction 必须设置为 out
name 表示函数代码中代理数据的变量的名称。
brokerList (必需)输出发送到的 Kafka 中转站列表。 有关详细信息,请参阅连接
topic (必需)输出发送到主题。
avroSchema (可选)使用 Avro 协议时的通用记录的架构。
maxMessageBytes (可选)发送的输出消息的最大大小 (MB),默认值为 1
batchSize (可选)在单个消息集中批处理的最大消息数,默认值为 10000
enableIdempotence (可选)设置为 true 可保证消息按原始生成顺序成功仅生成一次,默认值为 false
messageTimeoutMs (可选)本地消息超时(毫秒)。 此值仅在本地强制执行,并限制生成的消息等待成功发送的时间,默认值为 300000。 时间为 0 表示无限时间。 这是用于发送消息(包括重试)的最大时间。 超过重试次数或消息超时后,会产生发送错误。
requestTimeoutMs (可选)输出请求的确认超时(毫秒),默认值为 5000
maxRetries (可选)发送失败消息的重试次数,默认值为 2。 除非 EnableIdempotence 设置为 true,否则重试可能会导致重新排序。
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证模式。 支持的值为 GssapiPlain(默认值)、ScramSha256ScramSha512
username (可选)用于 SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
password (可选)用于 SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
protocol (可选)与代理通信时使用的安全协议。 支持的值为 plaintext(默认值)、sslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
sslKeyPassword (可选)客户端证书的密码。

使用情况

内置 AvroProtobuf 序列化支持键和值类型。

事件的偏移、分区和时间戳在运行时生成。 在函数内只能设置值和标头。 主题在 function.json 中设置。

请确保有权访问你尝试写入的 Kafka 主题。 为绑定配置对 Kafka 主题的访问和连接凭据。

在高级计划中,必须为 Kafka 输出启用运行时缩放监视才能横向扩展为多个实例。 有关详细信息,请参阅启用运行时缩放

有关 Kafka 触发器支持的整套 host.json 设置,请参阅 host.json 设置

连接

触发器和绑定所需的所有连接信息应在应用程序设置中维护,而不能在代码中的绑定定义中维护。 对于凭据而言就是如此,永远不应该将凭据存储在代码中。

重要

凭据设置必须引用应用程序设置。 不要在代码或配置文件中硬编码凭据。 在本地运行时,请为凭据使用 local.settings.json 文件,且不要发布 local.settings.json 文件。

连接到 Azure 中的 Confluent 提供的托管 Kafka 群集时,请确保在触发器或绑定中为 Confluent Cloud 环境设置以下身份验证凭据:

设置 建议的值 说明
BrokerList BootstrapServer 名为 BootstrapServer 的应用设置包含 Confluent Cloud 设置页中的启动服务器值。 该值类似于 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092
用户名 ConfluentCloudUsername 名为 ConfluentCloudUsername 的应用设置包含来自 Confluent Cloud 网站的 API 访问密钥。
密码 ConfluentCloudPassword 名为 ConfluentCloudPassword 的应用设置包含从 Confluent Cloud 网站获取的 API 机密。

在本地开发期间,用于这些设置的字符串值必须作为 Azure 中的应用程序设置存在,或出现在 local.settings.json 文件Values 集合中。

此外,应在绑定定义中设置 ProtocolAuthenticationModeSslCaLocation

后续步骤