שימוש ב- Spark לעבודה עם קבצי נתונים
אחד היתרונות של השימוש ב- Spark הוא שניתן לכתוב ולהפעיל קוד בשפות תיכנות שונות, כך שתוכל להשתמש במיומנויות התיכנות שכבר יש לך ולהשתמש בשפה המתאימה ביותר עבור משימה נתונה. שפת ברירת המחדל במחברת חדשה של Azure Databricks Spark היא PySpark - גירסה הממוטבת באמצעות Spark של Python, המשמשת בדרך כלל מדעני נתונים ואנליסטים בשל התמיכה החזקה שלה בתפעול נתונים ובפריטים חזותיים. בנוסף, באפשרותך להשתמש בשפות כגון Scala (שפה הנגזרת על-ידי Java שניתן להשתמש בה באופן אינטראקטיבי) וב- SQL (וריאציה של שפת SQL שנמצאת בשימוש נפוץ הכלול בספריית Spark SQL כדי לעבוד עם מבני נתונים יחסיים). מהנדסי תוכנה יכולים גם ליצור פתרונות הידור אשר פועלים ב- Spark באמצעות מסגרות כגון Java.
סקירת נתונים באמצעות מסגרות נתונים
באופן מקורי, Spark משתמש במבנה נתונים הנקרא ערכת נתונים המבווזרת גמישה (RDD); אך למרות שניתן לכתוב קוד שעובד ישירות עם רכיבי RDD, מבנה הנתונים הנפוץ ביותר לעבודה עם נתונים מובנים ב- Spark הוא מסגרת הנתונים, המסופקת כחלק מספריית Spark SQL. מסגרות נתונים ב- Spark דומות לאלה הקיימות בספריית Python של פנדה בכל מקום, אך ממוטבות לעבודה בסביבות העיבוד המבווזר של Spark.
הערה
בנוסף ל- API של Dataframe, Spark SQL מספק API מסוים של ערכת נתונים מסוים הנתמך ב- Java וב- Scala. אנו נתמקד ב- API של מסגרת הנתונים במודול זה.
טוען נתונים לתוך מסגרת נתונים
בוא נבחן דוגמה היפותטית כדי לראות כיצד ניתן להשתמש ב- dataframe כדי לעבוד עם נתונים. נניח שיש לך את הנתונים הבאים בקובץ טקסט מופרד באמצעות פסיקים בשםproducts.csv האחסון של מערכת הקבצים Databricks (DBFS):
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
במחברת Spark, באפשרותך להשתמש בקוד PySpark הבא כדי לטעון את הנתונים לתוך מסגרת נתונים ולהציג את 10 השורות הראשונות:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
השורה %pyspark
בהתחלה נקראת קסם, והיא מורה ל- Spark שהשפה שבה נעשה שימוש בתא זה היא PySpark. להלן קוד Scala המקביל עבור נתוני המוצרים לדוגמה:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
הקסם %spark
כדי לציין Scala.
עצה
באפשרותך גם לבחור את השפה שבה ברצונך להשתמש עבור כל תא בממשק המחברת.
שתי הדוגמאות שהוצגו קודם לכן ייצרו פלט כזה:
מזהה מוצר | שם מוצר | קטגוריה | מחיר רשימה |
---|---|---|---|
771 | הר-100 כסף, 38 | אופני הרים | 3399.9900 |
772 | הר-100 כסף, 42 | אופני הרים | 3399.9900 |
773 | הר-100 כסף, 44 | אופני הרים | 3399.9900 |
... | ... | ... | ... |
ציון סכימת מסגרת נתונים
בדוגמה הקודמת, השורה הראשונה של קובץ ה- CSV הכיל את שמות העמודות, ו- Spark הצליח להסיק את סוג הנתונים של כל עמודה מהנתונים שהיא מכילה. באפשרותך גם לציין סכימה מפורשת עבור הנתונים, אפשרות שימושית כאשר שמות העמודות אינם נכללים בקובץ הנתונים, כמו בדוגמה CSV זו:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
הדוגמה הבאה של PySpark מראה כיצד לציין סכימה עבור מסגרת הנתונים שיש לטען מקובץ בשםproduct-data.csv בתבנית זו:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
התוצאות יהיו שוב דומות ל:
מזהה מוצר | שם מוצר | קטגוריה | מחיר רשימה |
---|---|---|---|
771 | הר-100 כסף, 38 | אופני הרים | 3399.9900 |
772 | הר-100 כסף, 42 | אופני הרים | 3399.9900 |
773 | הר-100 כסף, 44 | אופני הרים | 3399.9900 |
... | ... | ... | ... |
סינון וקיבוץ של מסגרות נתונים
באפשרותך להשתמש בשיטות של מחלקת מסגרת הנתונים כדי לסנן, למיין, לקבץ ולטפל בנתונים שהיא מכילה באופן אחר. לדוגמה, דוגמת הקוד הבאה משתמשת בפעולת השירות Select כדי לאחזר את העמודות ProductName ו- ListPrice מתוך מסגרת הנתונים של df המכילה נתוני מוצר בדוגמה הקודמת:
pricelist_df = df.select("ProductID", "ListPrice")
התוצאות מדוגמה זו של קוד ייראו בערך כך:
מזהה מוצר | מחיר רשימה |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
במשותף לרוב שיטות טיפול בנתונים, בחר מחזיר אובייקט Dataframe חדש.
עצה
בחירת קבוצת משנה של עמודות מתוך מסגרת נתונים היא פעולה נפוצה, שניתן להשיג אותה גם באמצעות התחביר הקצר יותר הבא:
pricelist_df = df["ProductID", "ListPrice"]
ניתן ליצור שרשרת של שיטות יחד כדי לבצע סידרת מניפולציות שהתוצאה היא מסגרת נתונים שהומרה. לדוגמה, קוד לדוגמה זה יוצר שרשרת של אפשרויות בחירה והיכן שיטות ליצירת מסגרת נתונים חדשה המכילה את העמודות ProductName ו- ListPrice עבור מוצרים עם קטגוריה של אופני הרים או אופני כביש:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
התוצאות מדוגמה זו של קוד ייראו בערך כך:
שם מוצר | מחיר רשימה |
---|---|
הר-100 כסף, 38 | 3399.9900 |
כביש-750 שחור, 52 | 539.9900 |
... | ... |
כדי לקבץ ולצטבור נתונים, באפשרותך להשתמש בפעולת השירות של groupBy ובפונקציות הצבירה. לדוגמה, הקוד הבא של PySpark סופר את מספר המוצרים עבור כל קטגוריה:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
התוצאות מדוגמה זו של קוד ייראו בערך כך:
קטגוריה | מנה |
---|---|
אוזניות | 3 |
גלגלים | 14 |
אופני הרים | 32 |
... | ... |
שימוש בביטויים של SQL ב- Spark
ה- API של מסגרת הנתונים מהווה חלק מספריית Spark בשם Spark SQL, המאפשרת לאנליסטים של נתונים להשתמש בביטויים של SQL כדי לבצע שאילתות על נתונים ולטפל בהם.
יצירת אובייקטי מסד נתונים בקטלוג Spark
קטלוג Spark הוא Metastore עבור אובייקטי נתונים יחסיים כגון תצוגות וטבלאות. זמן הריצה של Spark יכול להשתמש בקטלוג כדי לשלב בצורה חלקה קוד הכתוב בכל שפה הנתמכת על-ידי Spark עם ביטויי SQL שעשויים להיות טבעיים יותר לאנליסטים או למפתחים מסוימים של נתונים.
אחת הדרכים הפשוטות ביותר להפיכת נתונים ב- Dataframe לזמינים לביצוע שאילתות בקטלוג Spark היא ליצור תצוגה זמנית, כפי שמוצג בדוגמה הבאה של הקוד:
df.createOrReplaceTempView("products")
תצוגה היא זמנית, כלומר היא נמחקת באופן אוטומטי בסוף ההפעלה הנוכחית. באפשרותך גם ליצור טבלאות עקביות בקטלוג כדי להגדיר מסד נתונים שניתן לבצע בו שאילתה באמצעות Spark SQL.
הערה
לא נחקור לעומק את טבלאות הקטלוג של Spark במודול זה, אך כדאי לסמן כמה נקודות מרכזיות:
- באפשרותך ליצור טבלה ריקה באמצעות
spark.catalog.createTable
זו. טבלאות הן מבני מטה-נתונים המאחסנים את הנתונים המשמשים כבסיס שלהם במיקום האחסון המשויך לקטלוג. מחיקת טבלה מוחקת גם את הנתונים המשמשים כנתונים המשמשים כנתונים המשמשים כנתונים. - באפשרותך לשמור מסגרת נתונים כטבלה באמצעות שיטת ההפצה
saveAsTable
הבאה. - באפשרותך ליצור טבלה חיצונית באמצעות פעולת
spark.catalog.createExternalTable
השירות. טבלאות חיצוניות מגדירות מטה-נתונים בקטלוג, אך מקבלות את הנתונים המשמשים כבסיס ממיקום אחסון חיצוני; בדרך כלל תיקיה באגם נתונים. מחיקת טבלה חיצונית אינה מוחקת את הנתונים המשמשים כנתונים.
שימוש ב- API של Spark SQL כדי לבצע שאילתה על נתונים
באפשרותך להשתמש ב- API של Spark SQL בקוד הכתוב בכל שפה כדי לבצע שאילתה על נתונים בקטלוג. לדוגמה, הקוד הבא של PySpark משתמש בשאילתת SQL כדי להחזיר נתונים בתצוגת המוצרים כמסגרת נתונים.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
התוצאות מהקוד לדוגמה ייראו דומות לטבלה הבאה:
שם מוצר | מחיר רשימה |
---|---|
הר-100 כסף, 38 | 3399.9900 |
כביש-750 שחור, 52 | 539.9900 |
... | ... |
שימוש בקוד SQL
הדוגמה הקודמת הדגימה כיצד להשתמש ב- API של Spark SQL כדי להטביע ביטויי SQL בקוד Spark. במחברת, באפשרותך גם להשתמש ב- %sql
הקסם כדי להפעיל קוד SQL שמציין שאילתות באובייקטים בקטלוג, באופן הבא:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
הדוגמה של קוד SQL מחזירה ערכת תוצאות המוצגת באופן אוטומטי במחברת כטבלה, כמו זו שלהלן:
קטגוריה | מספר מוצר |
---|---|
Bib-Shorts | 3 |
מדפים לאופניים | 1 |
מעמדי אופניים | 1 |
... | ... |