Бөлісу құралы:


Разработка кода конвейера с помощью Python

Lakeflow Spark Декларативные конвейеры (SDP) представляет несколько новых конструкций кода Python для определения материализованных представлений и потоковых таблиц в конвейерах. Поддержка Python для разработки конвейеров основана на основах API PySpark DataFrame и Структурированной потоковой передачи.

Для пользователей, незнакомых с Python и DataFrames, Databricks рекомендует использовать интерфейс SQL. См. раздел "Разработка декларативного конвейера Spark Lakeflow с помощью SQL".

Полный справочник по синтаксису Python SDP Lakeflow см. в справочнике по языку Python для Декларативного конвейера Spark Lakeflow.

Основы Python для разработки конвейеров

Код Python, создающий наборы данных pipeline, должен возвращать DataFrame.

В модуле pyspark.pipelines реализованы все API Python для Декларативного конвейера Spark Lakeflow. Код конвейера, реализованный с помощью Python, должен явно импортировать pipelines модуль в верхней части источника Python. В наших примерах мы используем следующую команду импорта и используем dp в примерах для ссылки pipelines.

from pyspark import pipelines as dp

Замечание

Apache Spark™ включает декларативные конвейеры , начиная с Spark 4.1, доступные pyspark.pipelines через модуль. Среда выполнения Databricks расширяет эти возможности с открытым исходным кодом с помощью дополнительных API и интеграции для управляемого рабочего использования.

Код, написанный с помощью модуля с открытым исходным кодом pipelines , выполняется без изменений в Azure Databricks. Следующие функции не являются частью Apache Spark:

  • dp.create_auto_cdc_flow
  • dp.create_auto_cdc_from_snapshot_flow
  • @dp.expect(...)

Конвейер считывает и записывает данные по умолчанию в каталог и схему, указанную во время настройки конвейера. См. Задайте целевой каталог и схему.

Код Python для конкретного конвейера отличается от других типов кода Python одним критическим образом: код конвейера Python не вызывает функции, которые выполняют прием данных и преобразование для создания наборов данных. Вместо этого SDP интерпретирует функции декоратора из модуля dp во всех файлах исходного кода, включенных в конвейер, и создает граф данных потоков.

Это важно

Чтобы избежать непредвиденного поведения при запуске конвейера, не включайте код, который может иметь побочные эффекты в функциях, определяющих наборы данных. Чтобы узнать больше, смотрите справочник Python.

Создание материализованного представления или потоковой таблицы с помощью Python

Используйте @dp.table для создания таблицы потоковой передачи из результатов потокового чтения. Используйте @dp.materialized_view для создания материализованного представления из результатов пакетного чтения.

По умолчанию материализованное представление и имена потоковой таблицы выводятся из имен функций. В следующем примере кода показан базовый синтаксис для создания материализованного представления и потоковой таблицы:

Замечание

Обе функции ссылаются на одну и ту же таблицу в каталоге samples и используют одну и ту же функцию-декоратор. В этих примерах подчеркивается, что единственное различие в базовом синтаксисе для материализованных представлений и потоковых таблиц заключается в использовании spark.read вместо spark.readStream.

Не все источники данных поддерживают потоковое чтение. Некоторые источники данных всегда должны обрабатываться с семантикой потоковой передачи.

from pyspark import pipelines as dp

@dp.materialized_view()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

При необходимости можно указать имя таблицы с помощью аргумента name в декораторе @dp.table. В следующем примере показан этот шаблон для материализованного представления и потоковой таблицы:

from pyspark import pipelines as dp

