مشغل Apache Kafka لدالات Azure

يمكنك استخدام مشغل Apache Kafka في دالات Azure لتشغيل التعليمات البرمجية للدالة استجابة للرسائل في مواضيع Kafka. يمكنك أيضاً استخدام ربط إخراج Kafka للكتابة من دالتك إلى موضوع. للحصول على معلومات حول تفاصيل الإعداد والتكوين، راجع نظرة عامة على روابط Apache Kafka لدالات Azure.

هام

روابط Kafka متاحة فقط لـ Functions على خطة Elastic Premium وخطة Dedicated (App Service). يتم دعمها فقط في الإصدار 3.x والإصدار الأحدث من وقت تشغيل الوظائف.

مثال

يعتمد استخدام المشغل على طريقة C# المستخدمة في تطبيق دالتك، والتي يمكن أن تكون أحد الأوضاع التالية:

تعمل مكتبة فئة معالجة عامل معزولة تعمل دالة C# المحولة برمجيا في عملية معزولة عن وقت التشغيل.

تعتمد السمات التي تستخدمها على موفر الأحداث المحددة.

يوضح المثال التالي دالة C# التي تقرأ وتسجل رسالة Kafka كحدث Kafka:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

لتلقي الأحداث في دفعة، استخدم مصفوفة سلسلة كمدخلات، كما هو موضح في المثال التالي:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

تسجل الدالة التالية الرسالة والعناوين لحدث Kafka:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

للحصول على مجموعة كاملة من أمثلة .NET العاملة، راجع مستودع ملحق Kafka.

إشعار

للحصول على مجموعة مكافئة من أمثلة TypeScript، راجع مستودع ملحق Kafka

الخصائص المحددة لملف function.json تعتمد على موفر الحدث الخاص بك، والذي يكون في هذه الأمثلة إما Confluent أو Azure Event Hubs. تظهر الأمثلة التالية مشغل Kafka لدالة تقرأ رسالة Kafka وتسجلها.

يحدد function.json التالي المشغل للموفّر المحدد:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

ثم يتم تشغيل التعليمات البرمجية التالية عند تشغيل الدالة:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

لتلقي الأحداث في دفعة، قم بضبط القيمة cardinality إلى many في ملف function.json، كما هو موضح في الأمثلة التالية:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

ثم تقوم التعليمات البرمجية التالية بتحليل مصفوفة الأحداث وتسجيل بيانات الحدث:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

تسجل التعليمات البرمجية التالية أيضاً بيانات العنوان:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

يمكنك تعريف مخطط Avro عام للحدث الذي تم تمريره إلى المشغل. يحدد function.json التالي المشغل للموفّر المحدد مع مخطط Avro عام:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

ثم يتم تشغيل التعليمات البرمجية التالية عند تشغيل الدالة:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

للحصول على مجموعة كاملة من أمثلة JavaScript العاملة، راجع مستودع ملحق Kafka.

الخصائص المحددة لملف function.json تعتمد على موفر الحدث الخاص بك، والذي يكون في هذه الأمثلة إما Confluent أو Azure Event Hubs. تظهر الأمثلة التالية مشغل Kafka لدالة تقرأ رسالة Kafka وتسجلها.

يحدد function.json التالي المشغل للموفّر المحدد:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

ثم يتم تشغيل التعليمات البرمجية التالية عند تشغيل الدالة:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

لتلقي الأحداث في دفعة، قم بضبط القيمة cardinality إلى many في ملف function.json، كما هو موضح في الأمثلة التالية:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

ثم تقوم التعليمات البرمجية التالية بتحليل مصفوفة الأحداث وتسجيل بيانات الحدث:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

تسجل التعليمات البرمجية التالية أيضاً بيانات العنوان:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

يمكنك تعريف مخطط Avro عام للحدث الذي تم تمريره إلى المشغل. يحدد function.json التالي المشغل للموفّر المحدد مع مخطط Avro عام:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

ثم يتم تشغيل التعليمات البرمجية التالية عند تشغيل الدالة:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

للحصول على مجموعة كاملة من أمثلة PowerShell العاملة، راجع مستودع ملحق Kafka.

