البرنامج التعليمي: تحميل البيانات وتحويلها باستخدام Apache Spark DataFrames

يوضح لك هذا البرنامج التعليمي كيفية تحميل البيانات وتحويلها باستخدام Apache Spark Python (PySpark) DataFrame API وApache Spark Scala DataFrame API وSparkR SparkDataFrame API في Azure Databricks.

بنهاية هذا البرنامج التعليمي، سوف تفهم ما هو DataFrame وتكون على دراية بالمهام التالية:

Python

راجع أيضا مرجع Apache Spark PySpark API.

Scala

راجع أيضا مرجع Apache Spark Scala API.

R

راجع أيضا مرجع Apache SparkR API.

ما هو DataFrame؟

DataFrame هو بنية بيانات ثنائية الأبعاد تحمل أعمدة من أنواع مختلفة محتملة. يمكنك التفكير في DataFrame مثل جدول بيانات أو جدول SQL أو قاموس كائنات السلسلة. توفر Apache Spark DataFrames مجموعة غنية من الوظائف (تحديد الأعمدة، والتصفية، والانضمام، والتجاميع) التي تسمح لك بحل مشكلات تحليل البيانات الشائعة بكفاءة.

Apache Spark DataFrames هي تجريد مبني على مجموعات البيانات الموزعة المرنة (RDDs). يستخدم Spark DataFrames وSpark SQL محرك تخطيط وتحسين موحد، مما يسمح لك بالحصول على أداء متطابق تقريبا عبر جميع اللغات المدعومة على Azure Databricks (Python وSQL وSca وR).

المتطلبات

لإكمال البرنامج التعليمي التالي، يجب أن تفي بالمتطلبات التالية:

  • لاستخدام الأمثلة في هذا البرنامج التعليمي، يجب تمكين كتالوج Unity لمساحة العمل الخاصة بك.

  • تستخدم الأمثلة في هذا البرنامج التعليمي وحدة تخزين كتالوج Unity لتخزين بيانات العينة. لاستخدام هذه الأمثلة، قم بإنشاء وحدة تخزين واستخدم أسماء كتالوج وحدة التخزين والمخطط ووحدات التخزين لتعيين مسار وحدة التخزين المستخدمة من قبل الأمثلة.

  • يجب أن يكون لديك الأذونات التالية في كتالوج Unity:

    • READ VOLUME و WRITE VOLUME، أو ALL PRIVILEGES وحدة التخزين المستخدمة لهذا البرنامج التعليمي.
    • USE SCHEMA أو ALL PRIVILEGES للمخطط المستخدم لهذا البرنامج التعليمي.
    • USE CATALOG أو ALL PRIVILEGES للكتالوج المستخدم لهذا البرنامج التعليمي.

    لتعيين هذه الأذونات، راجع امتيازات مسؤول Databricks أو كتالوج Unity والعناصر القابلة للتأمين.

تلميح

للحصول على دفتر ملاحظات مكتمل لهذه المقالة، راجع دفاتر ملاحظات البرنامج التعليمي DataFrame.

الخطوة 1: تحديد المتغيرات وتحميل ملف CSV

تحدد هذه الخطوة المتغيرات للاستخدام في هذا البرنامج التعليمي ثم تقوم بتحميل ملف CSV يحتوي على بيانات اسم الطفل من health.data.ny.gov إلى وحدة تخزين كتالوج Unity.

  1. افتح دفتر ملاحظات جديدا بالنقر فوق الأيقونة أيقونة جديدة . لمعرفة كيفية التنقل في دفاتر ملاحظات Azure Databricks، راجع واجهة دفتر ملاحظات Databricks وعناصر التحكم.

  2. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. استبدل <catalog-name>و <schema-name>و <volume-name> بأسماء الكتالوج والمخطط ووحدات التخزين لوحدة تخزين كتالوج Unity. استبدل <table_name> باسم جدول من اختيارك. ستقوم بتحميل بيانات اسم الطفل في هذا الجدول لاحقا في هذا البرنامج التعليمي.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. اضغط Shift+Enter لتشغيل الخلية وإنشاء خلية فارغة جديدة.

  4. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تنسخ هذه التعليمة البرمجية rows.csv الملف من health.data.ny.gov إلى وحدة تخزين كتالوج Unity باستخدام الأمر Databricks dbutuils .

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

