Поделиться через


Загрузка данных в конвейерах

Вы можете загрузить данные из любого источника данных, поддерживаемого Apache Spark в Azure Databricks, с помощью конвейеров. Наборы данных (таблицы и представления) можно определить в декларативных конвейерах Spark Lakeflow для любого запроса, возвращающего кадр данных Spark, включая потоковую передачу кадров данных и Pandas для кадров данных Spark. Для задач загрузки данных Databricks рекомендует использовать потоковые таблицы для большинства сценариев. Потоковые таблицы подходят для загрузки данных из облачного хранилища объектов с помощью автозагрузчика или из систем обмена сообщениями, таких как Kafka.

Замечание

  • Не все источники данных поддерживают ввод данных через SQL. Вы можете смешивать источники SQL и Python в конвейерах, чтобы использовать Python, где это необходимо, и SQL для других операций в том же конвейере.
  • Дополнительные сведения о работе с библиотеками, не упакованными в Декларативные конвейеры Lakeflow Spark по умолчанию, см. в разделе "Управление зависимостями Python для конвейеров".
  • Общие сведения о поглощении данных в Azure Databricks можно найти в статье "Стандартные соединители" в Lakeflow Connect.

В приведенных ниже примерах показаны некоторые распространенные шаблоны.

Загрузка из существующей таблицы

Загрузите данные из любой существующей таблицы в Azure Databricks. Вы можете преобразовать данные с помощью запроса или загрузить таблицу для дальнейшей обработки в конвейере.

В следующем примере данные считываются из существующей таблицы:

Питон

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

загружать файлы из облачного хранилища объектов

Databricks рекомендует использовать автозагрузчик в конвейерах для большинства задач приема данных из облачного хранилища объектов или из файлов в томе каталога Unity. Автозагрузчик и конвейеры предназначены для добавочной и идемпотентной загрузки постоянно растущих данных по мере поступления в облачное хранилище.

См. раздел "Что такое автозагрузчик" и "Загрузка данных из хранилища объектов".

В следующем примере данные из облачного хранилища считываются с помощью автозагрузчика:

Питон

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

В следующих примерах используется автозагрузчик для создания наборов данных из CSV-файлов в томе каталога Unity:

Питон

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Замечание

  • Если вы используете автозагрузчик с уведомлениями о файлах и запускаете полное обновление для конвейера или потоковой таблицы, необходимо вручную очистить ресурсы. Для выполнения очистки можно использовать CloudFilesResourceManager в записной книжке.
  • Чтобы загрузить файлы с помощью Auto Loader в конвейере с поддержкой каталога Unity, нужно применять внешние расположения . Дополнительные сведения об использовании каталога Unity с конвейерами см. в статье "Использование каталога Unity с конвейерами".

Загрузка данных из шины сообщений

Пайплайны можно настроить для получения данных из шины сообщений. Databricks рекомендует использовать потоковые таблицы с непрерывным выполнением и улучшенным авто-масштабированием, чтобы обеспечить наиболее эффективную загрузку с низкой задержкой из шины сообщений. См. статью "Оптимизация использования кластеров Декларативных конвейеров Spark Lakeflow с помощью автомасштабирования".

Например, следующий код настраивает потоковую таблицу для приема данных из Kafka с помощью функции read_kafka :

Питон

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Чтобы получать данные из других источников шины сообщений, смотрите:

Загрузка данных из Центров событий Azure

Центры событий Azure — это служба потоковой передачи данных, которая предоставляет совместимый интерфейс Apache Kafka. Вы можете использовать соединитель Kafka для структурированной потоковой передачи, включенный в среду выполнения Lakeflow Spark Declarative Pipelines, чтобы загружать сообщения из Центров событий Azure. Дополнительные сведения о загрузке и обработке сообщений из Центров событий Azure см. в статье "Использование Центров событий Azure в качестве источника данных конвейера".

Загрузка данных из внешних систем

Декларативные конвейеры Spark Lakeflow поддерживают загрузку данных из любого источника данных, поддерживаемого Azure Databricks. См. статью "Подключение к источникам данных" и внешним службам. Вы также можете загрузить внешние данные, используя Lakehouse Federation, для поддерживаемых источников данных . Так как для федерации Lakehouse требуется среда выполнения Databricks 13.3 LTS или более поздней версии, для использования федерации Lakehouse ваш конвейер должен быть настроен на использование предварительного канала.

