إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
يتطلب استعلام Structured Streaming ذي الحالة تحديثات تزايدية لمعلومات الحالة المتوسطة، بينما يتعقب استعلام الدفق المنظم عديم الحالة المعلومات حول الصفوف التي تمت معالجتها من المصدر إلى المتلقي فقط.
تتضمن العمليات ذات الحالة تجميع الدفق والدفق dropDuplicatesوربط دفق الدفق و mapGroupsWithStateو.flatMapGroupsWithState
يمكن أن تؤدي معلومات الحالة الوسيطة المطلوبة لاستعلامات الدفق المنظم ذات الحالة إلى حدوث مشكلات غير متوقعة في زمن الانتقال والإنتاج إذا لم يتم تكوينها بشكل صحيح.
في Databricks Runtime 13.3 LTS وما فوق، يمكنك تمكين نقاط التحقق changelog باستخدام RocksDB لتقليل مدة نقطة التحقق وزمن الانتقال من طرف إلى طرف لأحمال عمل Structured Streaming. توصي Databricks بتمكين نقاط التحقق changelog لجميع استعلامات Structured Streaming ذات الحالة. راجع تمكين التحقق من سجل التغيير.
تحسين استعلامات الدفق المنظم ذات الحالة
يمكن أن تساعد إدارة معلومات الحالة المتوسطة من استعلامات الدفق المنظم ذات الحالة في منع حدوث مشكلات غير متوقعة في زمن الانتقال والإنتاج.
توصي Databricks بما يلي:
- استخدم المثيلات المحسنة للحساب كعمال.
- تعيين عدد الأقسام العشوائية إلى عدد مرات 1-2 من الذاكرات الأساسية في نظام المجموعة.
spark.sql.streaming.noDataMicroBatches.enabledتعيين التكوين إلىfalseفي SparkSession. وهذا يمنع محرك الدفعات الصغيرة المتدفقة من معالجة الدفعات الصغيرة التي لا تحتوي على بيانات. لاحظ أيضا أن تعيين هذا التكوين إلى قد يؤدي إلىfalseعمليات ذات حالة تستفيد من العلامات المائية أو معالجة المهلات لعدم الحصول على إخراج البيانات حتى تصل بيانات جديدة بدلا من فورا.
توصي Databricks باستخدام RocksDB مع نقاط التحقق changelog لإدارة حالة التدفقات ذات الحالة. راجع تكوين مخزن حالة RocksDB على Azure Databricks.
إشعار
لا يمكن تغيير نظام إدارة الحالة بين عمليات إعادة تشغيل الاستعلام. أي، إذا تم بدء تشغيل استعلام مع الإدارة الافتراضية، فلا يمكن تغييره دون بدء الاستعلام من الصفر مع موقع نقطة تحقق جديد.
العمل مع عوامل تشغيل متعددة ذات حالة في Structured Streaming
في Databricks Runtime 13.3 LTS وما فوق، يقدم Azure Databricks دعما متقدما للمشغلين المناسبين في أحمال عمل Structured Streaming. يمكنك الآن ربط عوامل تشغيل متعددة ذات حالة معا، ما يعني أنه يمكنك تغذية إخراج عملية مثل التجميع في نافذة إلى عملية أخرى ذات حالة مثل الصلة.
توضح الأمثلة التالية عدة أنماط يمكنك استخدامها.
هام
توجد القيود التالية عند العمل مع عوامل تشغيل متعددة ذات حالة:
FlatMapGroupWithStateغير مدعم.- يتم دعم وضع إخراج الإلحاق فقط.
تجميع النافذة الزمنية المتسلسل
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
تجميع النافذة الزمنية في دفقين مختلفين متبوعا بربط نافذة دفق الدفق
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
ربط الفاصل الزمني لدفق الدفق متبوعا بتجميع النافذة الزمنية
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
إعادة التوازن للحالة للبث المنظم
يتم تمكين إعادة التوازن للحالة بشكل افتراضي لجميع أحمال العمل المتدفقة في Delta Live Tables. في Databricks Runtime 11.3 LTS وما فوق، يمكنك تعيين خيار التكوين التالي في تكوين نظام مجموعة Spark لتمكين إعادة التوازن للحالة:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
تفيد إعادة التوازن للحالة البنية الأساسية لبرنامج ربط العمليات التجارية المهيكلة ذات الحالة التي تخضع لأحداث تغيير حجم نظام المجموعة. لا تستفيد عمليات الدفق عديمة الحالة، بغض النظر عن تغيير أحجام نظام المجموعة.
إشعار
يحتوي التحجيم التلقائي للحساب على قيود على تقليص حجم نظام المجموعة لأحمال عمل Structured Streaming. توصي Databricks باستخدام Delta Live Tables مع تغيير الحجم التلقائي المحسن لأحمال العمل المتدفقة. راجع تحسين استخدام نظام المجموعة لخطوط أنابيب Delta Live Tables مع التحجيم التلقائي المحسن.
تتسبب أحداث تغيير حجم نظام المجموعة في تشغيل إعادة التوازن للحالة. أثناء أحداث إعادة التوازن، قد يكون للدفعات الصغيرة زمن انتقال أعلى حيث يتم تحميل الحالة من التخزين السحابي إلى المنفذين الجدد.
تحديد الحالة الأولية ل mapGroupsWithState
يمكنك تحديد حالة أولية محددة من قبل المستخدم للمعالجة ذات الحالة المتدفقة المنظمة باستخدام flatMapGroupsWithStateأو mapGroupsWithState. يسمح لك هذا بتجنب إعادة معالجة البيانات عند بدء دفق ذي حالة دون نقطة تحقق صالحة.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
مثال حالة الاستخدام التي تحدد حالة أولية flatMapGroupsWithState لعامل التشغيل:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
مثال حالة الاستخدام التي تحدد حالة أولية mapGroupsWithState لعامل التشغيل:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
اختبار دالة mapGroupsWithState التحديث
TestGroupState تمكنك واجهة برمجة التطبيقات من اختبار دالة تحديث الحالة المستخدمة ل Dataset.groupByKey(...).mapGroupsWithState(...) وDataset.groupByKey(...).flatMapGroupsWithState(...).
تأخذ دالة تحديث الحالة الحالة السابقة كإدخل باستخدام كائن من النوع GroupState. راجع الوثائق المرجعية Apache Spark GroupState. على سبيل المثال:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}