Поделиться через


Функции для определения наборов данных

Модуль pyspark.pipelines, который здесь обозначен как dp, реализует большую часть своей основной функциональности с помощью декораторов. Эти декораторы принимают функцию, которая определяет потоковый или пакетный запрос и возвращает кадр данных Apache Spark. В следующем синтаксисе показан простой пример определения набора данных конвейера:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

На этой странице представлен обзор функций и запросов, определяющих наборы данных в конвейерах. Для получения полного списка доступных декораторов см. справочник разработчика конвейера.

Функции, используемые для определения наборов данных, не должны включать произвольную логику Python, не связанную с набором данных, включая вызовы сторонних API. Конвейеры выполняют эти функции несколько раз во время планирования, проверки и обновления. Включение произвольной логики может привести к непредвиденным результатам.

Чтение данных для начала определения набора данных

Функции, используемые для определения наборов данных конвейера, обычно начинаются с spark.read или spark.readStream операции. Эти операции чтения возвращают статический или потоковый объект DataFrame, который используется для выполнения дополнительных преобразований перед возвратом DataFrame. Другие примеры операций Spark, возвращающих кадр данных, включают spark.tableили spark.range.

Функции никогда не должны ссылаться на DataFrame, объявленные за пределами функции. Попытка ссылаться на DataFrame, определенные в другой области видимости, может вызвать неожиданное поведение. Пример шаблона метапрограммирования для создания нескольких таблиц см. в разделе "Создание таблиц в циклеfor".

В следующих примерах показан базовый синтаксис для чтения данных с помощью пакетной или потоковой логики:

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.read
    .format("cloudFiles")
    .option("cloudFile.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

Если необходимо считывать данные из внешнего REST API, реализуйте это подключение с помощью пользовательского источника данных Python. См. настраиваемые источники данных PySpark.

Замечание

Можно создавать произвольные фреймы данных Apache Spark из коллекций данных Python, включая pandas DataFrames, словарей и списков. Эти шаблоны могут быть полезны во время разработки и тестирования, но большинство определений набора данных конвейера рабочей среды должны начинаться с загрузки данных из файлов, внешней системы или существующей таблицы или представления.

Цепочка преобразований

Конвейеры поддерживают почти все преобразования Кадра данных Apache Spark. Вы можете включить любое количество преобразований в функцию определения набора данных, но следует убедиться, что используемые методы всегда возвращают объект DataFrame.

Если у вас есть промежуточное преобразование, которое обслуживает несколько связанных рабочих нагрузок, но вам не нужно материализовать его как таблицу, используйте @dp.temporary_view(), чтобы добавить временное представление в ваш конвейер. Затем вы можете ссылаться на это представление с помощью spark.read.table("temp_view_name") в нескольких определениях последующих наборов данных. Следующий синтаксис демонстрирует этот шаблон:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

Это гарантирует, что конвейер имеет полное представление о преобразованиях в вашем представлении во время планирования конвейера и предотвращает потенциальные проблемы, связанные с произвольным кодом Python, выполняющимся вне определений набора данных.

В вашей функции вы можете объединять DataFrame-ы, чтобы создавать новые DataFrame-ы, не записывая промежуточные результаты в виде представлений, материализованных представлений или потоковых таблиц, как показано в следующем примере:

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

Если все кадры данных выполняют свои первоначальные операции чтения с помощью пакетной логики, результат возврата является статическим кадром данных. Если у вас есть какие-либо запросы для потоковой обработки, ваш результат возвращает потоковый кадр данных.

Возврат кадра данных

Используйте @dp.table для создания таблицы потоковой передачи из результатов потокового чтения. Используйте @dp.materialized_view для создания материализованного представления из результатов пакетного чтения. Большинство других декораторов работают как с потоковыми, так и статическими DataFrame, в то время как некоторым требуется потоковый DataFrame.

Функция, используемая для определения набора данных, должна возвращать кадр данных Spark. Никогда не используйте методы, которые сохраняют или записывают данные в файлы или таблицы при работе с кодом набора данных в конвейере.

Примеры операций Apache Spark, которые никогда не должны использоваться в коде конвейера:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

Замечание

Конвейеры также поддерживают использование Pandas в Spark для функций определения набора данных. См. API Pandas в Spark.

Использование SQL в конвейере Python

PySpark поддерживает spark.sql оператор для записи кода Кадра данных с помощью SQL. При использовании этого шаблона в исходном коде конвейера, он преобразуется в материализованные представления или потоковые таблицы.

Следующий пример кода эквивалентен использованию spark.read.table("catalog_name.schema_name.table_name") логики запроса набора данных:

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read и dlt.read_stream (устаревшая версия)

Старый dlt модуль содержит dlt.read() и dlt.read_stream() функции, которые были введены для поддержки функциональности в устаревшем режиме публикации конвейера. Эти методы поддерживаются, но Databricks рекомендует всегда использовать функции spark.read.table() и spark.readStream.table() по следующим причинам:

  • Функции dlt имеют ограниченную поддержку чтения наборов данных, определенных вне текущего конвейера.
  • Функции spark поддерживают указание параметров, таких как skipChangeCommits, для операций чтения. Указание параметров не поддерживается функциями dlt .
  • Сам dlt модуль заменен модулем pyspark.pipelines . Databricks рекомендует использовать from pyspark import pipelines as dp для импорта pyspark.pipelines в коде конвейеров на Python.