الخصائص المحددة لملف function.json تعتمد على موفر الحدث الخاص بك، والذي يكون في هذه الأمثلة إما Confluent أو Azure Event Hubs. تظهر الأمثلة التالية مشغل Kafka لدالة تقرأ رسالة Kafka وتسجلها.

يحدد function.json التالي المشغل للموفّر المحدد:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

ثم يتم تشغيل التعليمات البرمجية التالية عند تشغيل الدالة:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

لتلقي الأحداث في دفعة، قم بضبط القيمة cardinality إلى many في ملف function.json، كما هو موضح في الأمثلة التالية:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

ثم تقوم التعليمات البرمجية التالية بتحليل مصفوفة الأحداث وتسجيل بيانات الحدث:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

تسجل التعليمات البرمجية التالية أيضاً بيانات العنوان:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

يمكنك تعريف مخطط Avro عام للحدث الذي تم تمريره إلى المشغل. يحدد function.json التالي المشغل للموفّر المحدد مع مخطط Avro عام:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

ثم يتم تشغيل التعليمات البرمجية التالية عند تشغيل الدالة:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

للحصول على مجموعة كاملة من أمثلة Python العاملة، راجع مستودع ملحق Kafka.

تعتمد التعليقات التوضيحية التي تستخدمها لتكوين المشغل على موفّر الحدث المحدد.

يوضح المثال التالي دالة Java التي تقرأ محتوى حدث Kafka وتسجله:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

لتلقي الأحداث في دُفعة، استخدم سلسلة إدخال كمصفوفة، كما هو موضح في المثال التالي:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

تسجل الدالة التالية الرسالة والعناوين لحدث Kafka:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

يمكنك تعريف مخطط Avro عام للحدث الذي تم تمريره إلى المشغل. تحدد الدالة التالية مشغلاً لموفّر معين مع مخطط Avro عام:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

للحصول على مجموعة كاملة من أمثلة Java العاملة لـ Confluent، راجع مستودع ملحق Kafka.

السمات

تستخدم KafkaTriggerAttribute كل من مكتبات العملية العملية وعملية العامل المعزولة C# لتعريف مشغل الدالة.

يوضح الجدول التالي الخصائص التي يمكنك تعيينها باستخدام سمة المشغل هذه:

المعلمة ‏‏الوصف‬
BrokerList (مطلوب) قائمة وسطاء Kafka التي يراقبها المشغل. لمزيد من المعلومات، راجع الاتصالات.
الموضوع (مطلوب) الموضوع الذي يراقبه المشغل.
مجموعة المستهلكين (اختياري) مجموعة مستهلكي Kafka المستخدمة من قبل المشغل.
AvroSchema (اختياري) مخطط سجل عام عند استخدام بروتوكول Avro.
AuthenticationMode (اختياري) وضع المصادقة عند استخدام المصادقة البسيطة ومصادقة طبقة الأمان (SASL). القيم المدعومة هي Gssapi، وPlain (افتراضية)، وScramSha256، وScramSha512.
اسم المستخدم (اختياري) اسم المستخدم لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
كلمة المرور (اختياري) كلمة المرور لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
البروتوكول (اختياري) بروتوكول الأمان المستخدم عند الاتصال بالوسطاء. القيم المدعومة هي plaintext (افتراضية)، وssl، وsasl_plaintext، وsasl_ssl.
SslCaLocation (اختياري) المسار إلى ملف شهادة المرجع المصدق للتحقق من شهادة الوسيط.
SslCertificateLocation (اختياري) المسار إلى شهادة العميل.
SslKeyLocation (اختياري) المسار إلى المفتاح الخاص للعميل (PEM) المستخدم للمصادقة.
SslKeyPassword (اختياري) كلمة المرور لشهادة العميل.

تعليقات توضيحية

يتيح لك التعليق التوضيحي KafkaTrigger إنشاء دالة يتم تشغيلها عند تلقي موضوع ما. الخيارات المدعومة تتضمن العناصر التالية:

