可以使用管道从任何由 Azure Databricks 上的 Apache Spark 支持的数据源加载数据。 可以在 Lakeflow Spark 声明性管道中针对返回 Spark 数据帧的任何查询(包括 Spark 数据帧的流式处理数据帧和 Pandas)定义数据集(表和视图)。 对于数据引入任务,Databricks 建议为大多数用例使用流式表。 流式处理表可用于通过自动加载器或 Kafka 等消息总线从云对象存储中引入数据。
并非所有数据源都支持引入 SQL。 但是,可以在同一管道中混合 SQL 和Python源,以便根据需要使用Python。 有关使用默认情况下未打包在 Lakeflow Spark 声明性管道中的库的详细信息,请参阅 管理管道的 Python 依赖项。 有关 Azure Databricks 中引入的常规信息,请参阅 Lakeflow Connect 中的标准连接器。
以下示例演示了一些常见的数据加载模式。
从现有表加载
从 Azure Databricks 中的任何现有表加载数据。 可以使用查询转换数据,也可以加载表,以便在管道中进一步处理。
Python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
从云对象存储加载文件
Databricks 建议在管道中使用自动加载程序,以便从云对象存储或 Unity 目录卷中的文件执行大多数数据引入任务。 自动加载器和管道旨在以幂等和增量的方式加载随着到达云存储而不断增长的数据。 请参阅 什么是自动加载程序? 以及 从对象存储加载数据。
以下示例使用自动加载程序从云存储读取数据。
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
以下示例使用自动加载程序从 Unity 目录卷中的 CSV 文件创建数据集。
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
注释
- 如果将 Auto Loader 与文件通知配合使用,并对你的管道或流式表进行完全刷新,则你必须手动清理你的资源。 可以使用笔记本中的 CloudFilesResourceManager 执行清理。
- 若要在启用 Unity Catalog 的管道中使用 Auto Loader 加载文件,必须使用 外部位置。 若要详细了解如何将 Unity 目录与管道配合使用,请参阅 将 Unity 目录与管道配合使用。
向云存储进行身份验证
自动加载程序使用 Unity 目录外部位置对云存储进行身份验证。 必须为要从中读取的存储路径配置外部位置,并向执行用户授予 READ FILES 权限。
若要从Azure Data Lake Storage引入,请配置一个由存储凭据支持的外部位置,该凭据引用一个存储容器。 有关详细信息,请参阅 使用 Unity 目录连接到云对象存储。
从消息总线加载数据
可以将管道配置为从消息总线引入数据。 Databricks 建议使用流式处理表与连续执行和增强型自动缩放,以最高效地引入来自消息总线的低延迟加载。 有关详细信息,请参阅 使用自动缩放优化 Lakeflow Spark 声明性管道的群集利用率。
例如,以下代码将流式处理表配置为使用 read_kafka 函数从 Kafka 引入数据。
Python
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
若要从其他消息总线源引入,请参阅:
- Kinesis:read_kinesis
- 发布/订阅主题:read_pubsub
- 脉冲:read_pulsar
从Azure 事件中心加载数据
Azure 事件中心是一种数据流式处理服务,提供 Apache Kafka 兼容接口。 可以使用 Lakeflow Spark 声明性管道运行时中包含的结构化流式处理 Kafka 连接器从 Azure 事件中心加载消息。 若要详细了解如何从 Azure 事件中心加载和处理消息,请参阅 使用 Azure 事件中心作为管道数据源。
从外部系统加载数据
Lakeflow Spark 声明性管道支持从 Azure Databricks 支持的任何数据源加载数据。 请参阅 “连接到数据源和外部服务”。 也可使用 Lakehouse Federation 为受支持的数据源加载外部数据。 由于 Lakehouse 联合会要求使用 Databricks Runtime 13.3 LTS 或更高版本,要使用 Lakehouse 联合会,请将管道配置为使用 预览通道。
某些数据源没有等效的 SQL 支持。 如果无法对其中一个数据源使用 Lakehouse 联合身份验证,则可以使用 Python 从源引入数据。 可以将 Python 和 SQL 源文件添加到同一管道。 以下示例声明具体化视图以访问远程 PostgreSQL 表中数据的当前状态。
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
从云对象存储加载小型或静态数据集
可以使用 Apache Spark 加载语法加载小型或静态数据集。 Lakeflow Spark 声明性管道支持 Azure Databricks 上的 Apache Spark 支持的所有文件格式。 要获取完整列表,请参阅“数据格式”选项。
以下示例演示如何加载 JSON 以创建表。
Python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
注释
SQL read_files 函数适用于 Azure Databricks 上的所有 SQL 环境。 建议在管道中使用 SQL 进行直接文件访问的模式。 有关详细信息,请参阅 选项。
从 Python 自定义数据源加载数据
Python 自定义数据源允许以自定义格式加载数据。 可以编写代码来读取和写入特定的外部数据源,或使用现有的Python代码从自己的内部系统读取数据。 有关开发 Python 数据源的更多详细信息,请参阅 PySpark 自定义数据源。
以下示例使用格式名称 my_custom_datasource 注册自定义数据源,并在批处理和流式处理模式下读取该数据源。
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
将流式处理表配置为忽略源流式处理表中的更改
默认情况下,流式处理表需要“仅追加”源。 如果您的源流式处理表需要更新或删除(例如,为了进行 GDPR “被遗忘权利” 处理),请使用 skipChangeCommits 标记以忽略这些更改。 此标志仅在 spark.readStream 使用 option() 函数时有效,当源流式表是 create_auto_cdc_flow() 函数的目标时无法使用。 有关详细信息,请参阅 处理源 Delta 表的更改。
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
使用管道中的机密安全访问存储凭据
可以使用 Azure Databricks 机密 来存储凭据,例如访问密钥或密码。 若要在管道中配置机密,请在管道设置群集配置中使用一个 Spark 属性。 请参阅 管道的经典计算配置。
以下示例使用一个机密来存储访问密钥,该密钥是使用自动加载程序从 Azure Data Lake Storage 存储帐户读取输入数据所需的。 同样可以使用这种方法来配置管道所需的任何机密,例如用于访问 S3 的 AWS 密钥,或用于访问 Apache Hive 元存储的密码。
若要详细了解如何使用 Azure Data Lake Storage,请参阅 连接到 Azure Data Lake Storage 和 Blob 存储。
注释
必须将 spark.hadoop. 前缀添加到用于设置机密值的 spark_conf 配置密钥。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
在此代码示例中,替换以下值。
| Placeholder | 替换为 |
|---|---|
<container-name> |
Azure存储帐户容器的名称。 |
<storage-account-name> |
ADLS 存储帐户名称。 |
<path> |
管道输出数据和元数据的路径。 |
<scope-name> |
Azure Databricks 机密范围名称。 |
<secret-name> |
包含Azure存储帐户访问密钥的密钥的名称。 |
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
在此代码示例中,替换以下值。
| Placeholder | 替换为 |
|---|---|
<container-name> |
存储输入数据的Azure存储帐户容器的名称。 |
<storage-account-name> |
ADLS 存储帐户名称。 |
<path-to-input-dataset> |
输入数据集的路径。 |