ربط إخراج Apache Kafka لـ Azure Functions

ربط الإخراج يسمح لتطبيق Azure Functions بكتابة رسائل إلى موضوع Kafka.

هام

روابط Kafka متاحة فقط لـ Functions على خطة Elastic Premium وخطة Dedicated (App Service). يتم دعمها فقط في الإصدار 3.x والإصدار الأحدث من وقت تشغيل الوظائف.

مثال

يعتمد استخدام الربط على أسلوب C# المستخدم في تطبيق الوظائف، والذي يمكن أن يكون واحدًا مما يلي:

تعمل مكتبة فئة معالجة عامل معزولة تعمل دالة C# المحولة برمجيا في عملية معزولة عن وقت التشغيل.

تعتمد السمات التي تستخدمها على موفر الأحداث المحددة.

المثال التالي يحتوي على نوع إرجاع مخصص هو MultipleOutputType، والذي يتكون من استجابة HTTP وإخراج Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

في الفئة MultipleOutputType، Kevent هو متغير ربط الإخراج لربط Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

لإرسال دُفعة من الأحداث، مرر صفيف سلسلة إلى نوع الإخراج، كما هو موضح في المثال التالي:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

يتم تعريف صفيف السلسلة كخاصية Kevents في الفئة التي يتم تعريف ربط الإخراج عليها:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

الدالة التالية تضيف رؤوسًا إلى بيانات إخراج Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

للحصول على مجموعة كاملة من أمثلة .NET العاملة، راجع مستودع ملحق Kafka.

إشعار

للحصول على مجموعة مكافئة من أمثلة TypeScript، راجع مستودع ملحق Kafka

الخصائص المحددة لملف function.json تعتمد على موفر الحدث الخاص بك، والذي يكون في هذه الأمثلة إما Confluent أو Azure Event Hubs. الأمثلة التالية تظهر ربط إخراج Kafka لدالة يتم تشغيلها بواسطة طلب HTTP وترسل البيانات من الطلب إلى موضوع Kafka.

يحدد function.json التالي المشغل للموفر المحدد في هذه الأمثلة:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

التعليمات البرمجية التالية ترسل رسالة إلى الموضوع:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

التعليمات البرمجية التالية ترسل رسائل متعددة كصفيف إلى نفس الموضوع:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

المثال التالي يوضح كيفية إرسال رسالة حدث مع رؤوس إلى نفس موضوع Kafka:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

للحصول على مجموعة كاملة من أمثلة JavaScript العاملة، راجع مستودع ملحق Kafka.

الخصائص المحددة لملف function.json تعتمد على موفر الحدث الخاص بك، والذي يكون في هذه الأمثلة إما Confluent أو Azure Event Hubs. الأمثلة التالية تظهر ربط إخراج Kafka لدالة يتم تشغيلها بواسطة طلب HTTP وترسل البيانات من الطلب إلى موضوع Kafka.

يحدد function.json التالي المشغل للموفر المحدد في هذه الأمثلة:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

التعليمات البرمجية التالية ترسل رسالة إلى الموضوع:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

التعليمات البرمجية التالية ترسل رسائل متعددة كصفيف إلى نفس الموضوع:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

المثال التالي يوضح كيفية إرسال رسالة حدث مع رؤوس إلى نفس موضوع Kafka:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

للحصول على مجموعة كاملة من أمثلة PowerShell العاملة، راجع مستودع ملحق Kafka.

الخصائص المحددة لملف function.json تعتمد على موفر الحدث الخاص بك، والذي يكون في هذه الأمثلة إما Confluent أو Azure Event Hubs. الأمثلة التالية تظهر ربط إخراج Kafka لدالة يتم تشغيلها بواسطة طلب HTTP وترسل البيانات من الطلب إلى موضوع Kafka.

يحدد function.json التالي المشغل للموفر المحدد في هذه الأمثلة:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

التعليمات البرمجية التالية ترسل رسالة إلى الموضوع:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

التعليمات البرمجية التالية ترسل رسائل متعددة كصفيف إلى نفس الموضوع:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

المثال التالي يوضح كيفية إرسال رسالة حدث مع رؤوس إلى نفس موضوع Kafka:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

للحصول على مجموعة كاملة من أمثلة Python العاملة، راجع مستودع ملحق Kafka.

تعتمد التعليقات التوضيحية التي تستخدمها لتكوين ربط الإخراج على موفر الحدث المحدد.

الدالة التالية ترسل رسالة إلى موضوع Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

المثال التالي يوضح كيفية إرسال رسائل متعددة إلى موضوع Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

في هذا المثال، معلمة ربط الإخراج يتم تغييرها إلى صفيف سلسلة.

يستخدم المثال الأخير لهذه الفئات KafkaEntity وKafkaHeader:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

دالة المثال التالي ترسل رسالة مع رؤوس إلى موضوع Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

للحصول على مجموعة كاملة من أمثلة Java العاملة لـ Confluent، راجع مستودع ملحق Kafka.

السمات

تستخدم مكتبات C# العملية والعامل المعزولة السمة Kafka لتعريف مشغل الدالة.

الجدول التالي يشرح الخصائص التي يمكنك تعيينها باستخدام هذه السمة:

المعلمة ‏‏الوصف‬
BrokerList (مطلوب) قائمة وسطاء Kafka التي يتم إرسال الإخراج إليها. لمزيد من المعلومات، راجع الاتصالات.
الموضوع (مطلوب) الموضوع الذي يتم إرسال الإخراج إليه.
AvroSchema (اختياري) مخطط سجل عام عند استخدام بروتوكول Avro.
MaxMessageBytes (اختياري) الحد الأقصى لحجم رسالة الإخراج التي يتم إرسالها (بالميغابايت)، بقيمة افتراضية قدرها 1.
حجم الدفعات (اختياري) الحد الأقصى لعدد الرسائل المجمعة في مجموعة رسائل واحدة، بقيمة افتراضية قدرها 10000.
EnableIdempotence (اختياري) عند التعيين إلى true، تضمن أن الرسائل يتم إنتاجها بنجاح مرة واحدة بالضبط، وبترتيب الإنتاج الأصلي، بقيمة افتراضية قدرها false
MessageTimeoutMs (اختياري) مهلة الرسالة المحلية، بالمللي ثانية. هذه القيمة يتم فرضها محليًا فقط، وتحد من الوقت الذي تنتظر فيه الرسالة المنتجة التسليم الناجح، بقيمة افتراضية قدرها 300000. وقت 0 لا نهائي. هذه القيمة هي الحد الأقصى للوقت المستخدم لتسليم رسالة (بما في ذلك إعادة المحاولة). خطأ التسليم يحدث عند تجاوز عدد مرات إعادة المحاولة أو مهلة الرسالة.
RequestTimeoutMs (اختياري) مهلة الإقرار لطلب الإخراج، بالمللي ثانية، بقيمة افتراضية قدرها 5000.
MaxRetries (اختياري) عدد مرات إعادة محاولة إرسال رسالة فاشلة، بقيمة افتراضية قدرها 2. إعادة المحاولة قد تتسبب في إعادة الترتيب، ما لم يتم تعيين EnableIdempotence إلى true.
AuthenticationMode (اختياري) وضع المصادقة عند استخدام المصادقة البسيطة ومصادقة طبقة الأمان (SASL). القيم المدعومة هي Gssapi، وPlain (افتراضية)، وScramSha256، وScramSha512.
اسم المستخدم (اختياري) اسم المستخدم لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
كلمة المرور (اختياري) كلمة المرور لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
البروتوكول (اختياري) بروتوكول الأمان المستخدم عند الاتصال بالوسطاء. القيم المدعومة هي plaintext (افتراضية)، وssl، وsasl_plaintext، وsasl_ssl.
SslCaLocation (اختياري) المسار إلى ملف شهادة المرجع المصدق للتحقق من شهادة الوسيط.
SslCertificateLocation (اختياري) المسار إلى شهادة العميل.
SslKeyLocation (اختياري) المسار إلى المفتاح الخاص للعميل (PEM) المستخدم للمصادقة.
SslKeyPassword (اختياري) كلمة المرور لشهادة العميل.

تعليقات توضيحية

يسمح لك التعليق التوضيحي KafkaOutput بإنشاء دالة تكتب إلى موضوع معين. الخيارات المدعومة تتضمن العناصر التالية:

العنصر ‏‏الوصف‬
الاسم اسم المتغير الذي يمثل البيانات الوسيطة في التعليمة البرمجية للدالة.
brokerList (مطلوب) قائمة وسطاء Kafka التي يتم إرسال الإخراج إليها. لمزيد من المعلومات، راجع الاتصالات.
موضوع (مطلوب) الموضوع الذي يتم إرسال الإخراج إليه.
نوع البيانات تحدد كيفية معالجة Functions لقيمة المعلمة. بشكل افتراضي، يتم الحصول على القيمة كسلسلة وتحاول الدالات إلغاء تسلسل السلسلة إلى عنصر Java الفعلي القديم العادي (POJO). عند string، يتم التعامل مع الإدخال كسلسلة فقط. عند binary، يتم تلقي الرسالة كبيانات ثنائية، وتحاول Functions إلغاء تسلسلها إلى نوع معلمة فعلي بوحدة بايت[].
avroSchema (اختياري) مخطط سجل عام عند استخدام بروتوكول Avro. (غير مدعوم حاليا ل Java.)
maxMessageBytes (اختياري) الحد الأقصى لحجم رسالة الإخراج التي يتم إرسالها (بالميغابايت)، بقيمة افتراضية قدرها 1.
batchSize (اختياري) الحد الأقصى لعدد الرسائل المجمعة في مجموعة رسائل واحدة، بقيمة افتراضية قدرها 10000.
enableIdempotence (اختياري) عند التعيين إلى true، تضمن أن الرسائل يتم إنتاجها بنجاح مرة واحدة بالضبط، وبترتيب الإنتاج الأصلي، بقيمة افتراضية قدرها false
messageTimeoutMs (اختياري) مهلة الرسالة المحلية، بالمللي ثانية. هذه القيمة يتم فرضها محليًا فقط، وتحد من الوقت الذي تنتظر فيه الرسالة المنتجة التسليم الناجح، بقيمة افتراضية قدرها 300000. وقت 0 لا نهائي. هذا هو الحد الأقصى للوقت المستخدم لتسليم رسالة (بما في ذلك إعادة المحاولة). خطأ التسليم يحدث عند تجاوز عدد مرات إعادة المحاولة أو مهلة الرسالة.
requestTimeoutMs (اختياري) مهلة الإقرار لطلب الإخراج، بالمللي ثانية، بقيمة افتراضية قدرها 5000.
maxRetries (اختياري) عدد مرات إعادة محاولة إرسال رسالة فاشلة، بقيمة افتراضية قدرها 2. إعادة المحاولة قد تتسبب في إعادة الترتيب، ما لم يتم تعيين EnableIdempotence إلى true.
وضع المصادقة (اختياري) وضع المصادقة عند استخدام المصادقة البسيطة ومصادقة طبقة الأمان (SASL). القيم المدعومة هي Gssapi، وPlain (افتراضية)، وScramSha256، وScramSha512.
اسم المستخدم (اختياري) اسم المستخدم لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
كلمة المرور (اختياري) كلمة المرور لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
بروتوكول (اختياري) بروتوكول الأمان المستخدم عند الاتصال بالوسطاء. القيم المدعومة هي plaintext (افتراضية)، وssl، وsasl_plaintext، وsasl_ssl.
sslCaLocation (اختياري) المسار إلى ملف شهادة المرجع المصدق للتحقق من شهادة الوسيط.
sslCertificateLocation (اختياري) المسار إلى شهادة العميل.
sslKeyLocation (اختياري) المسار إلى المفتاح الخاص للعميل (PEM) المستخدم للمصادقة.
sslKeyPassword (اختياري) كلمة المرور لشهادة العميل.

التكوين

يشرح الجدول الآتي خصائص تكوين ربط البيانات التي عليك تعيينها في ملف function.json.

خاصية function.json ‏‏الوصف
النوع يجب تعيينه إلى kafka.
الاتجاه يجب تعيينه إلى out.
الاسم اسم المتغير الذي يمثل البيانات الوسيطة في التعليمة البرمجية للدالة.
brokerList (مطلوب) قائمة وسطاء Kafka التي يتم إرسال الإخراج إليها. لمزيد من المعلومات، راجع الاتصالات.
موضوع (مطلوب) الموضوع الذي يتم إرسال الإخراج إليه.
avroSchema (اختياري) مخطط سجل عام عند استخدام بروتوكول Avro.
maxMessageBytes (اختياري) الحد الأقصى لحجم رسالة الإخراج التي يتم إرسالها (بالميغابايت)، بقيمة افتراضية قدرها 1.
batchSize (اختياري) الحد الأقصى لعدد الرسائل المجمعة في مجموعة رسائل واحدة، بقيمة افتراضية قدرها 10000.
enableIdempotence (اختياري) عند التعيين إلى true، تضمن أن الرسائل يتم إنتاجها بنجاح مرة واحدة بالضبط، وبترتيب الإنتاج الأصلي، بقيمة افتراضية قدرها false
messageTimeoutMs (اختياري) مهلة الرسالة المحلية، بالمللي ثانية. هذه القيمة يتم فرضها محليًا فقط، وتحد من الوقت الذي تنتظر فيه الرسالة المنتجة التسليم الناجح، بقيمة افتراضية قدرها 300000. وقت 0 لا نهائي. هذا هو الحد الأقصى للوقت المستخدم لتسليم رسالة (بما في ذلك إعادة المحاولة). خطأ التسليم يحدث عند تجاوز عدد مرات إعادة المحاولة أو مهلة الرسالة.
requestTimeoutMs (اختياري) مهلة الإقرار لطلب الإخراج، بالمللي ثانية، بقيمة افتراضية قدرها 5000.
maxRetries (اختياري) عدد مرات إعادة محاولة إرسال رسالة فاشلة، بقيمة افتراضية قدرها 2. إعادة المحاولة قد تتسبب في إعادة الترتيب، ما لم يتم تعيين EnableIdempotence إلى true.
وضع المصادقة (اختياري) وضع المصادقة عند استخدام المصادقة البسيطة ومصادقة طبقة الأمان (SASL). القيم المدعومة هي Gssapi، وPlain (افتراضية)، وScramSha256، وScramSha512.
اسم المستخدم (اختياري) اسم المستخدم لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
كلمة المرور (اختياري) كلمة المرور لمصادقة SASL. غير معتمد عندما يكون AuthenticationMode عبارة عن Gssapi. لمزيد من المعلومات، راجع الاتصالات.
بروتوكول (اختياري) بروتوكول الأمان المستخدم عند الاتصال بالوسطاء. القيم المدعومة هي plaintext (افتراضية)، وssl، وsasl_plaintext، وsasl_ssl.
sslCaLocation (اختياري) المسار إلى ملف شهادة المرجع المصدق للتحقق من شهادة الوسيط.
sslCertificateLocation (اختياري) المسار إلى شهادة العميل.
sslKeyLocation (اختياري) المسار إلى المفتاح الخاص للعميل (PEM) المستخدم للمصادقة.
sslKeyPassword (اختياري) كلمة المرور لشهادة العميل.

