共用方式為


教學課程:使用 Apache Spark DataFrames 載入並轉換資料

本教學課程介紹如何使用 Azure Databricks 的 Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API 及 SparkR SparkDataFrame API 載入並轉換資料。

注意

如果你使用 Databricks 免費版,請選擇 Python 標籤以查看本教學中的所有程式碼範例。 免費版不支援 R 或 Scala。 此外,免費版限制了外網存取,因此你必須透過工作區介面上傳 CSV 檔案,不能用程式碼下載。 詳細說明請參見 步驟一

在本教學課程結束時,您將了解什麼是 DataFrame 並熟悉以下工作:

Python

另請參閱Apache Spark PySpark API 參照

程式語言 Scala

另請參閱Apache Spark Scala API 參照.

R

另請參閱Apache SparkR API 參照.

什麼是 DataFrame?

DataFrame 是具有可能不同類型之數據行的二維標籤數據結構。 您可以將 DataFrame 想像成電子表格、SQL 資料表或數位物件的字典。 Apache Spark DataFrame 提供一組豐富的函式(選取數據行、篩選、聯結、匯總),可讓您有效率地解決常見的數據分析問題。

Apache Spark DataFrame 是以彈性分散式資料集 (RDD) 為基礎建置的抽象概念。 Spark DataFrame 和 Spark SQL 使用統一的規劃和優化引擎,可讓您在 Azure Databricks (Python、SQL、Scala 和 R) 上取得所有支援語言的幾乎完全相同的效能。

需求

要完成以下教學課程,您必須滿足以下要求:

  • 若要使用本教學課程中的範例,您的工作區必須已啟用 Unity 目錄。 Azure Databricks 免費版和免費試用工作區預設啟用了 Unity 目錄。

  • 本教學課程中的範例會使用 Unity 目錄 磁碟區 來儲存範例數據。 若要使用這些範例,請建立磁碟區,並使用該磁碟區的目錄、架構和磁碟區名稱來設定範例所使用的磁碟區路徑。 免費版使用者預設可存取工作區目錄與 default 結構。

  • 您必須在 Unity 目錄中具有下列權限:

    • READ VOLUME 以及 WRITE VOLUME 用於本教學的磁碟區
    • USE SCHEMA 針對本教學所使用的架構
    • USE CATALOG 本教學所用目錄

    要設定這些權限,請參閱你的 Azure Databricks 管理員或 Unity 目錄權限與可保護物件。 免費版使用者預設擁有工作區目錄與 default 結構的這些權限。

提示

有關本文的完整筆記本,請參閱DataFrame 教學課程筆記本

步驟 1:定義變數並載入 CSV 檔案

此步驟會定義用於本教學課程的變數,然後載入包含嬰兒名字資料的 CSV 檔案,這些資料是從 health.data.ny.gov 匯入到您的 Unity Catalog 磁碟區中。 你需要 Unity Catalog 目錄、結構和容量的名稱。

提示

如果你不知道你的目錄和結構名稱,請點選 「資料」圖示。目錄 請見側邊欄。 工作區目錄與你的工作區共用名稱,並列在目錄面板中。 展開它可以看到可用的結構圖。 免費版及免費試用版使用者可以使用工作區目錄與 default 架構。

如果你沒有磁碟區,可以在筆記本儲存格執行以下指令(將 和 <catalog_name> 替換<schema_name>成你的值):

