Sử dụng Spark để làm việc với tệp dữ liệu

Đã hoàn thành

Một trong những lợi ích của việc sử dụng Spark là bạn có thể viết và chạy mã bằng nhiều ngôn ngữ lập trình khác nhau, cho phép bạn sử dụng các kỹ năng lập trình mà bạn đã có và sử dụng ngôn ngữ thích hợp nhất cho một tác vụ nhất định. Ngôn ngữ mặc định trong sổ tay Azure Databricks Spark mới là PySpark - phiên bản tối ưu hóa Spark của Python, thường được các nhà khoa học và nhà phân tích dữ liệu sử dụng nhờ sự hỗ trợ mạnh mẽ về thao tác và trực quan hóa dữ liệu. Ngoài ra, bạn có thể sử dụng các ngôn ngữ như Scala (một ngôn ngữ bắt nguồn từ Java có thể được sử dụng tương tác) và SQL (một biến thể của ngôn ngữ SQL thường được sử dụng được bao gồm trong thư viện Spark SQL để làm việc với các cấu trúc dữ liệu có liên quan). Kỹ sư phần mềm cũng có thể tạo các giải pháp được biên dịch chạy trên Spark bằng cách sử dụng các khuôn khổ như Java.

Khám phá dữ liệu với khung dữ liệu

Về cơ bản, Spark sử dụng một cấu trúc dữ liệu gọi là một tập dữ liệu phân tán đàn hồi (RDD); nhưng trong khi bạn có thể viết mã hoạt động trực tiếp với RDD, cấu trúc dữ liệu được sử dụng phổ biến nhất để làm việc với dữ liệu có cấu trúc trong Spark là khung dữ liệu, được cung cấp như một phần của thư viện Spark SQL. Khung dữ liệu trong Spark tương tự như trong thư viện Pandas Python phổ biến, nhưng được tối ưu hóa để làm việc trong môi trường xử lý phân tán của Spark.

Lưu ý

Ngoài API Khung dữ liệu, Spark SQL còn cung cấp MỘT API Tập dữ liệu kiểu mạnh được hỗ trợ trong Java và Scala. Chúng tôi sẽ tập trung vào API Khung dữ liệu trong mô-đun này.

Đang tải dữ liệu vào khung dữ liệu

Hãy khám phá một ví dụ giả thuyết để xem cách bạn có thể sử dụng khung dữ liệu để làm việc với dữ liệu. Giả sử bạn có dữ liệu sau đây trong tệp văn bản được phân cách bằng dấu phẩy có tênproducts.csv trong thư mục dữ liệu trong dung lượng lưu trữ Databricks File System (DBFS) của bạn:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Trong sổ tay Spark, bạn có thể sử dụng mã PySpark sau đây để tải dữ liệu vào khung dữ liệu và hiển thị 10 hàng đầu tiên:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Dòng %pyspark ở đầu được gọi là một phép thuật, và nói với Spark rằng ngôn ngữ được sử dụng trong ô này là PySpark. Dưới đây là mã Scala tương đương cho ví dụ dữ liệu sản phẩm:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

Phép thuật %spark được sử dụng để chỉ định Scala.

Mẹo

Bạn cũng có thể chọn ngôn ngữ mình muốn sử dụng cho từng ô trong giao diện Sổ tay.

Cả hai ví dụ được hiển thị trước đó sẽ tạo ra kết quả như sau:

ID Sản phẩm Tên Sản phẩm Danh mục Giá Danh sách
771 Núi-100 Bạc, 38 Xe đạp leo núi 3399.9900
772 Núi-100 Bạc, 42 Xe đạp leo núi 3399.9900
773 Núi-100 Bạc, 44 Xe đạp leo núi 3399.9900
... ... ... ...

Xác định sơ đồ khung dữ liệu

Trong ví dụ trước, hàng đầu tiên của tệp CSV chứa tên cột và Spark có thể suy ra kiểu dữ liệu của mỗi cột từ dữ liệu chứa trong đó. Bạn cũng có thể chỉ định sơ đồ rõ ràng cho dữ liệu, điều này rất hữu ích khi tên cột không được bao gồm trong tệp dữ liệu, như ví dụ CSV này:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Ví dụ về PySpark sau đây cho thấy cách xác định sơ đồ cho khung dữ liệu được tải từ một tệp có tên product-data.csv định dạng này:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Kết quả một lần nữa sẽ tương tự như sau:

ID Sản phẩm Tên Sản phẩm Danh mục Giá Danh sách
771 Núi-100 Bạc, 38 Xe đạp leo núi 3399.9900
772 Núi-100 Bạc, 42 Xe đạp leo núi 3399.9900
773 Núi-100 Bạc, 44 Xe đạp leo núi 3399.9900
... ... ... ...

Khung dữ liệu lọc và nhóm

Bạn có thể sử dụng các phương pháp của lớp Khung dữ liệu để lọc, sắp xếp, nhóm và thao tác khác với dữ liệu trong đó. Ví dụ: ví dụ về mã sau đây sử dụng phương pháp chọn để truy xuất các cột ProductNameListPrice từ khung dữ liệu df chứa dữ liệu sản phẩm trong ví dụ trước:

pricelist_df = df.select("ProductID", "ListPrice")

Kết quả từ ví dụ mã này sẽ trông giống như thế này:

