البرنامج التعليمي: تحميل البيانات وتحويلها باستخدام Apache Spark DataFrames
يوضح لك هذا البرنامج التعليمي كيفية تحميل البيانات وتحويلها باستخدام Apache Spark Python (PySpark) DataFrame API وApache Spark Scala DataFrame API وSparkR SparkDataFrame API في Azure Databricks.
بنهاية هذا البرنامج التعليمي، سوف تفهم ما هو DataFrame وتكون على دراية بالمهام التالية:
Python
- تعريف المتغيرات ونسخ البيانات العامة إلى وحدة تخزين كتالوج Unity
- إنشاء DataFrame باستخدام Python
- تحميل البيانات في DataFrame من ملف CSV
- عرض إطار بيانات والتفاعل معه
- حفظ DataFrame
- تشغيل استعلامات SQL في PySpark
راجع أيضا مرجع Apache Spark PySpark API.
Scala
- تعريف المتغيرات ونسخ البيانات العامة إلى وحدة تخزين كتالوج Unity
- إنشاء DataFrame باستخدام Scala
- تحميل البيانات في DataFrame من ملف CSV
- عرض DataFrame والتفاعل معه
- حفظ DataFrame
- تشغيل استعلامات SQL في Apache Spark
راجع أيضا مرجع Apache Spark Scala API.
R
- تعريف المتغيرات ونسخ البيانات العامة إلى وحدة تخزين كتالوج Unity
- إنشاء SparkR SparkDataFrames
- تحميل البيانات في DataFrame من ملف CSV
- عرض إطار بيانات والتفاعل معه
- حفظ DataFrame
- تشغيل استعلامات SQL في SparkR
راجع أيضا مرجع Apache SparkR API.
ما هو DataFrame؟
DataFrame هو بنية بيانات ثنائية الأبعاد تحمل أعمدة من أنواع مختلفة محتملة. يمكنك التفكير في DataFrame مثل جدول بيانات أو جدول SQL أو قاموس كائنات السلسلة. توفر Apache Spark DataFrames مجموعة غنية من الوظائف (تحديد الأعمدة، والتصفية، والانضمام، والتجاميع) التي تسمح لك بحل مشكلات تحليل البيانات الشائعة بكفاءة.
Apache Spark DataFrames هي تجريد مبني على مجموعات البيانات الموزعة المرنة (RDDs). يستخدم Spark DataFrames وSpark SQL محرك تخطيط وتحسين موحد، مما يسمح لك بالحصول على أداء متطابق تقريبا عبر جميع اللغات المدعومة على Azure Databricks (Python وSQL وSca وR).
المتطلبات
لإكمال البرنامج التعليمي التالي، يجب أن تفي بالمتطلبات التالية:
لاستخدام الأمثلة في هذا البرنامج التعليمي، يجب تمكين كتالوج Unity لمساحة العمل الخاصة بك.
تستخدم الأمثلة في هذا البرنامج التعليمي وحدة تخزين كتالوج Unity لتخزين بيانات العينة. لاستخدام هذه الأمثلة، قم بإنشاء وحدة تخزين واستخدم أسماء كتالوج وحدة التخزين والمخطط ووحدات التخزين لتعيين مسار وحدة التخزين المستخدمة من قبل الأمثلة.
يجب أن يكون لديك الأذونات التالية في كتالوج Unity:
READ VOLUME
وWRITE VOLUME
، أوALL PRIVILEGES
وحدة التخزين المستخدمة لهذا البرنامج التعليمي.USE SCHEMA
أوALL PRIVILEGES
للمخطط المستخدم لهذا البرنامج التعليمي.USE CATALOG
أوALL PRIVILEGES
للكتالوج المستخدم لهذا البرنامج التعليمي.
لتعيين هذه الأذونات، راجع امتيازات مسؤول Databricks أو كتالوج Unity والعناصر القابلة للتأمين.
تلميح
للحصول على دفتر ملاحظات مكتمل لهذه المقالة، راجع دفتر ملاحظات البرنامج التعليمي DataFrame.
الخطوة 1: تحديد المتغيرات وتحميل ملف CSV
تحدد هذه الخطوة المتغيرات للاستخدام في هذا البرنامج التعليمي ثم تقوم بتحميل ملف CSV يحتوي على بيانات اسم الطفل من health.data.ny.gov إلى وحدة تخزين كتالوج Unity.
افتح دفتر ملاحظات جديدا بالنقر فوق الأيقونة . لمعرفة كيفية التنقل في دفاتر ملاحظات Azure Databricks، راجع واجهة دفتر ملاحظات Databricks وعناصر التحكم.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. استبدل
<catalog-name>
و<schema-name>
و<volume-name>
بأسماء الكتالوج والمخطط ووحدات التخزين لوحدة تخزين كتالوج Unity. استبدل<table_name>
باسم جدول من اختيارك. ستقوم بتحميل بيانات اسم الطفل في هذا الجدول لاحقا في هذا البرنامج التعليمي.اضغط
Shift+Enter
لتشغيل الخلية وإنشاء خلية فارغة جديدة.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_tables = catalog + "." + schema print(path_tables) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val file_name = "rows.csv" val table_name = "<table_name>" val path_volume = s"/Volumes/$catalog/$schema/$volume" val path_tables = s"$catalog.$schema.$table_name" print(path_volume) // Show the complete path print(path_tables) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_tables <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_tables) # Show the complete path
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تنسخ هذه التعليمة البرمجية
rows.csv
الملف من health.data.ny.gov إلى وحدة تخزين كتالوج Unity باستخدام الأمر Databricks dbutuils .اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
Scala
dbutils.fs.cp(download_url, s"$path_volume/$file_name")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
الخطوة 2: إنشاء DataFrame
تنشئ هذه الخطوة DataFrame باسم df1
مع بيانات الاختبار ثم تعرض محتوياته.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تنشئ هذه التعليمة البرمجية Dataframe مع بيانات الاختبار، ثم تعرض محتويات ومخطط DataFrame.
اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = c(2021), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = c(42) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
الخطوة 3: تحميل البيانات في DataFrame من ملف CSV
تنشئ هذه الخطوة DataFrame باسم df_csv
من ملف CSV الذي قمت بتحميله مسبقا في وحدة تخزين كتالوج Unity. انظر spark.read.csv.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تقوم هذه التعليمة البرمجية بتحميل بيانات اسم الطفل في DataFrame
df_csv
من ملف CSV ثم تعرض محتويات DataFrame.اضغط
Shift+Enter
لتشغيل الخلية ثم انتقل إلى الخلية التالية.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val df_csv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$path_volume/$file_name") display(df_csv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
يمكنك تحميل البيانات من العديد من تنسيقات الملفات المدعومة.
الخطوة 4: عرض إطار البيانات والتفاعل معه
اعرض أسماء طفلك وتفاعل معها باستخدام DataFrames باستخدام الطرق التالية.
طباعة مخطط DataFrame
تعرف على كيفية عرض مخطط Apache Spark DataFrame. يستخدم Apache Spark مخطط المصطلح للإشارة إلى أسماء وأنواع البيانات للأعمدة في DataFrame.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعرض هذه التعليمة البرمجية مخطط DataFrames الخاص بك مع .printSchema()
أسلوب لعرض مخططات DataFrames - للتحضير لتوحيد إطاري البيانات.
Python
df_csv.printSchema()
df1.printSchema()
Scala
df_csv.printSchema()
df1.printSchema()
R
printSchema(df_csv)
printSchema(df1)
إشعار
يستخدم Azure Databricks أيضا مخطط المصطلح لوصف مجموعة من الجداول المسجلة في كتالوج.
إعادة تسمية العمود في DataFrame
تعرف على كيفية إعادة تسمية عمود في DataFrame.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعيد هذه التعليمة البرمجية تسمية عمود في df1_csv
DataFrame لمطابقة العمود المعني في df1
DataFrame. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark withColumnRenamed()
.
Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
Scala
val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
دمج DataFrames
تعرف على كيفية إنشاء DataFrame جديد يضيف صفوف DataFrame إلى آخر.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark union()
لدمج محتويات DataFrame df
الأول مع DataFrame df_csv
الذي يحتوي على بيانات أسماء الأطفال المحملة من ملف CSV.
Python
df = df1.union(df_csv)
display(df)
Scala
val df = df1.union(df_csv_renamed)
display(df)
R
display(df <- union(df1, df_csv))
تصفية الصفوف في DataFrame
اكتشف أسماء الأطفال الأكثر شيوعا في مجموعة البيانات الخاصة بك عن طريق تصفية الصفوف، باستخدام Apache Spark .filter()
أو .where()
الأساليب. استخدم التصفية لتحديد مجموعة فرعية من الصفوف لإرجاعها أو تعديلها في DataFrame. لا يوجد فرق في الأداء أو بناء الجملة، كما هو ملاحظ في الأمثلة التالية.
استخدام أسلوب .filter()
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark .filter()
لعرض تلك الصفوف في DataFrame بعدد يزيد عن 50.
Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
استخدام أسلوب .where()
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark .where()
لعرض تلك الصفوف في DataFrame بعدد يزيد عن 50.
Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
تحديد أعمدة من DataFrame وترتيبها حسب التردد
تعرف على تكرار اسم الطفل باستخدام select()
الأسلوب لتحديد الأعمدة من DataFrame لإرجاعها. استخدم Apache Spark orderby
والوظائف desc
لترتيب النتائج.
توفر الوحدة النمطية pyspark.sql ل Apache Spark الدعم لوظائف SQL. من بين هذه الوظائف التي نستخدمها في هذا البرنامج التعليمي هي وظائف Apache Spark orderBy()
و desc()
و expr()
. يمكنك تمكين استخدام هذه الدالات عن طريق استيرادها إلى جلسة العمل الخاصة بك حسب الحاجة.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستورد هذه التعليمة البرمجية الدالة desc()
ثم تستخدم أسلوب Apache Spark select()
وApache Spark orderBy()
والوظائف desc()
لعرض الأسماء الأكثر شيوعا وعددها بترتيب تنازلي.
Python
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
إنشاء مجموعة فرعية DataFrame
تعرف على كيفية إنشاء مجموعة فرعية DataFrame من DataFrame موجود.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark filter
لإنشاء DataFrame جديد يقيد البيانات حسب السنة والعدد والجنس. يستخدم أسلوب Apache Spark select()
للحد من الأعمدة. كما يستخدم Apache Spark orderBy()
والوظائف desc()
لفرز DataFrame الجديد حسب العدد.
Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
الخطوة 5: حفظ DataFrame
تعرف على كيفية حفظ DataFrame. يمكنك إما حفظ DataFrame إلى جدول أو كتابة DataFrame إلى ملف أو ملفات متعددة.
حفظ DataFrame في جدول
يستخدم Azure Databricks تنسيق Delta Lake لكافة الجداول بشكل افتراضي. لحفظ DataFrame الخاص بك، يجب أن يكون لديك CREATE
امتيازات الجدول على الكتالوج والمخطط.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تحفظ هذه التعليمة البرمجية محتويات DataFrame في جدول باستخدام المتغير الذي حددته في بداية هذا البرنامج التعليمي.
Python
df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")
# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")
Scala
df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")
// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")
R
saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")
تعمل معظم تطبيقات Apache Spark على مجموعات بيانات كبيرة وبطريقة موزعة. يكتب Apache Spark دليلا من الملفات بدلا من ملف واحد. يقسم Delta Lake مجلدات وملفات Parquet. يمكن للعديد من أنظمة البيانات قراءة هذه الدلائل من الملفات. توصي Azure Databricks باستخدام الجداول عبر مسارات الملفات لمعظم التطبيقات.
حفظ DataFrame إلى ملفات JSON
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تحفظ هذه التعليمة البرمجية DataFrame إلى دليل ملفات JSON.
Python
df.write.format("json").save("/tmp/json_data")
# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").save("/tmp/json_data")
// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
قراءة DataFrame من ملف JSON
تعرف على كيفية استخدام أسلوب Apache Spark spark.read.format()
لقراءة بيانات JSON من دليل إلى DataFrame.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعرض هذه التعليمة البرمجية ملفات JSON التي حفظتها في المثال السابق.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
مهام إضافية: تشغيل استعلامات SQL في PySpark وSc scala وR
توفر Apache Spark DataFrames الخيارات التالية لدمج SQL مع PySpark وSc scala وR. يمكنك تشغيل التعليمات البرمجية التالية في نفس دفتر الملاحظات الذي قمت بإنشائه لهذا البرنامج التعليمي.
تحديد عمود كتعلام SQL
تعرف على كيفية استخدام أسلوب Apache Spark selectExpr()
. هذا هو متغير من select()
الأسلوب الذي يقبل تعبيرات SQL وإرجاع DataFrame محدث. يسمح لك هذا الأسلوب باستخدام تعبير SQL، مثل upper
.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark selectExpr()
وتعبير SQL upper
لتحويل عمود سلسلة إلى أحرف كبيرة (وإعادة تسمية العمود).
Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
استخدم expr()
لاستخدام بناء جملة SQL لعمود
تعرف على كيفية استيراد واستخدام دالة Apache Spark expr()
لاستخدام بناء جملة SQL في أي مكان يتم فيه تحديد عمود.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستورد هذه التعليمة البرمجية الدالة expr()
ثم تستخدم دالة Apache Spark expr()
وتعبير SQL lower
لتحويل عمود سلسلة إلى حالة صغيرة (وإعادة تسمية العمود).
Python
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
تشغيل استعلام SQL عشوائي باستخدام الدالة spark.sql()
تعرف على كيفية استخدام وظيفة Apache Spark spark.sql()
لتشغيل استعلامات SQL العشوائية.
انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية دالة Apache Spark spark.sql()
للاستعلام عن جدول SQL باستخدام بناء جملة SQL.
Python
display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))
R
display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))
دفتر ملاحظات البرنامج التعليمي DataFrame
يتضمن دفتر الملاحظات التالي أمثلة الاستعلامات من هذا البرنامج التعليمي.
Python
البرنامج التعليمي DataFrames باستخدام دفتر ملاحظات Python
Scala
البرنامج التعليمي DataFrames باستخدام دفتر ملاحظات Scala
R
البرنامج التعليمي DataFrames باستخدام دفتر ملاحظات R
الموارد الإضافية
الملاحظات
https://aka.ms/ContentUserFeedback.
قريبًا: خلال عام 2024، سنتخلص تدريجيًا من GitHub Issues بوصفها آلية إرسال ملاحظات للمحتوى ونستبدلها بنظام ملاحظات جديد. لمزيد من المعلومات، راجعإرسال الملاحظات وعرضها المتعلقة بـ