CREATE VOLUME IF NOT EXISTS <catalog_name>.<schema_name>.my_volume
  1. 按一下新增圖示圖示以開啟新筆記本。 若要瞭解如何流覽 Azure Databricks 筆記本,請參閱 自定義筆記本外觀

  2. 請將下列程式碼複製並貼到全新空白筆記本資料格。 將 <catalog-name><schema-name><volume-name> 替換為 Unity Catalog 的目錄名稱、架構名稱和磁碟區名稱。 以您選擇的數據表名稱取代 <table_name>。 你要在這個教學後面把寶寶名字資料載入這個表格。

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    程式語言 Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. 按下 Shift+Enter 以執行儲存格並建立新的空白儲存格。

  4. 將 CSV 檔載入你的磁碟區。 請選擇下列其中一個方法:

    • 使用工作區介面上傳 — 如果你使用 的是 Databricks 免費版,或是選項 B 的程式碼下載因網路錯誤失敗,請使用此方法。 Free Edition 和其他無伺服器運算環境限制了外接網際網路,因此你必須從本地機器上傳檔案。
    • 使用程式碼下載 ——如果您的運算環境有外接網路,請使用此方法。

    選項A:使用工作區介面上傳

    1. 在你本機的電腦上,在瀏覽器開啟 health.data.ny.gov/api/views/jxy9-yhdk/rows.csv 。 檔案下載到你的電腦時為 rows.csv,與先前定義的變數相符 file_name
    2. 回到你的 Azure Databricks 工作區。 在側邊欄點選 新圖示、新增 > 新增或上傳資料
    3. 點擊 「上傳檔案到磁碟區」。
    4. 點擊 瀏覽 並選擇 rows.csv 檔案,或將其拖曳到上傳區域。
    5. 目的磁碟區,選擇您前面指定的磁碟區。
    6. 上傳完成後,回到筆記本繼續步驟 2

    關於上傳檔案的更多細節,請參見 「上傳檔案至Unity Catalog volume」。

    選項B:使用程式碼下載

    請將下列程式碼複製並貼到全新空白筆記本資料格。 這段程式碼會用 rows.csv 指令,將檔案從 health.data.ny.gov 複製到 Unity 目錄卷。 按 Shift+Enter 執行此資料格,然後移至下一個資料格。

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    程式語言 Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

步驟 2:建立 DataFrame

此步驟建立一個以測試資料命名df1的 DataFrame,然後顯示其內容。

  1. 請將下列程式碼複製並貼到全新空白筆記本資料格。 此程式代碼會使用測試數據建立 DataFrame,然後顯示 DataFrame 的內容和架構。

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    # highlight-next-line
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    程式語言 Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    // highlight-next-line
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    # highlight-next-line
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

步驟 3:將資料從 CSV 檔案載入至 DataFrame

此步驟會從您先前載入 Unity 目錄磁碟區的 CSV 檔案建立名為 df_csv 的數據框架。 請參閱spark.read.csv

  1. 請將下列程式碼複製並貼到全新空白筆記本資料格。 此程式碼將嬰兒名稱資料df_csv從 CSV 檔案載入至 DataFrame,然後展示 DataFrame 的內容。

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    程式語言 Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

您可以從許多支援的檔案格式載入資料

步驟 4:檢視 DataFrame 並與其互動

使用以下方法檢視您的寶寶姓名 DataFrames 並與其互動。

瞭解如何顯示 Apache Spark DataFrame 的架構。 Apache Spark 會使用架構 一詞 來參考 DataFrame 中數據行的名稱和數據類型。

注意

Azure Databricks 也會使用架構一詞來描述向目錄註冊的數據表集合。

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用 .printSchema() 方法來顯示DataFrame的架構,以檢視兩個DataFrame的架構- 以準備將兩個DataFrame聯集。

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    程式語言 Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

在 DataFrame 中重新命名數據行

瞭解如何重新命名 DataFrame 中的數據行。

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會重新命名 df1_csv DataFrame 中的數據行,以符合 df1 DataFrame 中的個別數據行。 此程式碼使用 Apache SparkwithColumnRenamed()方法。

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema()
    

    程式語言 Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

合併「DataFrames」

了解如何建立一個新的 DataFrame,將一個 DataFrame 的列新增至另一個 DataFrame。

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Sparkunion()方法將第一個 DataFrame 的內容dfdf_csv包含從 CSV 檔案載入的嬰兒姓名資料的 DataFrame 合併。

    Python

    df = df1.union(df_csv)
    display(df)
    

    程式語言 Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

篩選 DataFrame 的行列

使用 Apache Spark .filter().where() 方法來篩選數據列,探索數據集中最受歡迎的嬰兒名稱。 使用篩選來選取數據列子集,以在DataFrame中傳回或修改。 效能或語法沒有差異,如以下範例所示。

使用 .filter() 方法

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark .filter() 方法,在 DataFrame 中顯示計數超過 50 的這些資料列。

    Python
    display(df.filter(df["Count"] > 50))
    
    程式語言 Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

