使用 Spark 处理数据文件
使用 Spark 的好处之一是,可以使用各种编程语言编写和运行代码,使你能够运用已掌握的编程技能,并针对给定任务使用最合适的语言。 新的 Azure Databricks Spark 笔记本中的默认语言是 PySpark - Python 的 Spark 优化版本,由于它对数据操作和可视化效果的强大支持,因此数据科学家和分析师通常会使用它。 此外,还可以使用 Scala(一种可交互使用的 Java 派生语言)和 SQL(常用 SQL 语言的变体,包含在 Spark SQL 库中,用于处理关系数据结构)等语言。 软件工程师还可以使用 Java 等框架创建在 Spark 上运行的编译解决方案。
使用数据帧探索数据
Spark 原本使用一种称为弹性分布式数据集 (RDD) 的数据结构;但是虽然可以编写直接处理 RDD 的代码,但在 Spark 中处理结构化数据最常用的数据结构是数据帧,它作为 Spark SQL 库的一部分提供。 Spark 中的数据帧类似于通用 Pandas Python 库中的数据帧,但经过优化,可以在 Spark 的分布式处理环境中工作。
注意
除了 Dataframe API,Spark SQL 还提供了 Java 和 Scala 支持的强类型 Dataset API。 在本模块中,我们将重点介绍 Dataframe API。
将数据加载到数据帧中
我们来看看一个假设示例,了解如何使用数据帧来处理数据。 假设你在 Databricks 文件系统 (DBFS) 存储的“数据”文件夹中名为 products.csv 的以逗号分隔的文本文件中有以下数据:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
在 Spark 笔记本中,可以使用以下 PySpark 代码将数据加载到数据帧中并显示前 10 行:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
开头的 %pyspark
行称为 magic,它告诉 Spark 此单元格中使用的语言是 PySpark。 下面是产品数据示例的等效 Scala 代码:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
magic %spark
用于指定 Scala。
提示
你还可以为笔记本界面中的每个单元格选择要使用的语言。
上述两个示例都会生成如下输出:
ProductID | ProductName | 类别 | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | 山地自行车 | 3399.9900 |
772 | Mountain-100 Silver, 42 | 山地自行车 | 3399.9900 |
773 | Mountain-100 Silver, 44 | 山地自行车 | 3399.9900 |
... | ... | ... | ... |
指定数据帧架构
在前面的示例中,CSV 文件的第一行包含列名,Spark 能够根据每一列所包含的数据推断出其数据类型。 还可以指定数据的显式架构,这在数据文件中不包含列名时很有用,例如此 CSV 示例:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
以下 PySpark 示例演示了如何指定要从名为 product-data.csv 的文件加载的数据帧的架构,格式如下:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
结果将再次类似于以下内容:
ProductID | ProductName | 类别 | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | 山地自行车 | 3399.9900 |
772 | Mountain-100 Silver, 42 | 山地自行车 | 3399.9900 |
773 | Mountain-100 Silver, 44 | 山地自行车 | 3399.9900 |
... | ... | ... | ... |
对数据帧进行筛选和分组
可以使用 Dataframe 类的方法来对所包含的数据进行筛选、排序、分组和执行其他操作。 例如,以下代码示例使用 select 方法从包含前面示例中的产品数据的 df 数据帧中检索 ProductName 和 ListPrice 列:
pricelist_df = df.select("ProductID", "ListPrice")
此示例代码的结果可能如下所示:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
与大多数数据操作方法一样,select 返回一个新的数据帧对象。
提示
从数据帧中选择部分列是一种常见的操作,也可以通过使用以下较短的语法来实现:
pricelist_df = df["ProductID", "ListPrice"]
可以将多个方法“链接”在一起来执行一系列操作,从而生成转换后的数据帧。 例如,此示例代码将 select 和 where 方法链接在一起来创建新的数据帧,其中包含用于“山地自行车”或“公路自行车”类别的产品的 ProductName 和 ListPrice 列:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
此示例代码的结果可能如下所示:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 黑色,52 | 539.9900 |
... | ... |
要对数据进行分组和聚合,可以使用 groupBy 方法和聚合函数。 例如,以下 PySpark 代码计算每个类别的产品数量:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
此示例代码的结果可能如下所示:
类别 | count |
---|---|
耳机 | 3 |
车轮 | 14 |
山地自行车 | 32 |
... | ... |
在 Spark 中使用 SQL 表达式
Dataframe API 是名为 Spark SQL 的 Spark 库的一部分,它使数据分析师能够使用 SQL 表达式来查询和操作数据。
在 Spark 目录中创建数据库对象
Spark 目录是关系数据对象(例如视图和表)的元存储。 Spark 运行时可以使用目录将用任何 Spark 支持的语言编写的代码与 SQL 表达式无缝集成,对于一些数据分析师或开发人员来说,SQL 表达式可能更合理。
使数据帧中的数据可用于在 Spark 目录中查询的最简单方法之一是创建一个临时视图,如以下代码示例所示:
df.createOrReplaceTempView("products")
视图是临时的,这意味着它会在当前会话结束时被自动删除。 还可以创建持久保存在目录中的表,以定义可以使用 Spark SQL 查询的数据库。
注意
在本模块中,我们不会深入探讨 Spark 目录表,但值得花时间强调几个关键点:
- 可以使用
spark.catalog.createTable
方法创建空表。 表是元数据结构,该结构会将其基础数据存储在与目录关联的存储位置。 删除表也会删除其基础数据。 - 可以使用数据帧的
saveAsTable
方法将其保存为表。 - 可以使用
spark.catalog.createExternalTable
方法创建外部表。 外部表定义目录中的元数据,但从外部存储位置获取其基础数据;通常是数据湖中的文件夹。 删除外部表不会删除基础数据。
使用 Spark SQL API 查询数据
可以使用采用任何语言编写的代码中的 Spark SQL API 来查询目录中的数据。 例如,以下 PySpark 代码使用 SQL 查询将 products 视图中的数据作为数据帧返回。
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
代码示例的结果类似于下表:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 黑色,52 | 539.9900 |
... | ... |
使用 SQL 代码
前面的示例演示了如何使用 Spark SQL API 在 Spark 代码中嵌入 SQL 表达式。 在笔记本中,还可以使用 %sql
magic 来运行查询目录中的对象的 SQL 代码,如下所示:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
SQL 代码示例返回了一个结果集,它在笔记本中自动显示为表,如下所示:
类别 | ProductCount |
---|---|
骑行背带短裤 | 3 |
自行车车架 | 1 |
单车存放架 | 1 |
... | ... |