العنصر ‏‏الوصف‬
الاسم (مطلوب) اسم المتغيّر الذي يمثل قائمة الانتظار أو رسالة الموضوع في التعليمة البرمجية للدالة.
brokerList (مطلوب) قائمة وسطاء Kafka التي يراقبها المشغل. لمزيد من المعلومات، راجع الاتصالات.
الموضوع (مطلوب) الموضوع الذي يراقبه المشغل.
cardinality (اختياري) يشير إلى العلاقة الأساسية لإدخال المشغل. القيم المدعومة هي ONE (افتراضية)، وMANY. استخدم ONE عندما يكون الإدخال رسالة واحدة وMANY عندما يكون الإدخال عبارة عن مصفوفة من الرسائل. عند استخدام MANY، يجب عليك أيضاً إعداد dataType.
Datatype تحدد كيفية معالجة Functions لقيمة المعلمة. بشكل افتراضي، يتم الحصول على القيمة كسلسلة وتحاول الدالات إلغاء تسلسل السلسلة إلى عنصر Java الفعلي القديم العادي (POJO). عند string، يتم التعامل مع الإدخال كسلسلة فقط. عند binary، يتم تلقي الرسالة كبيانات ثنائية، وتحاول Functions إلغاء تسلسلها إلى نوع معلمة فعلي بوحدة بايت[].
consumerGroup (اختياري) مجموعة مستهلكي Kafka المستخدمة من قبل المشغل.
avroSchema (اختياري) مخطط سجل عام عند استخدام بروتوكول Avro.
وضع المصادقة (اختياري) وضع المصادقة عند استخدام المصادقة البسيطة ومصادقة طبقة الأمان (SASL). القيم المدعومة هي Gssapi، وPlain (افتراضية)، وScramSha256، وScramSha512.
اسم المستخدم (اختياري) اسم المستخدم لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
كلمة المرور (اختياري) كلمة المرور لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
البروتوكول (اختياري) بروتوكول الأمان المستخدم عند الاتصال بالوسطاء. القيم المدعومة هي plaintext (افتراضية)، وssl، وsasl_plaintext، وsasl_ssl.
sslCaLocation (اختياري) المسار إلى ملف شهادة المرجع المصدق للتحقق من شهادة الوسيط.
sslCertificateLocation (اختياري) المسار إلى شهادة العميل.
sslKeyLocation (اختياري) المسار إلى المفتاح الخاص للعميل (PEM) المستخدم للمصادقة.
sslKeyPassword (اختياري) كلمة المرور لشهادة العميل.

التكوين

يشرح الجدول الآتي خصائص تكوين ربط البيانات التي عليك تعيينها في ملف function.json.

خاصية function.json ‏‏الوصف
النوع (مطلوب) يجب الإعداد على kafkaTrigger.
الاتجاه (مطلوب) يجب الإعداد على in.
الاسم (مطلوب) اسم المتغير الذي يمثل البيانات الوسيطة في التعليمة البرمجية للدالة.
brokerList (مطلوب) قائمة وسطاء Kafka التي يراقبها المشغل. لمزيد من المعلومات، راجع الاتصالات.
الموضوع (مطلوب) الموضوع الذي يراقبه المشغل.
cardinality (اختياري) يشير إلى العلاقة الأساسية لإدخال المشغل. القيم المدعومة هي ONE (افتراضية)، وMANY. استخدم ONE عندما يكون الإدخال رسالة واحدة وMANY عندما يكون الإدخال عبارة عن مصفوفة من الرسائل. عند استخدام MANY، يجب عليك أيضاً إعداد dataType.
Datatype تحدد كيفية معالجة Functions لقيمة المعلمة. بشكل افتراضي، يتم الحصول على القيمة كسلسلة وتحاول الدالات إلغاء تسلسل السلسلة إلى عنصر Java الفعلي القديم العادي (POJO). عند string، يتم التعامل مع الإدخال كسلسلة فقط. عند binary، يتم تلقي الرسالة كبيانات ثنائية، وتحاول Functions إلغاء تسلسلها إلى نوع معلمة فعلي بوحدة بايت[].
consumerGroup (اختياري) مجموعة مستهلكي Kafka المستخدمة من قبل المشغل.
avroSchema (اختياري) مخطط سجل عام عند استخدام بروتوكول Avro.
وضع المصادقة (اختياري) وضع المصادقة عند استخدام المصادقة البسيطة ومصادقة طبقة الأمان (SASL). القيم المدعومة هي Gssapi، وPlain (افتراضية)، وScramSha256، وScramSha512.
اسم المستخدم (اختياري) اسم المستخدم لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
كلمة المرور (اختياري) كلمة المرور لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
البروتوكول (اختياري) بروتوكول الأمان المستخدم عند الاتصال بالوسطاء. القيم المدعومة هي plaintext (افتراضية)، وssl، وsasl_plaintext، وsasl_ssl.
sslCaLocation (اختياري) المسار إلى ملف شهادة المرجع المصدق للتحقق من شهادة الوسيط.
sslCertificateLocation (اختياري) المسار إلى شهادة العميل.
sslKeyLocation (اختياري) المسار إلى المفتاح الخاص للعميل (PEM) المستخدم للمصادقة.
sslKeyPassword (اختياري) كلمة المرور لشهادة العميل.

