العمل مع البيانات في إطار بيانات Spark

مكتمل

يستخدم Spark في الأصل بنية بيانات تسمى مجموعة بيانات موزعة مرنة (RDD)؛ ولكن بينما يمكنك كتابة التعليمات البرمجية التي تعمل مباشرة مع مجموعات البيانات الموزعة المرنة، فإن بنية البيانات الأكثر استخداماً للعمل مع البيانات المنظمة في Spark هي إطار البيانات، الذي يتم توفيره كجزء من مكتبة Spark SQL. تتشابه إطارات البيانات في Spark مع تلك الموجودة في مكتبة Pandas Python في كل مكان، ولكنها محسنة للعمل في بيئة المعالجة الموزعة في Spark.

إشعار

بالإضافة إلى واجهة برمجة تطبيقات Dataframe، يوفر Spark SQL واجهة برمجة تطبيقات مجموعة بيانات مكتوبة بقوة ومدعمة في Java وSca. سنركز على واجهة برمجة تطبيقات إطار البيانات في هذه الوحدة.

تحميل البيانات في إطار البيانات

دعونا نستكشف مثالاً افتراضياً لمعرفة كيف يمكنك استخدام إطار بيانات للعمل مع البيانات. لنفترض أن لديك البيانات التالية في ملف نصي محدد بفاصلة يسمى products.csv في مجلد الملفات/البيانات في مستودعك:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

الاستدلال على مخطط

في دفتر ملاحظات Spark، يمكنك استخدام التعليمات البرمجية PySpark التالية لتحميل بيانات الملف في إطار بيانات وعرض أول 10 صفوف:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

%%pysparkيسمى السطر في البداية سحراً، ويخبر Spark أن اللغة المستخدمة في هذه الخلية هي PySpark. يمكنك تحديد اللغة التي تريد استخدامها كلغة افتراضية في شريط أدوات واجهة دفتر الملاحظات، ثم استخدام سحر لتجاوز هذا الاختيار لخلية معينة. على سبيل المثال، إليك التعليمة البرمجية Scala المكافئة لمثال بيانات المنتجات:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

يتم استخدام السحر %%spark لتحديد Scala.

كل من نماذج التعليمات البرمجية هذه تنتج إخراجاً مثل هذا:

معرّف المنتج ProductName الفئة قائمة الأسعار
771 ماونتن 100 فضي، 38 دراجات جبلية 3399.9900
772 ماونتن 100 فضي، 42 دراجات جبلية 3399.9900
773 ماونتن 100 فضي، 44 دراجات جبلية 3399.9900
... ... ... ...

تحديد مخطط صريح

في المثال السابق، احتوى الصف الأول من ملف CSV على أسماء الأعمدة، وتمكن Spark من استنتاج نوع بيانات كل عمود من البيانات التي يحتوي عليها. يمكنك أيضاً تحديد مخطط صريح للبيانات، وهو أمر مفيد عندما لا يتم تضمين أسماء الأعمدة في ملف البيانات، مثل مثال CSV هذا:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

يوضح مثال PySpark التالي كيفية تحديد مخطط لإطار البيانات المراد تحميله من ملف يسمى product-data.csv بهذا التنسيق:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

وستكون النتائج مرة أخرى مشابهة لما يلي:

معرّف المنتج ProductName الفئة قائمة الأسعار
771 ماونتن 100 فضي، 38 دراجات جبلية 3399.9900
772 ماونتن 100 فضي، 42 دراجات جبلية 3399.9900
773 ماونتن 100 فضي، 44 دراجات جبلية 3399.9900
... ... ... ...

تلميح

يؤدي تحديد مخطط صريح أيضا إلى تحسين الأداء!

تصفية إطارات البيانات وتجميعها

يمكنك استخدام أساليب فئة Dataframe لتصفية البيانات التي تحتوي عليها وفرزها وتجميعها ومعالجتها بطريقة أخرى. على سبيل المثال، يستخدم مثال التعليمات البرمجية التالي أسلوب التحديد لاسترداد عمودي ProductID و ListPrice من إطار بيانات df الذي يحتوي على بيانات المنتج في المثال السابق:

pricelist_df = df.select("ProductID", "ListPrice")

ستبدو النتائج من مثال التعليمات البرمجية هذا كما يلي:

معرّف المنتج قائمة الأسعار
771 3399.9900
772 3399.9900
773 3399.9900
... ...

بشكل مشترك مع معظم أساليب معالجة البيانات، حدد إرجاع كائن إطار بيانات جديد.

تلميح

يعد تحديد مجموعة فرعية من الأعمدة من إطار البيانات عملية شائعة، والتي يمكن تحقيقها أيضا باستخدام بناء الجملة الأقصر التالي:

pricelist_df = df["ProductID", "ListPrice"]

يمكنك "ربط" الأساليب معا لتنفيذ سلسلة من المعالجات التي تؤدي إلى تحويل إطار البيانات. على سبيل المثال، يقوم مثال التعليمات البرمجية هذا بسلاسل التحديد و مكان إنشاء إطار بيانات جديد يحتوي على عمودي ProductName و ListPrice للمنتجات ذات فئة Mountain Bikes أو Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

ستبدو النتائج من مثال التعليمات البرمجية هذا كما يلي:

ProductName الفئة قائمة الأسعار
ماونتن 100 فضي، 38 دراجات جبلية 3399.9900
رود-750 أسود، 52 دراجات السباق 539.9900
... ... ...

لجمع البيانات وتجميعها، يمكنك استخدام أسلوب groupBy والدالات التجميعية. على سبيل المثال، تحسب التعليمات البرمجية PySpark التالية عدد المنتجات لكل فئة:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

ستبدو النتائج من مثال التعليمات البرمجية هذا كما يلي:

الفئة عدد
سماعة الرأس 3
العجلات 14
دراجات جبلية 32
... ...

حفظ إطار بيانات

ستحتاج غالبا إلى استخدام Spark لتحويل البيانات الأولية وحفظ النتائج لمزيد من التحليل أو معالجة انتقال البيانات من الخادم. يحفظ مثال التعليمات البرمجية التالي dataFrame في ملف parquet في مستودع البيانات، مع استبدال أي ملف موجود بنفس الاسم.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

إشعار

يفضل تنسيق Parquet عادة لملفات البيانات التي ستستخدمها لمزيد من التحليل أو الاستيعاب في مخزن تحليلي. Parquet هو تنسيق فعال للغاية مدعوم من قبل معظم أنظمة تحليلات البيانات على نطاق واسع. في الواقع، في بعض الأحيان قد يكون متطلبات تحويل البيانات ببساطة هو تحويل البيانات من تنسيق آخر (مثل CSV) إلى Parquet!

تقسيم ملف الإخراج

التقسيم هو تقنية تحسين تمكن Spark من زيادة الأداء عبر العقد العاملة. يمكن تحقيق المزيد من مكاسب الأداء عند تصفية البيانات في الاستعلامات عن طريق إزالة الإدخال/الإخراج غير الضروري للقرص.

لحفظ إطار بيانات كملف مقسم، استخدم الأسلوب partitionBy عند كتابة البيانات. يحفظ المثال التالي إطار بيانات bikes_df (الذي يحتوي على بيانات المنتج لفئات الدراجات الجبلية ودراجات الطرق)، ويقسم البيانات حسب الفئة:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

تتضمن أسماء المجلدات التي تم إنشاؤها عند تقسيم إطار بيانات اسم عمود التقسيم والقيمة بتنسيق عمود =قيمة ، لذلك ينشئ مثال التعليمات البرمجية مجلدا يسمى bike_data يحتوي على المجلدات الفرعية التالية:

  • الفئة = الدراجات الجبلية
  • الفئة = دراجات الطرق

يحتوي كل مجلد فرعي على ملف باركيه واحد أو أكثر مع بيانات المنتج للفئة المناسبة.

إشعار

يمكنك تقسيم البيانات حسب أعمدة متعددة، ما يؤدي إلى تسلسل هرمي للمجلدات لكل مفتاح تقسيم. على سبيل المثال، يمكنك تقسيم بيانات أمر المبيعات حسب السنة والشهر، بحيث يتضمن التسلسل الهرمي للمجلد مجلدا لكل قيمة سنة، والذي بدوره يحتوي على مجلد فرعي لكل قيمة شهر.

تحميل البيانات المقسمة

عند قراءة البيانات المقسمة في إطار بيانات، يمكنك تحميل البيانات من أي مجلد داخل التسلسل الهرمي عن طريق تحديد قيم صريحة أو أحرف بدل للحاول المقسمة. يقوم المثال التالي بتحميل البيانات للمنتجات في فئة Road Bikes :

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

إشعار

يتم حذف أعمدة التقسيم المحددة في مسار الملف في إطار البيانات الناتج. لن تتضمن النتائج التي ينتجها استعلام المثال عمود الفئة - ستكون الفئة لكافة الصفوف دراجات الطريق.