البرنامج التعليمي: تحميل البيانات وتحويلها باستخدام Apache Spark DataFrames

يوضح لك هذا البرنامج التعليمي كيفية تحميل البيانات وتحويلها باستخدام Apache Spark Python (PySpark) DataFrame API وApache Spark Scala DataFrame API وSparkR SparkDataFrame API في Azure Databricks.

بنهاية هذا البرنامج التعليمي، سوف تفهم ما هو DataFrame وتكون على دراية بالمهام التالية:

Python

راجع أيضا مرجع Apache Spark PySpark API.

Scala

راجع أيضا مرجع Apache Spark Scala API.

R

راجع أيضا مرجع Apache SparkR API.

ما هو DataFrame؟

DataFrame هو بنية بيانات ثنائية الأبعاد تحمل أعمدة من أنواع مختلفة محتملة. يمكنك التفكير في DataFrame مثل جدول بيانات أو جدول SQL أو قاموس كائنات السلسلة. توفر Apache Spark DataFrames مجموعة غنية من الوظائف (تحديد الأعمدة، والتصفية، والانضمام، والتجاميع) التي تسمح لك بحل مشكلات تحليل البيانات الشائعة بكفاءة.

Apache Spark DataFrames هي تجريد مبني على مجموعات البيانات الموزعة المرنة (RDDs). يستخدم Spark DataFrames وSpark SQL محرك تخطيط وتحسين موحد، مما يسمح لك بالحصول على أداء متطابق تقريبا عبر جميع اللغات المدعومة على Azure Databricks (Python وSQL وSca وR).

المتطلبات

لإكمال البرنامج التعليمي التالي، يجب أن تفي بالمتطلبات التالية:

  • لاستخدام الأمثلة في هذا البرنامج التعليمي، يجب تمكين كتالوج Unity لمساحة العمل الخاصة بك.

  • تستخدم الأمثلة في هذا البرنامج التعليمي وحدة تخزين كتالوج Unity لتخزين بيانات العينة. لاستخدام هذه الأمثلة، قم بإنشاء وحدة تخزين واستخدم أسماء كتالوج وحدة التخزين والمخطط ووحدات التخزين لتعيين مسار وحدة التخزين المستخدمة من قبل الأمثلة.

  • يجب أن يكون لديك الأذونات التالية في كتالوج Unity:

    • READ VOLUME و WRITE VOLUME، أو ALL PRIVILEGES وحدة التخزين المستخدمة لهذا البرنامج التعليمي.
    • USE SCHEMA أو ALL PRIVILEGES للمخطط المستخدم لهذا البرنامج التعليمي.
    • USE CATALOG أو ALL PRIVILEGES للكتالوج المستخدم لهذا البرنامج التعليمي.

    لتعيين هذه الأذونات، راجع امتيازات مسؤول Databricks أو كتالوج Unity والعناصر القابلة للتأمين.

تلميح

للحصول على دفتر ملاحظات مكتمل لهذه المقالة، راجع دفتر ملاحظات البرنامج التعليمي DataFrame.

الخطوة 1: تحديد المتغيرات وتحميل ملف CSV

تحدد هذه الخطوة المتغيرات للاستخدام في هذا البرنامج التعليمي ثم تقوم بتحميل ملف CSV يحتوي على بيانات اسم الطفل من health.data.ny.gov إلى وحدة تخزين كتالوج Unity.

  1. افتح دفتر ملاحظات جديدا بالنقر فوق الأيقونة أيقونة جديدة . لمعرفة كيفية التنقل في دفاتر ملاحظات Azure Databricks، راجع واجهة دفتر ملاحظات Databricks وعناصر التحكم.

  2. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. استبدل <catalog-name>و <schema-name>و <volume-name> بأسماء الكتالوج والمخطط ووحدات التخزين لوحدة تخزين كتالوج Unity. استبدل <table_name> باسم جدول من اختيارك. ستقوم بتحميل بيانات اسم الطفل في هذا الجدول لاحقا في هذا البرنامج التعليمي.

  3. اضغط Shift+Enter لتشغيل الخلية وإنشاء خلية فارغة جديدة.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تنسخ هذه التعليمة البرمجية rows.csv الملف من health.data.ny.gov إلى وحدة تخزين كتالوج Unity باستخدام الأمر Databricks dbutuils .

  5. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

