分享方式:


Azure Functions 的 Apache Kafka 觸發程序

您可以使用 Azure Functions 中的 Apache Kafka 觸發程式來執行函式程式碼,以回應 Kafka 主題中的訊息。 您也可以使用 Kafka 輸出系結 ,從函式寫入主題。 如需安裝和組態詳細數據的詳細資訊,請參閱 Azure Functions 的 Apache Kafka 系結概觀

重要

Kafka 系結僅適用於彈性進階方案和專用 (App Service) 方案中的 Functions。 只有 3.x 版和更新版本的 Functions 運行時間才支援它們。

範例

觸發程式的使用方式取決於函式應用程式中所使用的 C# 形式,這可以是下列其中一種模式:

已編譯 C# 函式的隔離背景工作進程類別庫會在與運行時間隔離的進程中執行。

您使用的屬性取決於特定事件提供者。

下列範例示範 C# 函式,以 Kafka 事件的形式讀取和記錄 Kafka 訊息:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

若要接收批次中的事件,請使用字串陣列做為輸入,如下列範例所示:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

下列函式會記錄 Kafka 事件的訊息和標頭:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

如需一組完整的工作 .NET 範例,請參閱 Kafka擴充功能存放庫

注意

如需一組對等的 TypeScript 範例,請參閱 Kafka 擴充功能存放庫

function.json檔案的特定屬性取決於您的事件提供者,在這些範例中為 Confluent 或 Azure 事件中樞。 下列範例顯示讀取和記錄 Kafka 訊息之函式的 Kafka 觸發程式。

下列function.json會定義特定提供者的觸發程式:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

然後,下列程式代碼會在觸發函式時執行:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

若要在批次中接收事件,請將 cardinality function.json 檔案中的 值設定為 many ,如下列範例所示:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

下列程式代碼接著會剖析事件陣列,並記錄事件資料:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

下列程式代碼也會記錄標頭資料:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

您可以定義傳遞至觸發程式之事件的一般 Avro 架構 。 下列function.json會使用一般 Avro 架構來定義特定提供者的觸發程式:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

然後,下列程式代碼會在觸發函式時執行:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

如需一組完整的 JavaScript 範例,請參閱 Kafka 擴充功能存放庫

function.json檔案的特定屬性取決於您的事件提供者,在這些範例中為 Confluent 或 Azure 事件中樞。 下列範例顯示讀取和記錄 Kafka 訊息之函式的 Kafka 觸發程式。

下列function.json會定義特定提供者的觸發程式:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

然後,下列程式代碼會在觸發函式時執行:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

若要在批次中接收事件,請將 cardinality function.json 檔案中的 值設定為 many ,如下列範例所示:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

下列程式代碼接著會剖析事件陣列,並記錄事件資料:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

下列程式代碼也會記錄標頭資料:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

您可以定義傳遞至觸發程式之事件的一般 Avro 架構 。 下列function.json會使用一般 Avro 架構來定義特定提供者的觸發程式:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

然後,下列程式代碼會在觸發函式時執行:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

如需一組完整的運作 PowerShell 範例,請參閱 Kafka 擴充功能存放庫

function.json檔案的特定屬性取決於您的事件提供者,在這些範例中為 Confluent 或 Azure 事件中樞。 下列範例顯示讀取和記錄 Kafka 訊息之函式的 Kafka 觸發程式。

下列function.json會定義特定提供者的觸發程式:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

然後,下列程式代碼會在觸發函式時執行:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

若要在批次中接收事件,請將 cardinality function.json 檔案中的 值設定為 many ,如下列範例所示:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

下列程式代碼接著會剖析事件陣列,並記錄事件資料:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

下列程式代碼也會記錄標頭資料:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

您可以定義傳遞至觸發程式之事件的一般 Avro 架構 。 下列function.json會使用一般 Avro 架構來定義特定提供者的觸發程式:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

然後,下列程式代碼會在觸發函式時執行:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

如需一組完整的工作 Python 範例,請參閱 Kafka 擴充功能存放庫

您用來設定觸發程式的批註取決於特定事件提供者。

下列範例顯示 Java 函式,可讀取和記錄 Kafka 事件的內容:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

若要接收批次中的事件,請使用輸入字串作為陣列,如下列範例所示:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

