مشاركة عبر


تكوين مخزن حالة RocksDB على Azure Databricks

يمكنك تمكين إدارة الحالة المستندة إلى RocksDB عن طريق تعيين التكوين التالي في SparkSession قبل بدء تشغيل استعلام الدفق.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

يمكنك تمكين RocksDB على خطوط أنابيب Delta Live Tables. راجع تحسين تكوين البنية الأساسية لبرنامج ربط العمليات التجارية للمعالجة ذات الحالة.

تمكين نقاط التحقق من سجل التغيير

في Databricks Runtime 13.3 LTS وما فوق، يمكنك تمكين نقاط التحقق changelog لتقليل مدة نقطة التحقق وزمن الانتقال من طرف إلى طرف لأحمال عمل Structured Streaming. توصي Databricks بتمكين نقاط التحقق changelog لجميع استعلامات Structured Streaming ذات الحالة.

عادة ما تقوم RocksDB State Store بلقطات وتحميل ملفات البيانات أثناء نقاط التفتيش. لتجنب هذه التكلفة، تكتب نقاط التحقق changelog السجلات التي تم تغييرها منذ نقطة التحقق الأخيرة إلى التخزين الدائم فقط."

يتم تعطيل نقاط التحقق Changelog بشكل افتراضي. يمكنك تمكين نقاط التحقق changelog في مستوى SparkSession باستخدام بناء الجملة التالي:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

يمكنك تمكين نقاط التحقق changelog على دفق موجود والحفاظ على معلومات الحالة المخزنة في نقطة التحقق.

هام

يمكن تشغيل الاستعلامات التي مكنت نقاط التحقق changelog فقط على Databricks Runtime 13.3 LTS وما فوق. يمكنك تعطيل نقاط التحقق changelog للعودة إلى سلوك نقاط التحقق القديمة، ولكن يجب الاستمرار في تشغيل هذه الاستعلامات على Databricks Runtime 13.3 LTS أو أعلى. يجب إعادة تشغيل المهمة لإجراء هذه التغييرات.

مقاييس مخزن حالة RocksDB

يجمع كل عامل تشغيل حالة المقاييس المتعلقة بعمليات إدارة الحالة التي يتم إجراؤها على مثيل RocksDB الخاص به لمراقبة مخزن الحالة ومن المحتمل أن يساعد في تصحيح بطء المهمة. يتم تجميع هذه المقاييس (المجموع) لكل عامل تشغيل حالة في الوظيفة عبر جميع المهام حيث يتم تشغيل عامل تشغيل الحالة. هذه المقاييس هي جزء من customMetrics الخريطة داخل stateOperators الحقول في StreamingQueryProgress. فيما يلي مثال على StreamingQueryProgress في نموذج JSON (تم الحصول عليه باستخدام StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

الأوصاف التفصيلية للمقاييس هي كما يلي:

اسم قياسي ‏‏الوصف
rocksdbCommitWriteBatchLatency استغرق الوقت (بالمللي ثانية) لتطبيق عمليات الكتابة المرحلية في بنية الذاكرة (WriteBatch) على RocksDB الأصلي.
rocksdbCommitFlushLatency استغرق الوقت (بالمللي ثانية) لمسح تغييرات RocksDB في الذاكرة على القرص المحلي.
rocksdbCommitCompactLatency استغرق الوقت (بالمللي ثانية) للضغط (اختياري) أثناء تثبيت نقطة التحقق.
rocksdbCommitPauseLatency استغرق الوقت (بالمللي ثانية) لإيقاف مؤشرات ترابط عامل الخلفية (للضغط وما إلى ذلك) كجزء من تثبيت نقطة التحقق.
rocksdbCommitCheckpointLatency استغرق الوقت (بالمللي ثانية) لأخذ لقطة من RocksDB الأصلية وكتابتها إلى دليل محلي.
rocksdbCommitFileSyncLatencyMs استغرق الوقت (بالمللي ثانية) لمزامنة الملفات الأصلية المتعلقة بلقطة RocksDB إلى تخزين خارجي (موقع نقطة التحقق).
rocksdbGetLatency استغرق متوسط الوقت (بالنانوية) وفقا للمكالمة الأصلية RocksDB::Get الأساسية.
rocksdbPutCount استغرق متوسط الوقت (بالنانوية) وفقا للمكالمة الأصلية RocksDB::Put الأساسية.
rocksdbGetCount عدد الاستدعاءات الأصلية RocksDB::Get (لا تتضمن Gets من WriteBatch - في دفعة الذاكرة المستخدمة في عمليات الكتابة المرحلية).
rocksdbPutCount عدد الاستدعاءات الأصلية RocksDB::Put (لا تتضمن Puts WriteBatch - في دفعة الذاكرة المستخدمة في عمليات الكتابة المرحلية).
rocksdbTotalBytesReadByGet عدد وحدات البايت غير المضغوطة المقروءة من خلال المكالمات الأصلية RocksDB::Get .
rocksdbTotalBytesWrittenByPut عدد وحدات البايت غير المضغوطة المكتوبة من خلال المكالمات الأصلية RocksDB::Put .
rocksdbReadBlockCacheHitCount عدد المرات التي يتم فيها استخدام ذاكرة التخزين المؤقت لكتل RocksDB الأصلية لتجنب قراءة البيانات من القرص المحلي.
rocksdbReadBlockCacheMissCount عدد المرات التي فاتت فيها ذاكرة التخزين المؤقت لكتل RocksDB الأصلية وتطلبت قراءة البيانات من القرص المحلي.
rocksdbTotalBytesReadByCompaction عدد وحدات البايت المقروءة من القرص المحلي بواسطة عملية ضغط RocksDB الأصلية.
rocksdbTotalBytesWrittenByCompaction عدد وحدات البايت المكتوبة على القرص المحلي بواسطة عملية ضغط RocksDB الأصلية.
rocksdbTotalCompactionLatencyMs استغرق الوقت (بالمللي ثانية) لضغطات RocksDB (كل من الخلفية والضغط الاختياري الذي بدأ أثناء التثبيت).
rocksdbWriterStallLatencyMs الوقت (بالمللي ثانية) توقف الكاتب بسبب ضغط الخلفية أو مسح memtables إلى القرص.
rocksdbTotalBytesReadThroughIterator تتطلب بعض العمليات ذات الحالة (مثل معالجة المهلة في flatMapGroupsWithState أو وضع علامات مائية في التجميعات ذات النوافذ) قراءة بيانات كاملة في DB من خلال المكرر. الحجم الإجمالي للبيانات غير المضغوطة المقروءة باستخدام المكرر.