使用 Spark 处理数据文件

已完成

使用 Spark 的好处之一是,可以使用各种编程语言编写和运行代码,使你能够运用已掌握的编程技能,并针对给定任务使用最合适的语言。 新的 Azure Databricks Spark 笔记本中的默认语言是 PySpark - Python 的 Spark 优化版本,由于它对数据操作和可视化效果的强大支持,因此数据科学家和分析师通常会使用它。 此外,还可以使用 Scala(一种可交互使用的 Java 派生语言)和 SQL(常用 SQL 语言的变体,包含在 Spark SQL 库中,用于处理关系数据结构)等语言。 软件工程师还可以使用 Java 等框架创建在 Spark 上运行的编译解决方案。

使用数据帧探索数据

Spark 原本使用一种称为弹性分布式数据集 (RDD) 的数据结构;但是虽然可以编写直接处理 RDD 的代码,但在 Spark 中处理结构化数据最常用的数据结构是数据帧,它作为 Spark SQL 库的一部分提供。 Spark 中的数据帧类似于通用 Pandas Python 库中的数据帧,但经过优化,可以在 Spark 的分布式处理环境中工作。

注意

除了 Dataframe API,Spark SQL 还提供了 Java 和 Scala 支持的强类型 Dataset API。 在本模块中,我们将重点介绍 Dataframe API。

将数据加载到数据帧中

我们来看看一个假设示例,了解如何使用数据帧来处理数据。 假设你在 Databricks 文件系统 (DBFS) 存储的“数据”文件夹中名为 products.csv 的以逗号分隔的文本文件中有以下数据:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

在 Spark 笔记本中,可以使用以下 PySpark 代码将数据加载到数据帧中并显示前 10 行:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

开头的 %pyspark 行称为 magic,它告诉 Spark 此单元格中使用的语言是 PySpark。 下面是产品数据示例的等效 Scala 代码:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

magic %spark 用于指定 Scala。

提示

你还可以为笔记本界面中的每个单元格选择要使用的语言。

上述两个示例都会生成如下输出:

ProductID ProductName 类别 ListPrice
771 Mountain-100 Silver, 38 山地自行车 3399.9900
772 Mountain-100 Silver, 42 山地自行车 3399.9900
773 Mountain-100 Silver, 44 山地自行车 3399.9900
... ... ... ...

指定数据帧架构

在前面的示例中,CSV 文件的第一行包含列名,Spark 能够根据每一列所包含的数据推断出其数据类型。 还可以指定数据的显式架构,这在数据文件中不包含列名时很有用,例如此 CSV 示例:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

以下 PySpark 示例演示了如何指定要从名为 product-data.csv 的文件加载的数据帧的架构,格式如下:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

结果将再次类似于以下内容:

ProductID ProductName 类别 ListPrice
771 Mountain-100 Silver, 38 山地自行车 3399.9900
772 Mountain-100 Silver, 42 山地自行车 3399.9900
773 Mountain-100 Silver, 44 山地自行车 3399.9900
... ... ... ...

对数据帧进行筛选和分组

可以使用 Dataframe 类的方法来对所包含的数据进行筛选、排序、分组和执行其他操作。 例如,以下代码示例使用 select 方法从包含前面示例中的产品数据的 df 数据帧中检索 ProductName 和 ListPrice 列:

pricelist_df = df.select("ProductID", "ListPrice")

此示例代码的结果可能如下所示:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

与大多数数据操作方法一样,select 返回一个新的数据帧对象。

提示

从数据帧中选择部分列是一种常见的操作,也可以通过使用以下较短的语法来实现:

pricelist_df = df["ProductID", "ListPrice"]

可以将多个方法“链接”在一起来执行一系列操作,从而生成转换后的数据帧。 例如,此示例代码将 select 和 where 方法链接在一起来创建新的数据帧,其中包含用于“山地自行车”或“公路自行车”类别的产品的 ProductName 和 ListPrice 列:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

此示例代码的结果可能如下所示:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 黑色,52 539.9900
... ...

要对数据进行分组和聚合,可以使用 groupBy 方法和聚合函数。 例如,以下 PySpark 代码计算每个类别的产品数量:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

此示例代码的结果可能如下所示:

类别 count
耳机 3
车轮 14
山地自行车 32
... ...

在 Spark 中使用 SQL 表达式

Dataframe API 是名为 Spark SQL 的 Spark 库的一部分,它使数据分析师能够使用 SQL 表达式来查询和操作数据。

在 Spark 目录中创建数据库对象

Spark 目录是关系数据对象(例如视图和表)的元存储。 Spark 运行时可以使用目录将用任何 Spark 支持的语言编写的代码与 SQL 表达式无缝集成,对于一些数据分析师或开发人员来说,SQL 表达式可能更合理。

使数据帧中的数据可用于在 Spark 目录中查询的最简单方法之一是创建一个临时视图,如以下代码示例所示:

df.createOrReplaceTempView("products")

视图是临时的,这意味着它会在当前会话结束时被自动删除。 还可以创建持久保存在目录中的表,以定义可以使用 Spark SQL 查询的数据库。

注意

在本模块中,我们不会深入探讨 Spark 目录表,但值得花时间强调几个关键点:

  • 可以使用 spark.catalog.createTable 方法创建空表。 表是元数据结构,该结构会将其基础数据存储在与目录关联的存储位置。 删除表也会删除其基础数据。
  • 可以使用数据帧的 saveAsTable 方法将其保存为表。
  • 可以使用 spark.catalog.createExternalTable 方法创建外部表。 外部表定义目录中的元数据,但从外部存储位置获取其基础数据;通常是数据湖中的文件夹。 删除外部表不会删除基础数据。

使用 Spark SQL API 查询数据

可以使用采用任何语言编写的代码中的 Spark SQL API 来查询目录中的数据。 例如,以下 PySpark 代码使用 SQL 查询将 products 视图中的数据作为数据帧返回。

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

代码示例的结果类似于下表:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 黑色,52 539.9900
... ...

使用 SQL 代码

前面的示例演示了如何使用 Spark SQL API 在 Spark 代码中嵌入 SQL 表达式。 在笔记本中,还可以使用 %sql magic 来运行查询目录中的对象的 SQL 代码,如下所示:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

SQL 代码示例返回了一个结果集,它在笔记本中自动显示为表,如下所示:

类别 ProductCount
骑行背带短裤 3
自行车车架 1
单车存放架 1
... ...