الخطوة 2: إنشاء DataFrame

تنشئ هذه الخطوة DataFrame باسم df1 مع بيانات الاختبار ثم تعرض محتوياته.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تنشئ هذه التعليمة البرمجية Dataframe مع بيانات الاختبار، ثم تعرض محتويات ومخطط DataFrame.

  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

الخطوة 3: تحميل البيانات في DataFrame من ملف CSV

تنشئ هذه الخطوة DataFrame باسم df_csv من ملف CSV الذي قمت بتحميله مسبقا في وحدة تخزين كتالوج Unity. انظر spark.read.csv.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تقوم هذه التعليمة البرمجية بتحميل بيانات اسم الطفل في DataFrame df_csv من ملف CSV ثم تعرض محتويات DataFrame.

  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

يمكنك تحميل البيانات من العديد من تنسيقات الملفات المدعومة.

الخطوة 4: عرض إطار البيانات والتفاعل معه

اعرض أسماء طفلك وتفاعل معها باستخدام DataFrames باستخدام الطرق التالية.

تعرف على كيفية عرض مخطط Apache Spark DataFrame. يستخدم Apache Spark مخطط المصطلح للإشارة إلى أسماء وأنواع البيانات للأعمدة في DataFrame.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعرض هذه التعليمة البرمجية مخطط DataFrames الخاص بك مع .printSchema() أسلوب لعرض مخططات DataFrames - للتحضير لتوحيد إطاري البيانات.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

إشعار

يستخدم Azure Databricks أيضا مخطط المصطلح لوصف مجموعة من الجداول المسجلة في كتالوج.

إعادة تسمية العمود في DataFrame

تعرف على كيفية إعادة تسمية عمود في DataFrame.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعيد هذه التعليمة البرمجية تسمية عمود في df1_csv DataFrame لمطابقة العمود المعني في df1 DataFrame. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark withColumnRenamed() .

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

دمج DataFrames

تعرف على كيفية إنشاء DataFrame جديد يضيف صفوف DataFrame إلى آخر.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark union() لدمج محتويات DataFrame df الأول مع DataFrame df_csv الذي يحتوي على بيانات أسماء الأطفال المحملة من ملف CSV.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

تصفية الصفوف في DataFrame

اكتشف أسماء الأطفال الأكثر شيوعا في مجموعة البيانات الخاصة بك عن طريق تصفية الصفوف، باستخدام Apache Spark .filter() أو .where() الأساليب. استخدم التصفية لتحديد مجموعة فرعية من الصفوف لإرجاعها أو تعديلها في DataFrame. لا يوجد فرق في الأداء أو بناء الجملة، كما هو ملاحظ في الأمثلة التالية.

استخدام أسلوب .filter()

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark .filter() لعرض تلك الصفوف في DataFrame بعدد يزيد عن 50.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

استخدام أسلوب .where()

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark .where() لعرض تلك الصفوف في DataFrame بعدد يزيد عن 50.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

تحديد أعمدة من DataFrame وترتيبها حسب التردد

تعرف على تكرار اسم الطفل باستخدام select() الأسلوب لتحديد الأعمدة من DataFrame لإرجاعها. استخدم Apache Spark orderby والوظائف desc لترتيب النتائج.

توفر الوحدة النمطية pyspark.sql ل Apache Spark الدعم لوظائف SQL. من بين هذه الوظائف التي نستخدمها في هذا البرنامج التعليمي هي وظائف Apache Spark orderBy()و desc()و expr() . يمكنك تمكين استخدام هذه الدالات عن طريق استيرادها إلى جلسة العمل الخاصة بك حسب الحاجة.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستورد هذه التعليمة البرمجية الدالة desc() ثم تستخدم أسلوب Apache Spark select() وApache Spark orderBy() والوظائف desc() لعرض الأسماء الأكثر شيوعا وعددها بترتيب تنازلي.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

إنشاء مجموعة فرعية DataFrame