الخطوة 2: إنشاء DataFrame

تنشئ هذه الخطوة DataFrame باسم df1 مع بيانات الاختبار ثم تعرض محتوياته.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تنشئ هذه التعليمة البرمجية DataFrame مع بيانات الاختبار، ثم تعرض محتويات ومخطط DataFrame.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

الخطوة 3: تحميل البيانات في DataFrame من ملف CSV

تنشئ هذه الخطوة DataFrame باسم df_csv من ملف CSV الذي قمت بتحميله مسبقا في وحدة تخزين كتالوج Unity. انظر spark.read.csv.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر الملاحظات الفارغة الجديدة. تقوم هذه التعليمة البرمجية بتحميل بيانات اسم الطفل في DataFrame df_csv من ملف CSV ثم تعرض محتويات DataFrame.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

يمكنك تحميل البيانات من العديد من تنسيقات الملفات المدعومة.

الخطوة 4: عرض إطار البيانات والتفاعل معه

اعرض أسماء طفلك وتفاعل معها باستخدام DataFrames باستخدام الطرق التالية.

تعرف على كيفية عرض مخطط Apache Spark DataFrame. يستخدم Apache Spark مخطط المصطلح للإشارة إلى أسماء وأنواع البيانات للأعمدة في DataFrame.

إشعار

يستخدم Azure Databricks أيضا مخطط المصطلح لوصف مجموعة من الجداول المسجلة في كتالوج.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعرض هذه التعليمة البرمجية مخطط DataFrames الخاص بك مع .printSchema() أسلوب لعرض مخططات DataFrames - للتحضير لتوحيد إطاري البيانات.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

إعادة تسمية العمود في DataFrame

تعرف على كيفية إعادة تسمية عمود في DataFrame.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعيد هذه التعليمة البرمجية تسمية عمود في df1_csv DataFrame لمطابقة العمود المعني في df1 DataFrame. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark withColumnRenamed() .

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

دمج DataFrames

تعرف على كيفية إنشاء DataFrame جديد يضيف صفوف DataFrame إلى آخر.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark union() لدمج محتويات DataFrame df الأول مع DataFrame df_csv الذي يحتوي على بيانات أسماء الأطفال المحملة من ملف CSV.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

تصفية الصفوف في DataFrame

اكتشف أسماء الأطفال الأكثر شيوعا في مجموعة البيانات الخاصة بك عن طريق تصفية الصفوف، باستخدام Apache Spark .filter() أو .where() الأساليب. استخدم التصفية لتحديد مجموعة فرعية من الصفوف لإرجاعها أو تعديلها في DataFrame. لا يوجد فرق في الأداء أو بناء الجملة، كما هو ملاحظ في الأمثلة التالية.

استخدام أسلوب .filter()

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark .filter() لعرض تلك الصفوف في DataFrame بعدد يزيد عن 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

استخدام أسلوب .where()

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark .where() لعرض تلك الصفوف في DataFrame بعدد يزيد عن 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

تحديد أعمدة من DataFrame وترتيبها حسب التردد

تعرف على تكرار اسم الطفل باستخدام select() الأسلوب لتحديد الأعمدة من DataFrame لإرجاعها. استخدم Apache Spark orderby والوظائف desc لترتيب النتائج.

توفر الوحدة النمطية pyspark.sql ل Apache Spark الدعم لوظائف SQL. من بين هذه الوظائف التي نستخدمها في هذا البرنامج التعليمي هي وظائف Apache Spark orderBy()و desc()و expr() . يمكنك تمكين استخدام هذه الدالات عن طريق استيرادها إلى جلسة العمل الخاصة بك حسب الحاجة.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستورد هذه التعليمة البرمجية الدالة desc() ثم تستخدم أسلوب Apache Spark select() وApache Spark orderBy() والوظائف desc() لعرض الأسماء الأكثر شيوعا وعددها بترتيب تنازلي.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

إنشاء مجموعة فرعية DataFrame