下列函式會記錄 Kafka 事件的訊息和標頭:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

您可以定義傳遞至觸發程式之事件的一般 Avro 架構 。 下列函式會使用一般 Avro 架構來定義特定提供者的觸發程式:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

如需 Confluent 的完整工作 Java 範例集,請參閱 Kafka 擴充功能存放庫

屬性

進程內隔離的背景工作進程 C# 連結庫都會使用 KafkaTriggerAttribute 來定義函式觸發程式。

下表說明您可以使用這個觸發程式屬性來設定的屬性:

參數 描述
BrokerList (必要)觸發程式所監視的 Kafka 訊息代理程式清單。 如需詳細資訊,請參閱 連線
主題 (必要)觸發程式所監視的主題。
ConsumerGroup (選擇性)觸發程式所使用的 Kafka 取用者群組。
AvroSchema (選擇性)使用 Avro 通訊協定時,一般記錄的架構。
AuthenticationMode (選擇性)使用簡單驗證和安全性層 (SASL) 驗證時的驗證模式。 支援的值為Gssapi、 (預設值)、、 ScramSha256ScramSha512Plain
使用者名稱 (選擇性)SASL 驗證的用戶名稱。 當 為GssapiAuthenticationMode不受支援。 如需詳細資訊,請參閱 連線
密碼 (選擇性)SASL 驗證的密碼。 當 為GssapiAuthenticationMode不受支援。 如需詳細資訊,請參閱 連線
通訊協定 (選擇性)與訊息代理程式通訊時所使用的安全性通訊協定。 支援的值為 plaintext (預設值)、、sslsasl_plaintextsasl_ssl
SslCaLocation (選擇性)用於驗證訊息代理程序憑證的 CA 憑證檔案路徑。
SslCertificateLocation (選擇性)用戶端憑證的路徑。
SslKeyLocation (選擇性)用於驗證之用戶端私鑰 (PEM) 的路徑。
SslKeyPassword (選擇性)用戶端憑證的密碼。

註釋

KafkaTrigger 註釋可讓您建立函式,以在收到主題時執行。 支援的選項包括下列元素:

元素 描述
name (必要)代表函式程式代碼中佇列或主題訊息的變數名稱。
brokerList (必要)觸發程式所監視的 Kafka 訊息代理程式清單。 如需詳細資訊,請參閱 連線
topic (必要)觸發程式所監視的主題。
基數 (選擇性)表示觸發程式輸入的基數。 支援的值為 ONE (預設值) 和 MANYONE當輸入是單一訊息,當MANY輸入是訊息數位時使用。 當您使用 MANY時,也必須設定 dataType
dataType 定義 Functions 如何處理參數值。 根據預設,會以字串形式取得值,Functions 會嘗試將字串還原串行化為實際的純舊 Java 物件 (POJO)。 當 為 時 string,輸入會視為字串。 當 為 時 binary,訊息會以二進位數據的形式接收,而 Functions 會嘗試將它還原串行化為實際的參數類型 byte[]。
consumerGroup (選擇性)觸發程式所使用的 Kafka 取用者群組。
avroSchema (選擇性)使用 Avro 通訊協定時,一般記錄的架構。
authenticationMode (選擇性)使用簡單驗證和安全性層 (SASL) 驗證時的驗證模式。 支援的值為Gssapi、 (預設值)、、 ScramSha256ScramSha512Plain
username (選擇性)SASL 驗證的用戶名稱。 當 為GssapiAuthenticationMode不受支援。 如需詳細資訊,請參閱 連線
password (選擇性)SASL 驗證的密碼。 當 為GssapiAuthenticationMode不受支援。 如需詳細資訊,請參閱 連線
protocol (選擇性)與訊息代理程式通訊時所使用的安全性通訊協定。 支援的值為 plaintext (預設值)、、sslsasl_plaintextsasl_ssl
sslCaLocation (選擇性)用於驗證訊息代理程序憑證的 CA 憑證檔案路徑。
sslCertificateLocation (選擇性)用戶端憑證的路徑。
sslKeyLocation (選擇性)用於驗證之用戶端私鑰 (PEM) 的路徑。
sslKeyPassword (選擇性)用戶端憑證的密碼。

組態

下表說明您在 function.json 檔案中設定的繫結設定屬性。

