Использование Spark для работы с файлами данных

Завершено

Одним из преимуществ использования Spark является возможность написания и выполнения кода на различных языках программирования, что позволяет использовать существующие навыки программирования и выбрать наиболее подходящий язык для данной задачи. По умолчанию Azure Databricks использует в новой записной книжке Spark язык PySpark. Это оптимизированная для Spark версия Python, обычно используемая специалистами по обработке и анализу данных из-за ее хороших возможностей преобразования и визуализации. Кроме того, можно использовать такие языки, как Scala (производный от Java язык, который можно использовать в интерактивном режиме) и SQL (вариант часто используемого языка SQL, включенного в библиотеку Spark SQL для работы с реляционными структурами данных). Инженеры программного обеспечения также могут создавать компилируемые решения на основе Spark с помощью таких платформ, как Java.

Изучение данных с помощью кадров данных

В собственном коде Spark используется структура данных, называемая отказоустойчивым распределенным набором данных (RDD); но хотя вы можете написать код, который работает непосредственно с RDD, чаще всего для работы со структурированными данными в Spark используется кадр данных, предоставляемый в составе библиотеки Spark SQL. Кадры данных в Spark аналогичны тем, которые используются в универсальной библиотеке Pandas Python, но при этом они оптимизированы для работы в распределенной среде обработки Spark.

Примечание.

В дополнение к API кадра данных Spark SQL предоставляет строго типизированный API набора данных, который поддерживается в Java и Scala. В этом модуле мы будем в первую очередь рассматривать API кадра данных.

Загрузка данных в кадр данных

Рассмотрим гипотетический пример использования кадра данных для работы с данными. Предположим, в вашем DBFS-хранилище есть текстовый файл с разделителями-запятыми products.csv в папке data, содержащий следующие данные:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

В записной книжке Spark можно использовать следующий код PySpark для загрузки данных в кадр данных и отображения первых 10 строк:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Строка %pyspark в начале называется магической командой, которая сообщает Spark, что в этой ячейке используется язык PySpark. Вот эквивалентный код Scala для примера данных продуктов:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

Магическая команда %spark используется для указания Scala.

Совет

Вы также можете выбрать язык, который хотите использовать для каждой ячейки, в интерфейсе записной книжки.

Оба приведенные ранее примера имеют следующие выходные данные:

ProductID НаименованиеПродукта Категория ПрейскурантнаяЦена
771 Mountain-100 Silver, 38 Горные велосипеды 3399.9900
772 Mountain-100 Silver, 42 Горные велосипеды 3399.9900
773 Mountain-100 Silver, 44 Горные велосипеды 3399.9900
... ... ... ...

Указание схемы кадра данных

В предыдущем примере первая строка CSV-файла содержала имена столбцов, а Spark могла определять тип данных в каждом столбце на основе содержащихся в нем данных. Вы также можете указать явную схему для данных; это удобно в том случае, если имена столбцов не включены в файл данных, как в этом примере CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

В следующем примере PySpark показано, как указать схему для кадра данных, загружаемого из файла с именем product-data.csv в следующем формате:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Результаты будут снова похожи на:

ProductID НаименованиеПродукта Категория ПрейскурантнаяЦена
771 Mountain-100 Silver, 38 Горные велосипеды 3399.9900
772 Mountain-100 Silver, 42 Горные велосипеды 3399.9900
773 Mountain-100 Silver, 44 Горные велосипеды 3399.9900
... ... ... ...

Фильтрация и группировка кадров данных

Методы класса Dataframe можно использовать для фильтрации, сортировки, группировки и обработки содержащихся в нем данных. Например, в следующем примере кода используется метод select для получения столбцов ProductName и ListPrice из кадра данных df, где содержатся данные продукта, рассматриваемого в предыдущем примере:

pricelist_df = df.select("ProductID", "ListPrice")

Результаты для этого примера кода будут выглядеть примерно так:

ProductID ПрейскурантнаяЦена
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Как правило, при использовании большинства методов обработки данных метод select возвращает новый объект кадра данных.

Совет

Выбор вложенного набора столбцов из кадра данных — это распространенная операция, которую также можно выполнить с помощью следующего, более короткого синтаксиса:

pricelist_df = df["ProductID", "ListPrice"]

Можно объединить методы в цепочку для выполнения ряда операций, которые приводят к преобразованию кадра данных. Например, в этом примере кода показано объединение в цепочку методов select и where для создания нового кадра данных, содержащего столбцы ProductName и ListPrice для продуктов в категории Mountain Bikes или Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Результаты для этого примера кода будут выглядеть примерно так:

НаименованиеПродукта ПрейскурантнаяЦена
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

Для группирования и агрегирования данных можно использовать метод groupBy и статистические функции. Например, следующий код PySpark подсчитывает количество продуктов для каждой категории:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Результаты для этого примера кода будут выглядеть примерно так:

Категория count
Рулевые колонки 3
Колеса 14
Горные велосипеды 32
... ...

Использование выражений SQL в Spark

API кадра данных входит в библиотеку Spark с именем Spark SQL, позволяя аналитикам данных использовать выражения SQL для запроса данных и управления ими.

Создание объектов базы данных в каталоге Spark

Каталог Spark — это хранилище метаданных для реляционных объектов данных, таких как представления и таблицы. Среда выполнения Spark может использовать каталог для простой и эффективной интеграции кода, написанного на любом языке, поддерживаемом Spark, с использованием выражений SQL, которые будут более привычными для многих аналитиков и разработчиков данных.

Одним из самых простых способов сделать данные в кадре данных доступными для запроса в каталоге Spark, является создание временного представления, как показано в следующем примере кода:

df.createOrReplaceTempView("products")

Представление является временным, то есть оно автоматически удаляется в конце текущего сеанса. Вы также можете создавать таблицы, которые сохраняются в каталоге, чтобы определить базу данных, которую можно запросить с помощью Spark SQL.

Примечание.

Мы не будем подробно изучать таблицы каталога Spark в этом модуле, однако стоит выделить несколько ключевых моментов:

  • Можно создать пустую таблицу с помощью метода spark.catalog.createTable. Таблицы — это структуры метаданных, базовые данные которых хранятся в расположении хранилища, связанном с каталогом. При удалении таблицы также удаляются базовые данные.
  • Кадр данных можно сохранить в виде таблицы с помощью метода saveAsTable.
  • Можно создать внешнюю таблицу с помощью метода spark.catalog.createExternalTable. Внешние таблицы определяют метаданные в каталоге, но получают их базовые данные из внешнего расположения хранилища; обычно это папка в озере данных. При удалении внешней таблицы базовые данные не удаляются.

Использование API SQL Spark для запроса данных

API SQL Spark можно использовать в коде, написанном на любом языке, чтобы запрашивать данные в каталоге. Например, следующий код PySpark использует запрос SQL для возврата данных из представления продуктов в виде кадра данных.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Результаты из примера кода будут выглядеть примерно так:

НаименованиеПродукта ПрейскурантнаяЦена
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

Использование кода SQL

В предыдущем примере показано, как использовать API SQL Spark для внедрения выражений SQL в код Spark. В записной книжке можно также использовать магическую команду %sql для запуска SQL кода, который запрашивает объекты в каталоге, примерно следующим образом:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

В примере кода SQL возвращается набор результатов, который автоматически отображается в записной книжке в виде таблицы, как показано ниже.

Категория ProductCount
Велошорты 3
Багажники для велосипедов 1
Велосипедные стойки 1
... ...