تعرف على كيفية إنشاء مجموعة فرعية DataFrame من DataFrame موجود.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark filter لإنشاء DataFrame جديد يقيد البيانات حسب السنة والعدد والجنس. يستخدم أسلوب Apache Spark select() للحد من الأعمدة. كما يستخدم Apache Spark orderBy() والوظائف desc() لفرز DataFrame الجديد حسب العدد.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

الخطوة 5: حفظ DataFrame

تعرف على كيفية حفظ DataFrame. يمكنك إما حفظ DataFrame إلى جدول أو كتابة DataFrame إلى ملف أو ملفات متعددة.

حفظ DataFrame في جدول

يستخدم Azure Databricks تنسيق Delta Lake لكافة الجداول بشكل افتراضي. لحفظ DataFrame الخاص بك، يجب أن يكون لديك CREATE امتيازات الجدول على الكتالوج والمخطط.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تحفظ هذه التعليمة البرمجية محتويات DataFrame في جدول باستخدام المتغير الذي حددته في بداية هذا البرنامج التعليمي.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

تعمل معظم تطبيقات Apache Spark على مجموعات بيانات كبيرة وبطريقة موزعة. يكتب Apache Spark دليلا من الملفات بدلا من ملف واحد. يقسم Delta Lake مجلدات وملفات Parquet. يمكن للعديد من أنظمة البيانات قراءة هذه الدلائل من الملفات. توصي Azure Databricks باستخدام الجداول عبر مسارات الملفات لمعظم التطبيقات.

حفظ DataFrame إلى ملفات JSON

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تحفظ هذه التعليمة البرمجية DataFrame إلى دليل ملفات JSON.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

قراءة DataFrame من ملف JSON

تعرف على كيفية استخدام أسلوب Apache Spark spark.read.format() لقراءة بيانات JSON من دليل إلى DataFrame.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تعرض هذه التعليمة البرمجية ملفات JSON التي حفظتها في المثال السابق.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

مهام إضافية: تشغيل استعلامات SQL في PySpark وSc scala وR

توفر Apache Spark DataFrames الخيارات التالية لدمج SQL مع PySpark وSc scala وR. يمكنك تشغيل التعليمات البرمجية التالية في نفس دفتر الملاحظات الذي قمت بإنشائه لهذا البرنامج التعليمي.

تحديد عمود كتعلام SQL

تعرف على كيفية استخدام أسلوب Apache Spark selectExpr() . هذا هو متغير من select() الأسلوب الذي يقبل تعبيرات SQL وإرجاع DataFrame محدث. يسمح لك هذا الأسلوب باستخدام تعبير SQL، مثل upper.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية أسلوب Apache Spark selectExpr() وتعبير SQL upper لتحويل عمود سلسلة إلى أحرف كبيرة (وإعادة تسمية العمود).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

استخدم expr() لاستخدام بناء جملة SQL لعمود

تعرف على كيفية استيراد واستخدام دالة Apache Spark expr() لاستخدام بناء جملة SQL في أي مكان يتم فيه تحديد عمود.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستورد هذه التعليمة البرمجية الدالة expr() ثم تستخدم دالة Apache Spark expr() وتعبير SQL lower لتحويل عمود سلسلة إلى حالة صغيرة (وإعادة تسمية العمود).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

تشغيل استعلام SQL عشوائي باستخدام الدالة spark.sql()

تعرف على كيفية استخدام وظيفة Apache Spark spark.sql() لتشغيل استعلامات SQL العشوائية.

  1. انسخ التعليمات البرمجية التالية والصقها في خلية دفتر ملاحظات فارغة. تستخدم هذه التعليمة البرمجية دالة Apache Spark spark.sql() للاستعلام عن جدول SQL باستخدام بناء جملة SQL.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. اضغط Shift+Enter لتشغيل الخلية ثم انتقل إلى الخلية التالية.

دفاتر ملاحظات البرنامج التعليمي DataFrame

تتضمن دفاتر الملاحظات التالية أمثلة على الاستعلامات من هذا البرنامج التعليمي.

Python

البرنامج التعليمي DataFrames باستخدام Python

الحصول على دفتر الملاحظات

Scala

البرنامج التعليمي DataFrames باستخدام Scala

الحصول على دفتر الملاحظات

R

البرنامج التعليمي DataFrames باستخدام R

الحصول على دفتر الملاحظات

الموارد الإضافية