إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
يمكنك تمكين إدارة الحالة المستندة إلى RocksDB عن طريق تعيين التكوين التالي في SparkSession قبل بدء تشغيل استعلام الدفق.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
يمكنك تمكين RocksDB على خطوط أنابيب Delta Live Tables. راجع تحسين تكوين البنية الأساسية لبرنامج ربط العمليات التجارية للمعالجة ذات الحالة.
تمكين نقاط التحقق من سجل التغيير
في Databricks Runtime 13.3 LTS وما فوق، يمكنك تمكين نقاط التحقق changelog لتقليل مدة نقطة التحقق وزمن الانتقال من طرف إلى طرف لأحمال عمل Structured Streaming. توصي Databricks بتمكين نقاط التحقق changelog لجميع استعلامات Structured Streaming ذات الحالة.
عادة ما تقوم RocksDB State Store بلقطات وتحميل ملفات البيانات أثناء نقاط التفتيش. لتجنب هذه التكلفة، تكتب نقاط التحقق changelog السجلات التي تم تغييرها منذ نقطة التحقق الأخيرة إلى التخزين الدائم فقط."
يتم تعطيل نقاط التحقق Changelog بشكل افتراضي. يمكنك تمكين نقاط التحقق changelog في مستوى SparkSession باستخدام بناء الجملة التالي:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
يمكنك تمكين نقاط التحقق changelog على دفق موجود والحفاظ على معلومات الحالة المخزنة في نقطة التحقق.
هام
يمكن تشغيل الاستعلامات التي مكنت نقاط التحقق changelog فقط على Databricks Runtime 13.3 LTS وما فوق. يمكنك تعطيل نقاط التحقق changelog للعودة إلى سلوك نقاط التحقق القديمة، ولكن يجب الاستمرار في تشغيل هذه الاستعلامات على Databricks Runtime 13.3 LTS أو أعلى. يجب إعادة تشغيل المهمة لإجراء هذه التغييرات.
مقاييس مخزن حالة RocksDB
يجمع كل عامل تشغيل حالة المقاييس المتعلقة بعمليات إدارة الحالة التي يتم إجراؤها على مثيل RocksDB الخاص به لمراقبة مخزن الحالة ومن المحتمل أن يساعد في تصحيح بطء المهمة. يتم تجميع هذه المقاييس (المجموع) لكل عامل تشغيل حالة في الوظيفة عبر جميع المهام حيث يتم تشغيل عامل تشغيل الحالة. هذه المقاييس هي جزء من customMetrics الخريطة داخل stateOperators الحقول في StreamingQueryProgress. فيما يلي مثال على StreamingQueryProgress في نموذج JSON (تم الحصول عليه باستخدام StreamingQueryProgress.json()).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
الأوصاف التفصيلية للمقاييس هي كما يلي:
| اسم قياسي | الوصف |
|---|---|
| rocksdbCommitWriteBatchLatency | استغرق الوقت (بالمللي ثانية) لتطبيق عمليات الكتابة المرحلية في بنية الذاكرة (WriteBatch) على RocksDB الأصلي. |
| rocksdbCommitFlushLatency | استغرق الوقت (بالمللي ثانية) لمسح تغييرات RocksDB في الذاكرة على القرص المحلي. |
| rocksdbCommitCompactLatency | استغرق الوقت (بالمللي ثانية) للضغط (اختياري) أثناء تثبيت نقطة التحقق. |
| rocksdbCommitPauseLatency | استغرق الوقت (بالمللي ثانية) لإيقاف مؤشرات ترابط عامل الخلفية (للضغط وما إلى ذلك) كجزء من تثبيت نقطة التحقق. |
| rocksdbCommitCheckpointLatency | استغرق الوقت (بالمللي ثانية) لأخذ لقطة من RocksDB الأصلية وكتابتها إلى دليل محلي. |
| rocksdbCommitFileSyncLatencyMs | استغرق الوقت (بالمللي ثانية) لمزامنة الملفات الأصلية المتعلقة بلقطة RocksDB إلى تخزين خارجي (موقع نقطة التحقق). |
| rocksdbGetLatency | استغرق متوسط الوقت (بالنانوية) وفقا للمكالمة الأصلية RocksDB::Get الأساسية. |
| rocksdbPutCount | استغرق متوسط الوقت (بالنانوية) وفقا للمكالمة الأصلية RocksDB::Put الأساسية. |
| rocksdbGetCount | عدد الاستدعاءات الأصلية RocksDB::Get (لا تتضمن Gets من WriteBatch - في دفعة الذاكرة المستخدمة في عمليات الكتابة المرحلية). |
| rocksdbPutCount | عدد الاستدعاءات الأصلية RocksDB::Put (لا تتضمن Puts WriteBatch - في دفعة الذاكرة المستخدمة في عمليات الكتابة المرحلية). |
| rocksdbTotalBytesReadByGet | عدد وحدات البايت غير المضغوطة المقروءة من خلال المكالمات الأصلية RocksDB::Get . |
| rocksdbTotalBytesWrittenByPut | عدد وحدات البايت غير المضغوطة المكتوبة من خلال المكالمات الأصلية RocksDB::Put . |
| rocksdbReadBlockCacheHitCount | عدد المرات التي يتم فيها استخدام ذاكرة التخزين المؤقت لكتل RocksDB الأصلية لتجنب قراءة البيانات من القرص المحلي. |
| rocksdbReadBlockCacheMissCount | عدد المرات التي فاتت فيها ذاكرة التخزين المؤقت لكتل RocksDB الأصلية وتطلبت قراءة البيانات من القرص المحلي. |
| rocksdbTotalBytesReadByCompaction | عدد وحدات البايت المقروءة من القرص المحلي بواسطة عملية ضغط RocksDB الأصلية. |
| rocksdbTotalBytesWrittenByCompaction | عدد وحدات البايت المكتوبة على القرص المحلي بواسطة عملية ضغط RocksDB الأصلية. |
| rocksdbTotalCompactionLatencyMs | استغرق الوقت (بالمللي ثانية) لضغطات RocksDB (كل من الخلفية والضغط الاختياري الذي بدأ أثناء التثبيت). |
| rocksdbWriterStallLatencyMs | الوقت (بالمللي ثانية) توقف الكاتب بسبب ضغط الخلفية أو مسح memtables إلى القرص. |
| rocksdbTotalBytesReadThroughIterator | تتطلب بعض العمليات ذات الحالة (مثل معالجة المهلة في flatMapGroupsWithState أو وضع علامات مائية في التجميعات ذات النوافذ) قراءة بيانات كاملة في DB من خلال المكرر. الحجم الإجمالي للبيانات غير المضغوطة المقروءة باستخدام المكرر. |