ID Sản phẩm Giá Danh sách
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Thông thường với hầu hết các phương pháp thao tác dữ liệu, select sẽ trả về một đối tượng khung dữ liệu mới.

Mẹo

Chọn một tập hợp con các cột từ khung dữ liệu là một thao tác phổ biến, cũng có thể đạt được điều này bằng cách sử dụng cú pháp ngắn hơn sau đây:

pricelist_df = df["ProductID", "ListPrice"]

Bạn có thể kết hợp các phương pháp "chuỗi" để thực hiện một loạt các thao tác dẫn đến khung dữ liệu được chuyển đổi. Ví dụ: mã ví dụ này liên kết các chuỗi các lựa chọn và nơi các phương pháp để tạo khung dữ liệu mới chứa cột ProductNameListPrice cho các sản phẩm có một danh mục xe đạp leo núi hoặc Xe đạp đường bộ:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Kết quả từ ví dụ mã này sẽ trông giống như thế này:

Tên Sản phẩm Giá Danh sách
Núi-100 Bạc, 38 3399.9900
Đường-750 Đen, 52 539.9900
... ...

Để nhóm và tổng hợp dữ liệu, bạn có thể sử dụng phương pháp groupBy và các hàm tổng hợp. Ví dụ, mã PySpark sau đây đếm số lượng sản phẩm cho từng thể loại:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Kết quả từ ví dụ mã này sẽ trông giống như thế này:

Danh mục đếm
Tai nghe 3
Bánh xe 14
Xe đạp leo núi 32
... ...

Sử dụng biểu thức SQL trong Spark

API Khung dữ liệu là một phần của thư viện Spark có tên là Spark SQL, cho phép các nhà phân tích dữ liệu sử dụng các biểu thức SQL để truy vấn và thao tác với dữ liệu.

Tạo đối tượng cơ sở dữ liệu trong ca-ta-lô Thu nhỏ

Danh mục Spark là một siêu lưu trữ cho các đối tượng dữ liệu quan hệ như dạng xem và bảng. Thời gian chạy Spark có thể sử dụng danh mục để tích hợp liền mạch mã được viết bằng bất kỳ ngôn ngữ nào được Spark hỗ trợ với biểu thức SQL có thể tự nhiên hơn đối với một số nhà phân tích dữ liệu hoặc nhà phát triển.

Một trong những cách đơn giản nhất để làm cho dữ liệu trong khung dữ liệu sẵn dùng để truy vấn trong danh mục Spark là tạo dạng xem tạm thời, như minh họa trong ví dụ về mã sau đây:

df.createOrReplaceTempView("products")

Dạng xem là tạm thời, nghĩa là dạng xem sẽ tự động bị xóa vào cuối phiên hiện tại. Bạn cũng có thể tạo bảng vẫn tồn tại trong ca-ta-lô để xác định cơ sở dữ liệu có thể được truy vấn bằng Spark SQL.

Lưu ý

Chúng tôi sẽ không tìm hiểu sâu về bảng ca-ta-lô Spark trong mô-đun này, nhưng bạn nên dành thời gian để nêu bật một vài điểm chính:

  • Bạn có thể tạo bảng trống bằng cách sử dụng phương spark.catalog.createTable này. Bảng là cấu trúc siêu dữ liệu lưu trữ dữ liệu cơ sở của chúng trong vị trí lưu trữ liên kết với ca-ta-lô. Việc xóa bảng cũng sẽ xóa dữ liệu cơ sở của bảng đó.
  • Bạn có thể lưu khung dữ liệu dưới dạng bảng bằng cách sử dụng phương pháp saveAsTable bảng.
  • Bạn có thể tạo bảng bên ngoài bằng cách sử dụng phương spark.catalog.createExternalTable pháp. Các bảng bên ngoài xác định siêu dữ liệu trong ca-ta-lô nhưng lấy dữ liệu cơ sở của chúng từ một vị trí lưu trữ bên ngoài; thường là một thư mục trong một hồ dữ liệu. Việc xóa bảng bên ngoài sẽ không xóa dữ liệu cơ sở.

Sử dụng API Spark SQL để truy vấn dữ liệu

Bạn có thể sử dụng API Spark SQL trong mã được viết bằng bất kỳ ngôn ngữ nào để truy vấn dữ liệu trong danh mục. Ví dụ: mã PySpark sau đây sử dụng truy vấn SQL để trả về dữ liệu từ dạng xem sản phẩm dưới dạng khung dữ liệu.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Kết quả từ ví dụ mã sẽ trông giống như bảng sau đây:

Tên Sản phẩm Giá Danh sách
Núi-100 Bạc, 38 3399.9900
Đường-750 Đen, 52 539.9900
... ...

Sử dụng mã SQL

Ví dụ trước đã minh họa cách sử dụng API Spark SQL để nhúng biểu thức SQL trong mã Spark. Trong sổ tay, bạn cũng có thể sử dụng %sql kỳ diệu để chạy mã SQL truy vấn các đối tượng trong danh mục, như thế này:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Ví dụ về mã SQL trả về một tập hợp kết quả được tự động hiển thị trong sổ tay dưới dạng bảng, chẳng hạn như tập hợp dưới đây:

Danh mục Số lượng sản phẩm
Bib-Shorts 3
Giá đỡ xe đạp 1
Chân đế Xe đạp 1
... ...