ملاحظة
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
ينطبق على: NoSQL
يعد Kafka Connect لقاعدة بيانات Azure Cosmos DB موصلاً لقراءة البيانات من قاعدة Azure Cosmos DB وكتابتها فيها. يتيح لك موصل المتلقي Azure Cosmos DB تصدير البيانات من مواضيع Apache Kafka إلى قاعدة بيانات Azure Cosmos DB. يُجري الموصل استطلاع البيانات من Kafka للكتابة على حاويات في قاعدة البيانات بناءً على اشتراك المواضيع.
المتطلبات الأساسية
- ابدأ باستخدام إعداد Confluent platform حيث يوفر بيئة متكاملة للعمل معها. إذا كنت لا ترغب في استخدام Confluent Platform، فأنت بحاجة إلى تثبيت وتكوين Apache Kafka وKafka Connect بنفسك. ستحتاج أيضاً إلى تثبيت وتكوين موصلات Azure Cosmos DB يدويّاً.
- أنشئ حساب Azure Cosmos DB، حاوية دليل الإعداد
- جرى اختبار Bash shell على GitHub Codespaces، وMac، وUbuntu، وWindows باستخدام WSL2. لا يعمل shell في Cloud Shell أو WSL1.
- تنزيل Java 11+
- تنزيل Maven
تثبيت موصل المتلقي
إذا كنت تستخدم إعداد النظام الأساسي Confluent الموصى به، فسيتم تضمين موصل المتلقي Azure Cosmos DB في التثبيت، ويمكنك تخطي هذه الخطوة.
وإلا، يمكنك تنزيل ملف JAR من الإصدار الأخير أو حزمة هذا المستودع لإنشاء ملف JAR جديد. لتثبيت الموصل يدوياً باستخدام ملف JAR، راجع هذه الإرشادات. يمكنك أيضاً حزم ملف JAR جديد من التعليمات البرمجية المصدر.
# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Kafka Connect installation
ls target/*dependencies.jar
إنشاء موضوع Kafka وكتابة البيانات
إذا كنت تستخدم Confluent Platform، فإن أسهل طريقة لإنشاء موضوع Kafka باستخدام "تجربة مستخدم مركز التحكم" المتوفرة. وإلا، يمكنك إنشاء موضوع Kafka يدوياً باستخدام الصيغة التالية:
./kafka-topics.sh --create --boostrap-server <URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>
بالنسبة لهذا السيناريو، سننشئ موضوع Kafka باسم "فنادق" ونكتب بيانات JSON غير المضمنة في المخطط إلى الموضوع. لإنشاء موضوع داخل "مركز التحكم"، راجع دليل Confluent.
ثم، ابدأ منتج وحدة Kafka لكتابة بعض السجلات لموضوع "hotels".
# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels
أدخل في منتج وحدة التحكم ما يلي:
{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}
يتم نشر السجلات الثلاثة التي تم إدخالها إلى موضوع كافكا "hotels" بتنسيق JSON.
إنشاء موصل المتلقي
أنشئ موصل المتلقي Azure Cosmos DB في Kafka Connect. يعرف نص JSON التالي التكوين لموصل المتلقي. تأكد من استبدال قيم الخصائص connect.cosmos.connection.endpoint
وconnect.cosmos.master.key
التي يجب أن تقوم بحفظها من دليل إعداد Azure Cosmos DB المتوفر في المتطلبات الأساسية.
لمزيد من المعلومات حول كل خاصية من خصائص التكوين، راجع قسم خصائص المتلقي.
{
"name": "cosmosdb-sink-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
"tasks.max": "1",
"topics": [
"hotels"
],
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
"connect.cosmos.master.key": "<cosmosdbprimarykey>",
"connect.cosmos.databasename": "kafkaconnect",
"connect.cosmos.containers.topicmap": "hotels#kafka"
}
}
بمجرد ملء جميع القيم، احفظ ملف JSON في مكان ما محلياً. يمكنك استخدام هذا الملف لإنشاء الموصل باستخدام REST API.
إنشاء موصل باستخدام "مركز التحكم"
يكمن الخيار السهل لإنشاء الموصل في الانتقال من خلال صفحة ويب "مركز التحكم". اتبع دليل التثبيت لإنشاء موصل من "مركز التحكم". واستخدم تجانب DatagenConnector
بدلاً من استخدام الخيار CosmosDBSinkConnector
. عند تكوين موصل المتلقي، قم بتعبئة القيم على النحو الذي قمت بتعبئتها في ملف JSON.
وبدلاً من ذلك، في صفحة الموصلات، يمكنك تحميل ملف JSON الذي تم إنشاؤه سابقاً باستخدام الخيار "تحميل ملف تكوين الموصل".
إنشاء موصل باستخدام REST API
إنشاء موصل المتلقي باستخدام Connect REST API:
# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors
تأكيد البيانات المكتوبة إلى Azure Cosmos DB
سجل الدخول إلى مدخل Microsoft Azure وانتقل إلى حساب Azure Cosmos DB الخاص بك. تأكد من إنشاء السجلات الثلاثة من موضوع "hotels" في حسابك.
التنظيف
لحذف الموصل من "مركز التحكم"، انتقل إلى موصل المتلقي الذي أنشأته ثم حدد فوق أيقونة حذف.
وبدلاً من ذلك، استخدم Connect REST API لحذف:
# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector
لحذف خدمة Azure Cosmos DB التي تم إنشاؤها ومجموعة مواردها باستخدام Azure CLI، راجع هذه الخطوات.
خصائص تكوين المتلقي
يتم استخدام الإعدادات التالية لتكوين موصل متلقي Azure Cosmos DB Kafka. تحدد قيم التكوين هذه أي من بيانات مواضيع Kafka تم استهلاكها، وأي من بيانات حاويات Azure Cosmos DB ستتم كتابتها، وتنسيقات تسلسل البيانات. للحصول على نموذج ملف تكوين باستخدام القيم الافتراضية، راجع هذا التكوين.
اسم | كتابة | الوصف | مطلوب/اختياري |
---|---|---|---|
الموضوعات | قائمة | قائمة مواضيع Kafka للمشاهدة. | المطلوب |
connector.class | سلسلة | اسم فئة متلقي Azure Cosmos DB. يجب ضبطه على com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector . |
المطلوب |
connect.cosmos.connection.endpoint | URI | سلسلة URI لنقطة نهاية Azure Cosmos DB. | المطلوب |
connect.cosmos.master.key | سلسلة | المفتاح الأساسي لقاعدة بيانات Azure Cosmos DB الذي يتصل به المتلقي. | المطلوب |
connect.cosmos.databasename | سلسلة | اسم قاعدة بيانات Azure Cosmos DB التي يكتب المتلقي إليها. | المطلوب |
connect.cosmos.containers.topicmap | سلسلة | تعيين بين مواضيع Kafka وحاويات Azure Cosmos DB، منسقة باستخدام CSV كما هو موضح: topic#container,topic2#container2 . |
المطلوب |
connect.cosmos.connection.gateway.enabled | منطقيه | وضع علامة للإشارة إلى ما إذا كنت تريد استخدام وضع البوابة. بشكل افتراضي، يكون خطأ. | اختياري |
connect.cosmos.sink.bulk.enabled | منطقيه | وضع علامة للإشارة إلى ما إذا كان الوضع المجمع ممكنا أم لا. بشكل افتراضي، هذا صحيح. | اختياري |
connect.cosmos.sink.maxRetryCount | العدد الصحيح | الحد الأقصى لمحاولات إعادة المحاولة في حالات فشل الكتابة العابرة. بشكل افتراضي هو 10 مرات. | اختياري |
key.converter | سلسلة | تنسيق التسلسل للبيانات الرئيسية المكتوبة في موضوع Kafka. | المطلوب |
value.converter | سلسلة | تنسيق التسلسل لبيانات القيمة المكتوبة في موضوع Kafka. | المطلوب |
key.converter.schemas.enable | سلسلة | قم بالتعيين على "صواب" إذا كانت البيانات الرئيسية تحتوي على مخطط مضمن. | اختياري |
value.converter.schemas.enable | سلسلة | قم بالتعيين على "صواب" إذا كانت البيانات الرئيسية تحتوي على مخطط مضمن. | اختياري |
tasks.max | العدد الصحيح | الحد الأقصى لعدد مهام متلقي الموصل. القيمة الافتراضية هي 1 |
اختياري |
ستتم كتابة البيانات دائماً إلى Azure Cosmos DB بتنسيق JSON بدون أي مخطط.
أنواع البيانات المعتمدة
يحول موصل متلقي Cosmos DB Azureسجل المتلقي إلى مستند JSON الذي يدعم أنواع المخطط التالي:
نوع المخطط | نوع بيانات JSON |
---|---|
صفيف | صفيف |
قيمة منطقية | قيمة منطقية |
Float32 | الرقم |
Float64 | الرقم |
Int8 | الرقم |
Int16 | الرقم |
Int32 | الرقم |
int64 | الرقم |
الخريطة | عنصر (JSON) |
السلسلة | السلسلة قيمة فارغة |
البنية | عنصر (JSON) |
يدعم أيضاً موصل المتلقي أنواع AVRO المنطقية التالية:
نوع المخطط | نوع بيانات JSON |
---|---|
التاريخ | الرقم |
الوقت | الرقم |
طابع زمني | الرقم |
إشعار
إلغاء تسلسل البايت غير مدعوم حالياً من خلال موصل متلقي Azure Cosmos DB.
تحويلات رسالة واحدة (SMT)
وإلى جانب استخدام إعدادات موصل المتلقي، يمكنك تحديد استخدام "تحويلات رسالة واحدة" (SMT) لتعديل الرسائل المتدفقة من خلال النظام الأساسي Kafka Connect. لمزيد من المعلومات، راجع وثائق Confluent SMT.
استخدام InsertUUID SMT
يمكنك استخدام InsertUUID SMT لإضافة معرفات العناصر تلقائياً. باستخدام SMT InsertUUID
المخصص، يمكنك إدراج حقل id
بقيمة UUID عشوائية لكل رسالة، قبل كتابته إلى Azure Cosmos DB.
تحذير
لا تستخدم SMT إلا إذا كانت الرسائل لا تحتوي على حقل id
. وإلا، سيتم استبدال القيم id
سيتم الكتابة فوق القيم وقد تجد في النهاية العناصر مكررة في قاعدة بياناتك. قد يكون استخدام معرّفات UUID كمعرّف الرسالة سريعاً وسهلاً إلا إنها ليست مفتاح قسم مثالياً لاستخدامه في Azure Cosmos DB.
تثبيت SMT
قبل أن تتمكن من استخدام InsertUUID
SMT، ستحتاج إلى تثبيت هذا التحويل في إعداد Confluent Platform. إذا كنت تستخدم إعداد Confluent Platform من هذا المستودع، يتم تضمين التحويل بالفعل في التثبيت، ويمكنك تخطي هذه الخطوة.
وبدلاً من ذلك، يمكنك حزم مصدر InsertUUID لإنشاء ملف JAR جديد. لتثبيت الموصل يدوياً باستخدام ملف JAR، راجع هذه الإرشادات.
# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Confluent Platform installation
ls target/*.jar
تكوين SMT
داخل التكوين موصل المتلقي، أضف الخصائص التالية لتعيين id
.
"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"
لمزيد من المعلومات حول استخدام SMT، راجع مستودع InsertUUID.
استخدام تحويلات SMT لتكوين "مدة البقاء" (TTL)
باستخدام كل من InsertField
وتحويلات Cast
SMT، يمكنك تكوين TTL على كل عنصر تم إنشاؤه في Azure Cosmos DB. تمكين TTL على الحاوية قبل تمكين TTL على مستوى العنصر. لمزيد من المعلومات، راجع وثيقة مدة البقاء.
داخل تكوين موصل المتلقي، أضف الخصائص التالية لتعيين TTL خلال ثوانٍ. في هذا المثال التالي، يتم تعيين TTL على 100 ثانية. إذا كانت الرسالة تحتوي بالفعل على الحقل TTL
، فسيتم استبدال القيمة TTL
من خلال تحويلات SMT هذه.
"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"
لمزيد من المعلومات حول استخدام تحويلات SMT هذه، راجع وثائق InsertField وCast.
استكشاف المشكلات الشائعة وإصلاحها
فيما يلي حلول لبعض المشاكل الشائعة التي قد تواجهها عند العمل باستخدام موصل المتلقي Kafka.
قراءة البيانات بتنسيق غير JSON باستخدام JsonConverter
إذا كان لديك بيانات بتنسيق غير JSON حول موضوع مصدرك في Kafka وحاولت قراءتها باستخدام JsonConverter
، سيظهر لك الاستثناء التالي:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7
من المحتمل أن يكون سبب هذا الخطأ تسلسل البيانات في موضوع المصدر سواء بتنسيق Avro أو تنسيق آخر مثل سلسلة CSV.
الحل: إذا كانت بيانات الموضوع بتنسيق AVRO، فعليك تغيير موصل متلقي Kafka Connect لاستخدام AvroConverter
كما هو موضح أدناه.
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
دعم وضع البوابة
connect.cosmos.connection.gateway.enabled
هو خيار تكوين ل Cosmos DB Kafka Sink Connector الذي يعزز استيعاب البيانات باستخدام خدمة بوابة Cosmos DB. تعمل هذه الخدمة كواجهة أمامية ل Cosmos DB، وتقدم فوائد مثل موازنة التحميل وتوجيه الطلب وترجمة البروتوكول. من خلال الاستفادة من خدمة البوابة، يحقق الموصل معدل نقل وقابلية توسع محسنين عند كتابة البيانات إلى Cosmos DB. لمزيد من المعلومات، انظر أوضاع الاتصال.
"connect.cosmos.connection.gateway.enabled": true
دعم الوضع المجمع
connect.cosmos.sink.bulk.enabled
تحدد الخاصية ما إذا كانت ميزة الكتابة المجمعة ممكنة لكتابة البيانات من مواضيع Kafka إلى Azure Cosmos DB.
عند تعيين هذه الخاصية إلى true
(افتراضيا)، فإنها تمكن وضع الكتابة المجمعة، مما يسمح ل Kafka Connect باستخدام واجهة برمجة تطبيقات الاستيراد المجمع ل Azure Cosmos DB لتنفيذ عمليات الكتابة الدفعية الفعالة باستخدام CosmosContainer.executeBulkOperations()
أسلوب . يحسن وضع الكتابة المجمعة أداء الكتابة بشكل كبير ويقلل من زمن الانتقال الإجمالي عند استيعاب البيانات في Cosmos DB بالمقارنة مع الوضع غير المجمع عند CosmosContainer.upsertItem()
استخدام الأسلوب.
يتم تمكين الوضع المجمع بشكل افتراضي. لتعطيل الخاصية connect.cosmos.sink.bulk.enabled
، تحتاج إلى تعيينها إلى false
في تكوين موصل مصدر Cosmos DB. فيما يلي مثال لملف خاصية التكوين:
"name": "my-cosmosdb-connector",
"connector.class": "io.confluent.connect.azure.cosmosdb.CosmosDBSinkConnector",
"tasks.max": 1,
"topics": "my-topic"
"connect.cosmos.endpoint": "https://<cosmosdb-account>.documents.azure.com:443/"
"connect.cosmos.master.key": "<cosmosdb-master-key>"
"connect.cosmos.database": "my-database"
"connect.cosmos.collection": "my-collection"
"connect.cosmos.sink.bulk.enabled": false
من خلال تمكين الخاصية connect.cosmos.sink.bulk.enabled
، يمكنك الاستفادة من وظيفة الكتابة المجمعة في Kafka Connect ل Azure Cosmos DB لتحقيق أداء كتابة محسن عند نسخ البيانات من مواضيع Kafka إلى Azure Cosmos DB.
"connect.cosmos.sink.bulk.enabled": true
قراءة بيانات بتنسيق غير Avro باستخدام AvroConverter
ينطبق هذا السيناريو عند محاولة استخدام محول Avro لقراءة البيانات من موضوع ليس بتنسيق Avro. والتي تتضمن بيانات مكتوبة بواسطة المحول التسلسلي Avro بخلاف المحول التسلسلي Avro الخاص بسجل Confluent Schema Registry، والذي يحتوي على تنسيق تحويل خاص به.
org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
...
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
الحل: تحقق من تنسيق تسلسل موضوع المصدر. ثم قم بتبديل موصل المتلقي Kafka Connect لاستخدام المحول الصحيح أو قم بتبديل تنسيق المصدر إلى Avro.
قراءة رسالة JSON بدون بنية المخطط/ البيانات الأساسية المتوقعة
يدعم Kafka Connect بنية خاصة لرسائل JSON تحتوي على كل من البيانات الأساسية والمخطط على النحو التالي.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "userid"
},
{
"type": "string",
"optional": false,
"field": "name"
}
]
},
"payload": {
"userid": 123,
"name": "Sam"
}
}
إذا حاولت قراءة بيانات JSON التي لا تحتوي على البيانات في هذه البنية، ستحصل على الخطأ التالي:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
للتوضيح، لا تحتوي إلا بنية JSON الصالحة لـ schemas.enable=true
على حقول المخطط والبيانات الأساسية كعناصر من أعلى مستوى كما هو موضح أعلاه. كما تنص رسالة الخطأ، إذا كانت لديك فقط بيانات JSON عادية، فيجب عليك تغيير تكوين الموصل إلى:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
القيود
- الإنشاء التلقائي لقواعد البيانات والحاويات في Azure Cosmos DB غير مدعوم. يجب أن تكون قاعدة البيانات والحاويات موجودة بالفعل، ويجب تكوينها بشكل صحيح.
الخطوات التالية
يمكنك معرفة المزيد حول موجز التغيير في Azure Cosmo DB باستخدام المستندات التالية:
يمكنك معرفة المزيد حول العمليات المجمعة في V4 Java SDK باستخدام المستندات التالية: