Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Вы можете загрузить данные из любого источника данных, поддерживаемого Apache Spark в Azure Databricks, с помощью конвейеров. Вы можете определить наборы данных — таблицы и представления — в Декларативных конвейерах Spark Lakeflow для любого запроса, возвращающего кадр данных Spark, включая потоковую передачу кадров данных и Pandas для кадров данных Spark. Для задач загрузки данных Databricks рекомендует использовать потоковые таблицы для большинства сценариев. Потоковые таблицы полезны для приема данных из облачного хранилища объектов с помощью Auto Loader или из шин сообщений, таких как Kafka.
Не все источники данных поддерживают ввод данных через SQL. Однако вы можете смешивать источники SQL и Python в одном конвейере, чтобы использовать Python по необходимости. Дополнительные сведения о работе с библиотеками, не упакованными в Декларативные конвейеры Lakeflow Spark по умолчанию, см. в разделе "Управление зависимостями Python для конвейеров". Общие сведения о поглощении данных в Azure Databricks можно найти в статье "Стандартные соединители" в Lakeflow Connect.
В следующих примерах показаны некоторые распространенные шаблоны загрузки данных.
Загрузка из существующей таблицы
Загрузите данные из любой существующей таблицы в Azure Databricks. Вы можете преобразовать данные с помощью запроса или загрузить таблицу для дальнейшей обработки в конвейере.
Питон
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
загружать файлы из облачного хранилища объектов
Databricks рекомендует использовать автозагрузчик в конвейерах для большинства задач приема данных из облачного хранилища объектов или из файлов в томе каталога Unity. Автозагрузчик и конвейеры предназначены для добавочной и идемпотентной загрузки постоянно растущих данных по мере поступления в облачное хранилище. См. раздел "Что такое автозагрузчик" и "Загрузка данных из хранилища объектов".
В следующем примере данные из облачного хранилища считываются с помощью автозагрузчика.
Питон
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
В следующих примерах используется автозагрузчик для создания наборов данных из CSV-файлов в томе каталога Unity.
Питон
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Замечание
- Если вы используете автозагрузчик с уведомлениями о файлах и запускаете полное обновление для конвейера или потоковой таблицы, необходимо вручную очистить ресурсы. Для выполнения очистки можно использовать CloudFilesResourceManager в записной книжке.
- Чтобы загрузить файлы с помощью Auto Loader в конвейере с поддержкой каталога Unity, нужно применять внешние расположения . Дополнительные сведения об использовании каталога Unity с конвейерами см. в статье "Использование каталога Unity с конвейерами".
Проверка подлинности в облачном хранилище
Автозагрузчик использует внешние местоположения каталога Unity для аутентификации в облачных хранилищах. Требуется настроить внешнее расположение для пути хранения, из которого вы хотите читать, и предоставить привилегию пользователю.
Чтобы загрузить данные из Azure Data Lake Storage, настройте внешнее расположение, поддерживаемое учетными данными доступа к хранилищу, ссылающимися на контейнер хранилища. Дополнительные сведения см. в разделе "Подключение к облачному хранилищу объектов" с помощью каталога Unity.
Загрузка данных из шины сообщений
Пайплайны можно настроить для получения данных из шины сообщений. Databricks рекомендует использовать потоковые таблицы с непрерывным выполнением и улучшенным авто-масштабированием, чтобы обеспечить наиболее эффективную загрузку с низкой задержкой из шины сообщений. Дополнительные сведения см. в статье "Оптимизация использования кластера Lakeflow Spark Декларативных конвейеров с помощью автомасштабирования".
Например, следующий код настраивает потоковую таблицу для приема данных из Kafka с помощью функции read_kafka .
Питон
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Чтобы получать данные из других источников шины сообщений, смотрите:
- Kinesis: read_kinesis
- Тема Pub/Sub: read_pubsub
- Пульсар: read_pulsar
Загрузите данные из Центры событий Azure
Центры событий Azure — это служба потоковой передачи данных, которая предоставляет совместимый интерфейс Apache Kafka. Вы можете использовать соединитель Kafka для структурированной потоковой передачи, включенный в среду выполнения Lakeflow Spark Declarative Pipelines, чтобы загружать сообщения из Центров событий Azure. Дополнительные сведения о загрузке и обработке сообщений из Центров событий Azure см. в статье "Использование Центров событий Azure в качестве источника данных конвейера".
Загрузка данных из внешних систем
Декларативные конвейеры Spark Lakeflow поддерживают загрузку данных из любого источника данных, поддерживаемого Azure Databricks. См. статью "Подключение к источникам данных" и внешним службам. Вы также можете загрузить внешние данные, используя Lakehouse Federation, для поддерживаемых источников данных . Так как для федерации Lakehouse требуется Databricks Runtime 13.3 LTS или более поздней версии, чтобы использовать федерацию Lakehouse, настройте конвейер для использования канала предварительной версии.
Некоторые источники данных не поддерживают эквивалентную поддержку SQL. Если вы не можете использовать федерацию Lakehouse с одним из этих источников данных, вы можете использовать Python для приема данных из источника. Исходные файлы Python и SQL можно добавить в тот же конвейер. В следующем примере объявляется материализованное представление для доступа к текущему состоянию данных в удаленной таблице PostgreSQL.
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Загрузка небольших или статических наборов данных из облачного хранилища объектов
Вы можете загружать небольшие или статические наборы данных с помощью синтаксиса загрузки Apache Spark. Декларативные конвейеры Spark Lakeflow поддерживают все форматы файлов, поддерживаемые Apache Spark в Azure Databricks. Полный список см. в разделе "Параметры формата данных".
В следующих примерах показано, как загрузить JSON для создания таблицы.
Питон
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Замечание
Функция read_files SQL распространена во всех средах SQL в Azure Databricks. Рекомендуется использовать шаблон прямого доступа к файлам с помощью SQL в конвейерах. Дополнительные сведения см. в разделе "Параметры".
Загрузка данных из пользовательского источника данных Python
Пользовательские источники данных Python позволяют загружать данные в пользовательских форматах. Вы можете написать код для чтения и записи в определенный внешний источник данных или использовать существующий Python код для чтения данных из собственных внутренних систем. Дополнительные сведения о разработке источников данных Python см. в разделе "Пользовательские источники данных PySpark".
В следующем примере регистрируется пользовательский источник данных с именем формата my_custom_datasource и выполняется чтение из него как в пакетном, так и в потоковом режиме.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Сконфигурируйте потоковую таблицу для игнорирования изменений в исходной потоковой таблице
По умолчанию для потоковых таблиц требуются источники, которые предназначены для добавления. Если для исходной потоковой таблицы требуются обновления или удаления, например для обработки GDPR "право быть забытым", используйте skipChangeCommits флаг, чтобы игнорировать эти изменения. Этот флаг работает только с spark.readStream, используя функцию option(), и не может использоваться, если исходная потоковая таблица является целью функции create_auto_cdc_flow(). Дополнительные сведения см. в разделе "Обработка изменений в исходных таблицах Delta".
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Обеспечение безопасного доступа к учетным данным хранилища с секретами в потоке обработки
Секреты Azure Databricks можно использовать для хранения учетных данных, таких как ключи доступа или пароли. Чтобы настроить секрет в конвейере, используйте свойство Spark в конфигурации кластера параметров конвейера. См. статью "Настройка классических вычислений для конвейеров".
В следующем примере используется секрет для хранения ключа доступа, необходимого для чтения входных данных из учетной записи хранения Azure Data Lake Storage с помощью автозагрузчика. Этот же метод можно использовать для настройки любого секрета, необходимого для конвейера, например ключей AWS для доступа к S3 или пароля к хранилищу метаданных Apache Hive.
Дополнительные сведения о работе с Azure Data Lake Storage см. в статье "Подключение к Azure Data Lake Storage и хранилищу BLOB-объектов".
Замечание
Необходимо добавить префикс spark.hadoop. в ключ конфигурации spark_conf, который задает значение секрета.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
В этом примере кода замените следующие значения.
| Заполнитель | Заменить на |
|---|---|
<container-name> |
Имя контейнера учетной записи хранения Azure. |
<storage-account-name> |
Имя учетной записи хранения ADLS. |
<path> |
Путь к выходным данным и метаданным конвейера. |
<scope-name> |
Имя области секрета Azure Databricks. |
<secret-name> |
Имя ключа, содержащего ключ доступа к учетной записи хранения Azure. |
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
В этом примере кода замените следующие значения.
| Заполнитель | Заменить на |
|---|---|
<container-name> |
Имя контейнера учетной записи хранения Azure, в которой хранятся входные данные. |
<storage-account-name> |
Имя учетной записи хранения ADLS. |
<path-to-input-dataset> |
Путь к входному набору данных. |