تعرف على كيفية إنشاء مجموعة فرعية DataFrame من DataFrame موجود.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark filter لإنشاء DataFrame جديد يقيد البيانات حسب السنة والعدد والجنس. يستخدم أسلوب Apache Spark select() للحد من الأعمدة. كما يستخدم Apache Spark orderBy() والوظائف desc() لفرز DataFrame الجديد حسب العدد.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

الخطوة 5: حفظ DataFrame

تعرف على كيفية حفظ DataFrame. يمكنك إما حفظ DataFrame إلى جدول أو كتابة DataFrame إلى ملف أو ملفات متعددة.

حفظ DataFrame في جدول

يستخدم Azure Databricks تنسيق Delta Lake لكافة الجداول بشكل افتراضي. لحفظ DataFrame الخاص بك، يجب أن يكون لديك CREATE امتيازات الجدول على الكتالوج والمخطط.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تحفظ هذه التعليمة البرمجية محتويات DataFrame في جدول باستخدام المتغير الذي حددته في بداية هذا البرنامج التعليمي.

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

تعمل معظم تطبيقات Apache Spark على مجموعات بيانات كبيرة وبطريقة موزعة. يكتب Apache Spark دليلا من الملفات بدلا من ملف واحد. يقسم Delta Lake مجلدات وملفات Parquet. يمكن للعديد من أنظمة البيانات قراءة هذه الدلائل من الملفات. توصي Azure Databricks باستخدام الجداول عبر مسارات الملفات لمعظم التطبيقات.

حفظ DataFrame إلى ملفات JSON

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تحفظ هذه التعليمة البرمجية DataFrame إلى دليل ملفات JSON.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

قراءة DataFrame من ملف JSON

تعرف على كيفية استخدام أسلوب Apache Spark spark.read.format() لقراءة بيانات JSON من دليل إلى DataFrame.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعرض هذه التعليمة البرمجية ملفات JSON التي حفظتها في المثال السابق.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

مهام إضافية: تشغيل استعلامات SQL في PySpark وSc scala وR

توفر Apache Spark DataFrames الخيارات التالية لدمج SQL مع PySpark وSc scala وR. يمكنك تشغيل التعليمات البرمجية التالية في نفس دفتر الملاحظات الذي قمت بإنشائه لهذا البرنامج التعليمي.

تحديد عمود كتعلام SQL

تعرف على كيفية استخدام أسلوب Apache Spark selectExpr() . هذا هو متغير من select() الأسلوب الذي يقبل تعبيرات SQL وإرجاع DataFrame محدث. يسمح لك هذا الأسلوب باستخدام تعبير SQL، مثل upper.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark selectExpr() وتعبير SQL upper لتحويل عمود سلسلة إلى أحرف كبيرة (وإعادة تسمية العمود).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

استخدم expr() لاستخدام بناء جملة SQL لعمود

تعرف على كيفية استيراد واستخدام دالة Apache Spark expr() لاستخدام بناء جملة SQL في أي مكان يتم فيه تحديد عمود.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستورد هذه التعليمة البرمجية الدالة expr() ثم تستخدم دالة Apache Spark expr() وتعبير SQL lower لتحويل عمود سلسلة إلى حالة صغيرة (وإعادة تسمية العمود).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

تشغيل استعلام SQL عشوائي باستخدام الدالة spark.sql()

تعرف على كيفية استخدام وظيفة Apache Spark spark.sql() لتشغيل استعلامات SQL العشوائية.

انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية دالة Apache Spark spark.sql() للاستعلام عن جدول SQL باستخدام بناء جملة SQL.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

دفتر ملاحظات البرنامج التعليمي DataFrame

يتضمن دفتر الملاحظات التالي أمثلة الاستعلامات من هذا البرنامج التعليمي.

Python

البرنامج التعليمي DataFrames باستخدام دفتر ملاحظات Python

الحصول على دفتر الملاحظات

Scala

البرنامج التعليمي DataFrames باستخدام دفتر ملاحظات Scala

الحصول على دفتر الملاحظات

R

البرنامج التعليمي DataFrames باستخدام دفتر ملاحظات R

الحصول على دفتر الملاحظات

الموارد الإضافية