Некоторые источники данных не поддерживают эквивалентную поддержку в SQL. Если вы не можете использовать федерацию Lakehouse с одним из этих источников данных, вы можете использовать Python для приема данных из источника. Исходные файлы Python и SQL можно добавить в тот же конвейер. В следующем примере объявляется материализованное представление для доступа к текущему состоянию данных в удаленной таблице PostgreSQL:

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Загрузка небольших или статических наборов данных из облачного хранилища объектов

Вы можете загружать небольшие или статические наборы данных с помощью синтаксиса загрузки Apache Spark. Декларативные конвейеры Spark Lakeflow поддерживают все форматы файлов, поддерживаемые Apache Spark в Azure Databricks. Полный список см. в разделе "Параметры формата данных".

В следующих примерах показано, как загрузить JSON для создания таблицы:

Питон

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Замечание

Функция read_files SQL распространена во всех средах SQL в Azure Databricks. Рекомендуется использовать шаблон прямого доступа к файлам с помощью SQL в конвейерах. Дополнительные сведения см. в разделе "Параметры".

Загрузка данных из пользовательского источника данных Python

Пользовательские источники данных Python позволяют загружать данные в пользовательских форматах. Вы можете написать код для чтения и записи в определенный внешний источник данных или использовать существующий код Python в существующих системах для чтения данных из собственных внутренних систем. Дополнительные сведения о разработке источников данных Python см. в разделе "Пользовательские источники данных PySpark".

Чтобы использовать пользовательский источник данных Python для загрузки данных в конвейер, зарегистрируйте его с именем формата, например my_custom_datasource, а затем считайте данные из него.

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Сконфигурируйте потоковую таблицу для игнорирования изменений в исходной потоковой таблице

Замечание

  • Флаг skipChangeCommits работает только с spark.readStream при использовании функции option(). Этот флаг нельзя использовать в dp.read_stream() функции.
  • Невозможно использовать skipChangeCommits флаг, если исходная потоковая таблица определена как цель функции create_auto_cdc_flow().

По умолчанию для потоковых таблиц требуются источники, которые предназначены для добавления. Если потоковая таблица использует другую потоковую таблицу в качестве источника, и для исходной потоковой таблицы требуются обновления или удаления, например, для обработки в соответствии с GDPR «право на забвение», можно установить флаг skipChangeCommits при чтении исходной потоковой таблицы, чтобы игнорировать эти изменения. Дополнительные сведения об этом флаге см. в разделе "Игнорировать обновления и удаления".

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Обеспечение безопасного доступа к учетным данным хранилища с секретами в потоке обработки

Секреты Azure Databricks можно использовать для хранения учетных данных, таких как ключи доступа или пароли. Чтобы настроить секрет в конвейере, используйте свойство Spark в конфигурации кластера параметров конвейера. См. статью "Настройка классических вычислений для конвейеров".

В следующем примере используется секрет для хранения ключа доступа, необходимого для чтения входных данных из учетной записи хранения Azure Data Lake Storage (ADLS) с помощью автозагрузчика. Этот же метод можно использовать для настройки любого секрета, необходимого для конвейера, например ключей AWS для доступа к S3 или пароля к хранилищу метаданных Apache Hive.

Дополнительные сведения о работе с Azure Data Lake Storage см. в статье "Подключение к Azure Data Lake Storage и хранилищу BLOB-объектов".

Замечание

Необходимо добавить префикс spark.hadoop. в ключ конфигурации spark_conf, который задает значение секрета.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Заменить

  • <storage-account-name> с именем учетной записи хранения ADLS.
  • <scope-name> с именем области секретных данных Azure Databricks.
  • <secret-name> с именем ключа, содержащего ключ доступа к учетной записи хранения Azure.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Заменить

  • <container-name> с именем контейнера учетной записи хранения Azure, в которой хранятся входные данные.
  • <storage-account-name> с именем учетной записи хранения ADLS.
  • <path-to-input-dataset> по пути к входному набору данных.