function.json 屬性 描述
type (必要)必須設定為 kafkaTrigger
direction (必要)必須設定為 in
name (必要)代表函式程式碼中代理數據的變數名稱。
brokerList (必要)觸發程式所監視的 Kafka 訊息代理程式清單。 如需詳細資訊,請參閱 連線
topic (必要)觸發程式所監視的主題。
基數 (選擇性)表示觸發程式輸入的基數。 支援的值為 ONE (預設值) 和 MANYONE當輸入是單一訊息,當MANY輸入是訊息數位時使用。 當您使用 MANY時,也必須設定 dataType
dataType 定義 Functions 如何處理參數值。 根據預設,會以字串形式取得值,Functions 會嘗試將字串還原串行化為實際的純舊 Java 物件 (POJO)。 當 為 時 string,輸入會視為字串。 當 為 時 binary,訊息會以二進位數據的形式接收,而 Functions 會嘗試將它還原串行化為實際的參數類型 byte[]。
consumerGroup (選擇性)觸發程式所使用的 Kafka 取用者群組。
avroSchema (選擇性)使用 Avro 通訊協定時,一般記錄的架構。
authenticationMode (選擇性)使用簡單驗證和安全性層 (SASL) 驗證時的驗證模式。 支援的值為Gssapi、 (預設值)、、 ScramSha256ScramSha512Plain
username (選擇性)SASL 驗證的用戶名稱。 當 為GssapiAuthenticationMode不受支援。 如需詳細資訊,請參閱 連線
password (選擇性)SASL 驗證的密碼。 當 為GssapiAuthenticationMode不受支援。 如需詳細資訊,請參閱 連線
protocol (選擇性)與訊息代理程式通訊時所使用的安全性通訊協定。 支援的值為 plaintext (預設值)、、sslsasl_plaintextsasl_ssl
sslCaLocation (選擇性)用於驗證訊息代理程序憑證的 CA 憑證檔案路徑。
sslCertificateLocation (選擇性)用戶端憑證的路徑。
sslKeyLocation (選擇性)用於驗證之用戶端私鑰 (PEM) 的路徑。
sslKeyPassword (選擇性)用戶端憑證的密碼。

使用方式

Kafka 事件目前支援為 JSON 承載的字串和字串陣列。

Kafka 訊息會當做 JSON 承載的字串和字串數位傳遞至函式。

在進階方案中,您必須啟用 Kafka 輸出的運行時間調整監視,才能相應放大至多個實例。 若要深入瞭解,請參閱 啟用運行時間調整

您無法使用 Azure 入口網站中 [程式代碼 + 測試] 頁面的 [測試/執行] 功能來使用 Kafka 觸發程式。 您必須改為將測試事件直接傳送至觸發程式所監視的主題。

如需 Kafka 觸發程式支援的一組完整host.json設定,請參閱 host.json設定

連線

觸發程式和系結所需的所有連接資訊都應該在應用程式設定中維護,而不是在程式碼中的系結定義中維護。 這適用於不應該儲存在程式碼中的認證。

重要

認證設定必須參考 應用程式設定。 不要在您的程式代碼或組態檔中硬式編碼認證。 在本機執行時,請針對您的認證使用 local.settings.json 檔案 ,而且不會發佈local.settings.json檔案。

線上到 Azure 中 Confluent 提供的受控 Kafka 叢集時,請確定您的 Confluent Cloud 環境的下列驗證認證是在觸發程式或系結中設定:

設定 建議值 描述
BrokerList BootstrapServer 名為 BootstrapServer 的應用程式設定包含 Confluent Cloud 設定頁面中找到的啟動程式伺服器值。 值類似於 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092
使用者名稱 ConfluentCloudUsername 名為 ConfluentCloudUsername 的應用程式設定包含來自 Confluent Cloud 網站的 API 存取金鑰。
密碼 ConfluentCloudPassword 名為 ConfluentCloudPassword 的應用程式設定包含從 Confluent Cloud 網站取得的 API 秘密。

您用於這些設定的字串值必須以應用程式設定的形式出現在 AzureValues本機開發期間local.settings.json檔案集合中。

您也應該在繫 Protocol結定義中設定、 AuthenticationModeSslCaLocation

下一步