الاستخدام

يتم دعم أحداث Kafka حالياً كسلاسل ومصفوفات سلاسل عبارة عن حمولات JSON.

رسائل Kafka التي يتم تمريرها إلى الدالة كسلاسل ومصفوفات سلاسل هي حمولات JSON.

في خطة Premium، يجب عليك تمكين مراقبة مقياس وقت التشغيل لإخراج Kafka لتتمكن من التوسع إلى مثيلات متعددة. لمعرفة المزيد، راجع تمكين تحجيم وقت التشغيل.

لا يمكنك استخدام ميزة الاختبار/التشغيل لصفحة Code + Test في مدخل Microsoft Azure للعمل مع مشغلات Kafka. يجب بدلا من ذلك إرسال أحداث الاختبار مباشرة إلى الموضوع الذي تتم مراقبته بواسطة المشغل.

للحصول على مجموعة كاملة من إعدادات host.json المدعومة لمشغل Kafka، راجع إعدادات host.json.

الاتصالات

يجب الاحتفاظ بجميع معلومات الاتصال المطلوبة من قبل المشغلات والروابط في إعدادات التطبيق وليس في تعريفات الربط في التعليمات البرمجية الخاصة بك. ينطبق هذا على بيانات الاعتماد، والتي يجب ألا يتم تخزينها أبداً في تعليماتك البرمجية.

هام

يجب أن تشير إعدادات بيانات الاعتماد إلى إعداد تطبيق. لا تقم باستخدام بيانات اعتماد التعليمات البرمجية المضمنة في ملفات التعليمات البرمجية أو التكوين. عند التشغيل محلياً، استخدم ملف local.settings.json لبيانات اعتمادك، ولا تنشر ملف local.settings.json.

عند الاتصال بمجموعة Kafka المدارة التي يوفرها Confluent في Azure، تأكد من تعيين بيانات اعتماد المصادقة التالية لبيئة Confluent Cloud في المشغل أو الربط:

الإعدادات القيمة الموصى بها ‏‏الوصف
BrokerList BootstrapServer يحتوي إعداد التطبيق المسمى BootstrapServer على قيمة خادم bootstrap الموجود في صفحة إعدادات Confluent Cloud. تشبه القيمة xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
اسم المستخدم ConfluentCloudUsername يحتوي إعداد التطبيق المسمى ConfluentCloudUsername على مفتاح وصول واجهة برمجة التطبيقات من موقع ويب Confluent Cloud.
كلمة المرور ConfluentCloudPassword يحتوي إعداد التطبيق المسمى ConfluentCloudPassword على البيانات السرية لواجهة برمجة التطبيقات التي تم الحصول عليها من موقع ويب Confluent Cloud.

يجب أن تكون قيم السلسلة التي تستخدمها لهذه الإعدادات موجودة كإعدادات تطبيق في Azure أو في مجموعة Values في ملف local.settings.json أثناء التطوير المحلي.

يجب عليك أيضاً إعداد Protocol وAuthenticationMode وSslCaLocation في تعريفات الربط الخاصة بك.

الخطوات التالية