使用 .where() 方法

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark .where() 方法,在 DataFrame 中顯示計數超過 50 的這些資料列。

    Python
    display(df.where(df["Count"] > 50))
    
    程式語言 Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

從 DataFrame 選取數據行,並依頻率排序

請了解如何使用 select() 方法來指定要從 DataFrame 中返回的嬰兒名字頻率欄位。 使用 Apache Spark orderbydesc函式來排序結果。

Apache Spark 的 pyspark.sql 模組提供對 SQL 函式的支援。 我們在本教學中使用的這些函數包括 Apache Spark orderBy()desc()expr() 函數。 您可以在需要時將這些函式匯入到您的工作階段中,以啟用這些功能。

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼匯入 desc() 函式,然後使用 Apache Spark select() 方法與 Apache Spark orderBy() 以及 desc() 函式,按遞減順序顯示最常見的名稱及其計數。

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    程式語言 Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

建立子集的數據框架

了解如何從現有 DataFrame 建立子集合 DataFrame。

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark filter方法建立新的 DataFrame,按年份、計數及性別限制資料。 它會使用 Apache Spark select() 方法來限制數據行。 它還使用 Apache Spark orderBy()desc()函式按計數對新的 DataFrame 進行排序。

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    程式語言 Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

步驟 5:儲存 DataFrame

了解如何儲存 DataFrame。 您可以將 DataFrame 儲存至數據表,或將數據框架寫入檔案或多個檔案。

將 DataFrame 儲存至數據表

根據預設,Azure Databricks 會針對所有數據表使用 Delta Lake 格式。 若要儲存 DataFrame,您必須在目錄和架構上擁有 CREATE 資料表許可權。

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用您在本教學課程開始時定義的變數,將 DataFrame 的內容儲存至資料表。

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    程式語言 Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

大多數 Apache Spark 應用程式都以分散式方式處理大型資料集。 Apache Spark 會寫出檔案目錄,而非單一檔案。 Delta Lake 可拆分 Parquet 資料夾與檔案。 許多資料系統可以讀取這些檔案目錄。 Azure Databricks 建議針對大多數應用程式使用檔案路徑的數據表。

將 DataFrame 儲存至 JSON 檔案

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼將 DataFrame 儲存至 JSON 檔案目錄。

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    程式語言 Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

從 JSON 文件讀取 DataFrame

了解如何使用 Apache Spark spark.read.format()方法將 JSON 資料從目錄讀取至 DataFrame。

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼顯示您在先前範例儲存的 JSON 檔案。

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    程式語言 Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

其他工作:在 PySpark、Scala 及 R 中執行 SQL 查詢

Apache Spark DataFrames 提供以下選項將 SQL 與 PySpark、Scala 及 R 結合。您可以在為本教學課程建立的同一筆記本中執行以下程式碼。

將數據行指定為 SQL 查詢

了解如何使用 Apache Spark selectExpr()方法。 這是select()方法的變體,可接受 SQL 運算式並傳回更新的 DataFrame。 該方法讓您可使用 SQL 運算式,例如upper

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用 Apache Spark selectExpr() 方法和 SQL upper 表示式,將字串資料行轉換成大寫(並重新命名數據行)。

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    程式語言 Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

使用 expr() 來套用欄位的 SQL 語法

瞭解如何匯入並使用 Apache Spark expr() 函式,以在指定數據行的任何位置使用 SQL 語法。

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會匯入 expr() 函式,然後使用Apache Spark expr() 函式和 SQL lower 表示式,將字串數據行轉換成小寫(並將數據行重新命名)。

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    程式語言 Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

使用 spark.sql() 函數執行任意 SQL 查詢

瞭解如何使用 Apache Spark spark.sql() 函數來執行任意 SQL 查詢。

  1. 請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用 Apache Spark spark.sql() 函式,使用 SQL 語法查詢 SQL 數據表。

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    程式語言 Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Shift+Enter 執行此資料格,然後移至下一個資料格。

DataFrame 教學課程筆記本

下列筆記本包含本教學課程中的範例查詢。

Python

使用 Python 的 DataFrames 教學課程

拿筆記本

程式語言 Scala

使用 Scala 的 DataFrames 教學課程

拿筆記本

R

使用 R 的 DataFrames 教學課程

拿筆記本

其他資源