Преобразование данных с помощью SQL

Завершено

Библиотека SparkSQL, которая предоставляет структуру кадра данных, также позволяет использовать SQL в качестве способа работы с данными. С помощью этого подхода можно запрашивать и преобразовывать данные в кадрах данных с помощью SQL-запросов и сохранять результаты в виде таблиц.

Примечание.

Таблицы — это абстракции метаданных по файлам. Данные не хранятся в реляционной таблице, но таблица предоставляет реляционный слой по файлам в озере данных.

Определение таблиц и представлений

Определения таблиц в Spark хранятся в хранилище метаданных, слой метаданных, который инкапсулирует реляционные абстракции по файлам. Внешние таблицы — это реляционные таблицы в хранилище метаданных, ссылающиеся на файлы в заданном расположении озера данных. Доступ к этим данным можно получить, запрашивая таблицу или считывая файлы непосредственно из озера данных.

Примечание.

Внешние таблицы "свободно привязаны" к базовым файлам и удаление таблицы не удаляет файлы. Это позволяет использовать Spark для выполнения тяжелого подъема преобразования, а затем сохранять данные в озере. После этого можно удалить таблицу и подчиненные процессы, чтобы получить доступ к этим оптимизированным структурам. Можно также определить управляемые таблицы, для которых базовые файлы данных хранятся во внутреннем управляемом расположении хранилища, связанном с хранилищем метаданных. Управляемые таблицы "тесно привязаны" к файлам, и удаление управляемой таблицы удаляет связанные файлы.

В следующем примере кода кадр данных (загружен из CSV-файлов) сохраняется как имя внешней таблицы sales_orders. Файлы хранятся в папке /sales_orders_table в озере данных.

order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')

Использование SQL для запроса и преобразования данных

После определения таблицы можно использовать SQL для запроса и преобразования данных. Следующий код создает два производных столбца с именем Year и Month , а затем создает новую таблицу transformed_orders с новыми производными столбцами.

# Create derived columns
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")

# Save the results
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')

Файлы данных для новой таблицы хранятся в иерархии папок с форматом Year=*NNNN* / Month=*N*, с каждой папкой, содержащей файл parquet для соответствующих заказов по годам и месяцам.

Запрос хранилища метаданных

Так как эта новая таблица была создана в хранилище метаданных, вы можете использовать SQL для запроса непосредственно с помощью магического ключа %%sql в первой строке, чтобы указать, что синтаксис SQL будет использоваться, как показано в следующем скрипте:

%%sql

SELECT * FROM transformed_orders
WHERE Year = 2021
    AND Month = 1

Удаление таблиц

При работе с внешними таблицами можно использовать DROP команду для удаления определений таблиц из хранилища метаданных, не затрагивая файлы в озере данных. Этот подход позволяет очистить хранилище метаданных после использования SQL для преобразования данных, делая преобразованные файлы данных доступными для потокового анализа и приема данных.

%%sql

DROP TABLE transformed_orders;
DROP TABLE sales_orders;