@dp.materialized_view(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Загрузка данных из хранилища объектов

Конвейеры поддерживают загрузку данных из всех форматов, поддерживаемых Azure Databricks. См. параметры формата данных .

Замечание

В этих примерах используются данные, доступные на /databricks-datasets, автоматически подключённые к вашей рабочей области. Databricks рекомендует использовать пути тома или облачные URI для ссылки на данные, хранящиеся в облачном хранилище объектов. См. статью Что такое тома каталога Unity?.

Databricks рекомендует использовать автозагрузчик и потоковые таблицы при настройке добавочных рабочих нагрузок приема данных, хранящихся в облачном хранилище объектов. См. раздел "Что такое автозагрузчик?".

В следующем примере создается потоковая таблица из JSON-файлов с помощью автозагрузчика:

from pyspark import pipelines as dp

@dp.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

В следующем примере используется пакетная семантика для чтения каталога JSON и создания материализованного представления:

from pyspark import pipelines as dp

@dp.materialized_view()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Проверка данных с учетом ожиданий

Вы можете использовать ожидания для установки и применения ограничений качества данных. См. Управление качеством данных, используя ожидания конвейера.

Следующий код использует @dp.expect_or_drop для определения ожидания с именем valid_data, которая удаляет записи, которые являются null во время приема данных:

from pyspark import pipelines as dp

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Выполните запрос к материализованным представлениям и потоковым таблицам, которые определены в вашем конвейере

В следующем примере определяются четыре набора данных:

  • Потоковая таблица с именем orders, которая загружает данные JSON.
  • Материализованное представление с именем customers, которое загружает данные CSV.
  • Материализованное представление с именем customer_orders, которое объединяет записи из наборов данных orders и customers, преобразует метку времени заказа в дату и выбирает поля customer_id, order_number, stateи order_date.
  • Материализованное представление под именем daily_orders_by_state, которое агрегирует ежедневное количество заказов для каждого штата.

Замечание

При запросе представлений или таблиц в конвейере можно указать каталог и схему напрямую или использовать значения по умолчанию, настроенные в конвейере. В этом примере таблицы orders, customersи customer_orders записываются и считываются из каталога по умолчанию и схемы, настроенной для конвейера.

Устаревший режим публикации использует схему LIVE для выполнения запросов к другим материализованным представлениям и потоковым таблицам, определенным в конвейере обработки данных. В новых конвейерах синтаксис схемы LIVE автоматически игнорируется. См. LIVE схему (устаревшую версию).

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dp.materialized_view()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dp.materialized_view()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dp.materialized_view()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Создание таблиц в цикле for

Вы можете использовать циклы Python for для создания нескольких таблиц программным способом. Это может быть полезно, если у вас есть множество источников данных или целевых наборов данных, которые зависят только от нескольких параметров, что приводит к снижению общего объема кода для поддержания и уменьшения избыточности кода.

Цикл for оценивает логику в последовательном порядке, но после завершения планирования для наборов данных конвейер выполняет логику параллельно.

Это важно

При использовании этого шаблона для определения наборов данных убедитесь, что список значений, передаваемых в цикл for, всегда является аддитивным. Если набор данных, ранее определенный в конвейере, опущен из будущего запуска конвейера, этот набор данных удаляется автоматически из целевой схемы.

В следующем примере создаются пять таблиц, которые фильтруют заказы клиентов по регионам. Здесь имя региона используется для задания имени целевых материализованных представлений и фильтрации исходных данных. Временные представления используются для определения соединений из исходных таблиц, используемых при создании окончательных материализованных представлений.

from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col

@dp.temporary_view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dp.temporary_view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Ниже приведен пример графа потока данных для этого конвейера:

Граф данных с двумя представлениями, приводящими к пяти региональным таблицам.

Устранение неполадок: цикл for создает множество таблиц с одинаковыми значениями

Модель отложенного выполнения, которую используют конвейеры для оценки кода Python, требует, чтобы ваша логика напрямую ссылалась на отдельные значения, когда функция, декорированная с помощью @dp.materialized_view(), вызывается.

В следующем примере демонстрируется два правильных подхода к определению таблиц с помощью цикла for. В обоих примерах каждое имя таблицы из списка tables явно упоминается в функции, аннотированной @dp.materialized_view().

from pyspark import pipelines as dp

# Create a parent function to set local variables

def create_table(table_name):
  @dp.materialized_view(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dp.materialized_view()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized_view(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

В следующем примере не ссылается на ссылочные значения правильно. В этом примере создаются таблицы с отдельными именами, но все таблицы загружают данные из последнего значения в цикле for:

from pyspark import pipelines as dp

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized(name=t_name)
  def create_table():
    return spark.read.table(t_name)

Окончательное удаление записей из материализованного представления или потоковой таблицы

Чтобы окончательно удалить записи из материализованного представления или потоковой таблицы с включенными векторами удаления, например для соответствия GDPR, необходимо выполнить дополнительные операции в базовых таблицах Delta объекта. Чтобы обеспечить перманентное удаление записей из материализованного представления, см. удаление записей из материализованного представления с активными векторами удаления. Чтобы гарантировать удаление записей из потоковой таблицы, см. постоянное удаление записей из потоковой таблицы.