教學課程:使用Apache Spark DataFrame載入和轉換數據
本教學課程說明如何使用 Azure Databricks 中的 Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API 和 SparkR SparkDataFrame API 來載入和轉換數據。
在本教學課程結束時,您將瞭解 DataFrame 是什麼,並熟悉下列工作:
Python
- 定義變數並將公用數據複製到 Unity 目錄磁碟區
- 使用 Python 建立 DataFrame
- 從 CSV 檔案將數據載入 DataFrame
- 檢視與 DataFrame 互動
- 儲存 DataFrame
- 在 PySpark 中執行 SQL 查詢
另 請參閱 Apache Spark PySpark API 參考。
Scala
- 定義變數並將公用數據複製到 Unity 目錄磁碟區
- 使用 Scala 建立 DataFrame
- 從 CSV 檔案將數據載入 DataFrame
- 檢視與 DataFrame 互動
- 儲存 DataFrame
- 在 Apache Spark 中執行 SQL 查詢
另 請參閱 Apache Spark Scala API 參考。
R
- 定義變數並將公用數據複製到 Unity 目錄磁碟區
- 建立 SparkR SparkDataFrames
- 從 CSV 檔案將數據載入 DataFrame
- 檢視與 DataFrame 互動
- 儲存 DataFrame
- 在 SparkR 中執行 SQL 查詢
什麼是 DataFrame?
DataFrame 是具有可能不同類型之數據行的二維標籤數據結構。 您可以將 DataFrame 想像成電子表格、SQL 資料表或數位物件的字典。 Apache Spark DataFrame 提供一組豐富的函式(選取數據行、篩選、聯結、匯總),可讓您有效率地解決常見的數據分析問題。
Apache Spark DataFrame 是建立在彈性分散式數據集 (RDD) 之上的抽象概念。 Spark DataFrame 和 Spark SQL 使用統一的規劃和優化引擎,可讓您在 Azure Databricks (Python、SQL、Scala 和 R) 上取得所有支援語言的幾乎完全相同的效能。
需求
若要完成下列教學課程,您必須符合下列需求:
若要使用本教學課程中的範例,您的工作區必須 啟用 Unity 目錄 。
本教學課程中的範例會使用 Unity 目錄 磁碟區 來儲存範例數據。 若要使用這些範例,請建立磁碟區,並使用該磁碟區的目錄、架構和磁碟區名稱來設定範例所使用的磁碟區路徑。
您必須在 Unity 目錄中具有下列權限:
READ VOLUME
和WRITE VOLUME
,或ALL PRIVILEGES
用於本教學課程的磁碟區。USE SCHEMA
或ALL PRIVILEGES
用於本教學課程的架構。USE CATALOG
或ALL PRIVILEGES
用於本教學課程的目錄。
若要設定這些許可權,請參閱 Databricks 系統管理員或 Unity 目錄許可權和安全性實體物件。
提示
如需本文已完成的筆記本,請參閱 DataFrame 教學課程筆記本。
步驟 1:定義變數並載入 CSV 檔案
此步驟會定義變數以供本教學課程使用,然後載入 CSV 檔案,其中包含從 health.data.ny.gov 到 Unity 目錄磁碟區的嬰兒名稱數據。
按兩下圖示以 開啟新的筆記本。 若要瞭解如何流覽 Azure Databricks 筆記本,請參閱 Databricks 筆記本介面和控件。
將下列程式代碼複製並貼到新的空白筆記本數據格中。 將、
<schema-name>
和<volume-name>
取代<catalog-name>
為 Unity 目錄磁碟區的目錄、架構和磁碟區名稱。 將取代<table_name>
為您選擇的數據表名稱。 稍後在本教學課程中,您會將嬰兒名稱數據載入此數據表。按
Shift+Enter
以執行儲存格並建立新的空白儲存格。Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_tables = catalog + "." + schema print(path_tables) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val file_name = "rows.csv" val table_name = "<table_name>" val path_volume = s"/Volumes/$catalog/$schema/$volume" val path_tables = s"$catalog.$schema.$table_name" print(path_volume) // Show the complete path print(path_tables) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_tables <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_tables) # Show the complete path
將下列程式代碼複製並貼到新的空白筆記本數據格中。 此程式代碼會
rows.csv
使用 Databricks dbutuils 命令,將檔案從 health.data.ny.gov 複製到 Unity 目錄磁碟區。按
Shift+Enter
以執行儲存格,然後移至下一個儲存格。Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
Scala
dbutils.fs.cp(download_url, s"$path_volume/$file_name")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
步驟 2:建立數據框架
此步驟會建立名為 df1
且具有測試數據的 DataFrame,然後顯示其內容。
將下列程式代碼複製並貼到新的空白筆記本數據格中。 此程式代碼會使用測試數據建立 Dataframe,然後顯示 DataFrame 的內容和架構。
按
Shift+Enter
以執行儲存格,然後移至下一個儲存格。Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = c(2021), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = c(42) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
步驟 3:從 CSV 檔案將數據載入 DataFrame
此步驟會從您先前載入 Unity 目錄磁碟區的 CSV 檔案建立名為 df_csv
的數據框架。 請參閱 spark.read.csv。
將下列程式代碼複製並貼到新的空白筆記本數據格中。 此程式代碼會從 CSV 檔案將嬰兒名稱數據載入 DataFrame
df_csv
,然後顯示 DataFrame 的內容。按
Shift+Enter
以執行儲存格,然後移至下一個儲存格。Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val df_csv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$path_volume/$file_name") display(df_csv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
您可以從許多 支援的檔案格式載入資料。
步驟 4:檢視 DataFrame 並與其互動
使用下列方法,檢視並與您的嬰兒名稱 DataFrame 互動。
列印 DataFrame 架構
瞭解如何顯示 Apache Spark DataFrame 的架構。 Apache Spark 會使用架構一詞來參考 DataFrame 中數據行的名稱和數據類型。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會使用 .printSchema()
方法來顯示 DataFrame 的架構,以檢視兩個 DataFrame 的架構 , 以準備將兩個 DataFrame 聯集。
Python
df_csv.printSchema()
df1.printSchema()
Scala
df_csv.printSchema()
df1.printSchema()
R
printSchema(df_csv)
printSchema(df1)
注意
Azure Databricks 也會使用架構一詞來描述向目錄註冊的數據表集合。
在 DataFrame 中重新命名數據行
瞭解如何重新命名 DataFrame 中的數據行。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會重新命名 DataFrame 中的數據 df1_csv
行,以符合 DataFrame 中的 df1
個別數據行。 此程式代碼會使用 Apache Spark withColumnRenamed()
方法。
Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
Scala
val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
合併 DataFrame
瞭解如何建立新的 DataFrame,以將一個 DataFrame 的數據列新增至另一個數據框架。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會使用 Apache Spark union()
方法,將第一個 DataFrame df
的內容與包含從 CSV 檔案載入之嬰兒名稱數據的 DataFrame df_csv
合併。
Python
df = df1.union(df_csv)
display(df)
Scala
val df = df1.union(df_csv_renamed)
display(df)
R
display(df <- union(df1, df_csv))
篩選 DataFrame 中的數據列
使用 Apache Spark .filter()
或 .where()
方法篩選數據列,探索數據集中最受歡迎的嬰兒名稱。 使用篩選來選取數據列子集,以在DataFrame中傳回或修改。 效能或語法沒有任何差異,如下列範例所示。
使用 .filter() 方法
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會使用 Apache Spark .filter()
方法,在 DataFrame 中顯示計數超過 50 的數據列。
Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
使用 .where() 方法
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會使用 Apache Spark .where()
方法,在 DataFrame 中顯示計數超過 50 的數據列。
Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
從 DataFrame 選取數據行,並依頻率排序
瞭解使用 方法指定 select()
要傳回之 DataFrame 資料行的嬰兒名稱頻率。 使用 Apache Spark 和desc
函orderby
式來排序結果。
Apache Spark 的 pyspark.sql模組提供 SQL 函式的支援。 在本教學課程中使用的這些函式中,有 Apache Spark orderBy()
、 desc()
和 expr()
函式。 您可以視需要將它們匯入會話,以啟用這些函式的使用。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會匯入函式,desc()
然後使用 Apache Spark 方法和 Apache Spark select()
和desc()
函orderBy()
式,以遞減順序顯示最常見的名稱及其計數。
Python
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
建立子集 DataFrame
瞭解如何從現有的 DataFrame 建立子集 DataFrame。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會使用 Apache Spark filter
方法來建立新的 DataFrame,以依年份、計數和性別來限制數據。 它會使用 Apache Spark select()
方法來限制數據行。 它也會使用 Apache Spark 和desc()
函orderBy()
式來依計數排序新的 DataFrame。
Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
步驟 5:儲存數據框架
瞭解如何儲存DataFrame。。 您可以將 DataFrame 儲存至數據表,或將數據框架寫入檔案或多個檔案。
將 DataFrame 儲存至數據表
根據預設,Azure Databricks 會針對所有數據表使用 Delta Lake 格式。 若要儲存 DataFrame,您必須擁有 CREATE
目錄和架構的數據表許可權。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會使用您在本教學課程開始時定義的變數,將 DataFrame 的內容儲存至資料表。
Python
df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")
# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")
Scala
df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")
// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")
R
saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")
大部分的 Apache Spark 應用程式都以分散式方式在大型數據集上運作。 Apache Spark 會寫出檔案目錄,而不是單一檔案。 Delta Lake 會分割 Parquet 資料夾和檔案。 許多數據系統都可以讀取這些檔案目錄。 Azure Databricks 建議針對大多數應用程式使用檔案路徑的數據表。
將 DataFrame 儲存至 JSON 檔案
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會將 DataFrame 儲存至 JSON 檔案的目錄。
Python
df.write.format("json").save("/tmp/json_data")
# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").save("/tmp/json_data")
// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
從 JSON 檔案讀取 DataFrame
瞭解如何使用 Apache Spark spark.read.format()
方法,將 JSON 數據從目錄讀取到 DataFrame。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會顯示您在上一個範例中儲存的 JSON 檔案。
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
其他工作:在 PySpark、Scala 和 R 中執行 SQL 查詢
Apache Spark DataFrame 提供下列選項來結合 SQL 與 PySpark、Scala 和 R。您可以在為本教學課程建立的相同筆記本中執行下列程序代碼。
將數據行指定為 SQL 查詢
瞭解如何使用 Apache Spark selectExpr()
方法。 這是接受 SQL 運算式並傳回更新之 DataFrame 之方法的 select()
變體。 這個方法可讓您使用 SQL 表示式,例如 upper
。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會使用 Apache Spark selectExpr()
方法和 SQL upper
運算式,將字串資料行轉換成大寫(並重新命名數據行)。
Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
使用 expr()
來使用數據行的 SQL 語法
瞭解如何匯入並使用 Apache Spark expr()
函式,以在指定數據行的任何位置使用 SQL 語法。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會匯入 expr()
函式,然後使用 Apache Spark expr()
函式和 SQL lower
運算式,將字串資料行轉換成小寫(並重新命名數據行)。
Python
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
使用 spark.sql() 函式執行任意 SQL 查詢
瞭解如何使用 Apache Spark spark.sql()
函式來執行任意 SQL 查詢。
將下列程式代碼複製並貼到空的筆記本數據格中。 此程式代碼會使用 Apache Spark spark.sql()
函式,使用 SQL 語法查詢 SQL 資料表。
Python
display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))
R
display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))
DataFrame 教學課程筆記本
下列筆記本包含本教學課程中的範例查詢。
Python
使用 Python 筆記本的 DataFrames 教學課程
Scala
使用 Scala 筆記本的 DataFrames 教學課程
R
使用 R 筆記本的 DataFrames 教學課程
其他資源
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應