الاستخدام

كل من أنواع المفاتيح والقيم يتم دعمها مع إنشاء تسلسل Avro وProtobuf المضمن.

يتم إنشاء الإزاحة والقسم والطوابع الزمنية للحدث في وقت التشغيل. القيمة والرؤوس يمكن تعيينهم فقط داخل الدالة. الموضوع يتم تعيينه في function.json.

يُرجى التأكد من الوصول إلى موضوع Kafka الذي تحاول الكتابة إليه. يمكنك تكوين الربط باستخدام بيانات اعتماد الوصول والاتصال إلى موضوع Kafka.

في خطة Premium، يجب عليك تمكين مراقبة مقياس وقت التشغيل لإخراج Kafka لتتمكن من التوسع إلى مثيلات متعددة. لمعرفة المزيد، راجع تمكين تحجيم وقت التشغيل.

للحصول على مجموعة كاملة من إعدادات host.json المدعومة لمشغل Kafka، راجع إعدادات host.json.

الاتصالات

يجب الاحتفاظ بجميع معلومات الاتصال المطلوبة من قبل المشغلات والروابط في إعدادات التطبيق وليس في تعريفات الربط في التعليمات البرمجية الخاصة بك. ينطبق هذا على بيانات الاعتماد، والتي يجب ألا يتم تخزينها أبداً في تعليماتك البرمجية.

هام

يجب أن تشير إعدادات بيانات الاعتماد إلى إعداد تطبيق. لا تقم باستخدام بيانات اعتماد التعليمات البرمجية المضمنة في ملفات التعليمات البرمجية أو التكوين. عند التشغيل محلياً، استخدم ملف local.settings.json لبيانات اعتمادك، ولا تنشر ملف local.settings.json.

عند الاتصال بمجموعة Kafka المدارة التي يوفرها Confluent في Azure، تأكد من تعيين بيانات اعتماد المصادقة التالية لبيئة Confluent Cloud في المشغل أو الربط:

الإعدادات القيمة الموصى بها ‏‏الوصف
BrokerList BootstrapServer يحتوي إعداد التطبيق المسمى BootstrapServer على قيمة خادم bootstrap الموجود في صفحة إعدادات Confluent Cloud. تشبه القيمة xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
اسم المستخدم ConfluentCloudUsername يحتوي إعداد التطبيق المسمى ConfluentCloudUsername على مفتاح وصول واجهة برمجة التطبيقات من موقع ويب Confluent Cloud.
كلمة المرور ConfluentCloudPassword يحتوي إعداد التطبيق المسمى ConfluentCloudPassword على البيانات السرية لواجهة برمجة التطبيقات التي تم الحصول عليها من موقع ويب Confluent Cloud.

يجب أن تكون قيم السلسلة التي تستخدمها لهذه الإعدادات موجودة كإعدادات تطبيق في Azure أو في مجموعة Values في ملف local.settings.json أثناء التطوير المحلي.

يجب عليك أيضاً إعداد Protocol وAuthenticationMode وSslCaLocation في تعريفات الربط الخاصة بك.

الخطوات التالية