Преобразование данных с помощью SQL
Библиотека SparkSQL, которая предоставляет структуру кадра данных, также позволяет использовать SQL в качестве способа работы с данными. С помощью этого подхода можно запрашивать и преобразовывать данные в кадрах данных с помощью SQL-запросов и сохранять результаты в виде таблиц.
Примечание.
Таблицы — это абстракции метаданных по файлам. Данные не хранятся в реляционной таблице, но таблица предоставляет реляционный слой по файлам в озере данных.
Определение таблиц и представлений
Определения таблиц в Spark хранятся в хранилище метаданных, слой метаданных, который инкапсулирует реляционные абстракции по файлам. Внешние таблицы — это реляционные таблицы в хранилище метаданных, ссылающиеся на файлы в заданном расположении озера данных. Доступ к этим данным можно получить, запрашивая таблицу или считывая файлы непосредственно из озера данных.
Примечание.
Внешние таблицы "свободно привязаны" к базовым файлам и удаление таблицы не удаляет файлы. Это позволяет использовать Spark для выполнения тяжелого подъема преобразования, а затем сохранять данные в озере. После этого можно удалить таблицу и подчиненные процессы, чтобы получить доступ к этим оптимизированным структурам. Можно также определить управляемые таблицы, для которых базовые файлы данных хранятся во внутреннем управляемом расположении хранилища, связанном с хранилищем метаданных. Управляемые таблицы "тесно привязаны" к файлам, и удаление управляемой таблицы удаляет связанные файлы.
В следующем примере кода кадр данных (загружен из CSV-файлов) сохраняется как имя внешней таблицы sales_orders. Файлы хранятся в папке /sales_orders_table в озере данных.
order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')
Использование SQL для запроса и преобразования данных
После определения таблицы можно использовать SQL для запроса и преобразования данных. Следующий код создает два производных столбца с именем Year и Month , а затем создает новую таблицу transformed_orders с новыми производными столбцами.
# Create derived columns
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")
# Save the results
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')
Файлы данных для новой таблицы хранятся в иерархии папок с форматом Year=*NNNN* / Month=*N*, с каждой папкой, содержащей файл parquet для соответствующих заказов по годам и месяцам.
Запрос хранилища метаданных
Так как эта новая таблица была создана в хранилище метаданных, вы можете использовать SQL для запроса непосредственно с помощью магического ключа %%sql в первой строке, чтобы указать, что синтаксис SQL будет использоваться, как показано в следующем скрипте:
%%sql
SELECT * FROM transformed_orders
WHERE Year = 2021
AND Month = 1
Удаление таблиц
При работе с внешними таблицами можно использовать DROP
команду для удаления определений таблиц из хранилища метаданных, не затрагивая файлы в озере данных. Этот подход позволяет очистить хранилище метаданных после использования SQL для преобразования данных, делая преобразованные файлы данных доступными для потокового анализа и приема данных.
%%sql
DROP TABLE transformed_orders;
DROP TABLE sales_orders;