重要
Lakeflow 声明性管道 sink
API 正在公开预览。
本文介绍 Lakeflow 声明性管道 sink
API,以及如何将其与 流 配合使用,以将管道转换的记录写入外部数据接收器。 外部数据接收器包括 Unity 目录托管表和外部表,以及 Apache Kafka 或 Azure 事件中心等事件流服务。
注释
Lakeflow 声明性管道 sink
API 仅适用于 Python。
什么是 Lakeflow 声明性管道接收器?
Lakeflow 声明性管道接收器是 Lakeflow 声明性管道流的目标。 默认情况下,Lakeflow 声明性管道将数据发送至流式表或物化视图目标。 这两个表都是 Azure Databricks 托管的 Delta 表。 Lakeflow 声明性管道接收器是用于将转换后的数据写入目标的替代方案,目标包括事件流服务(如 Apache Kafka 或 Azure 事件中心)及由 Unity 目录管理的外部表。 通过使用 sinks,现在可以有更多的选项来持久化 Lakeflow 声明性管道的输出。
何时应使用 Lakeflow 声明性管道接收器?
如果需要:Databricks 建议使用 Lakeflow 声明性管道接收器:
- 构建作用例,例如欺诈检测、实时分析和客户建议。 作用例通常从消息总线(例如 Apache Kafka 主题)读取数据,然后以低延迟处理数据,并将处理过的记录写回到消息总线。 此方法使你能够通过不从云存储写入或读取来实现较低的延迟。
- 将转换后的数据从 Lakeflow 声明性管道流中写入由外部 Delta 实例管理的表,包括由“Unity Catalog”管理的表和外部表。
- 将反向提取-转换-加载 (ETL) 执行到 Databricks 外部的接收器,例如 Apache Kafka 主题。 使用此方法可以有效地支持需要读取或使用 Unity 目录表或其他 Databricks 托管存储之外的数据的用例。
如何使用 Lakeflow 声明性管道接收器?
当事件数据从流式处理源引入 Lakeflow 声明性管道时,可以使用 Lakeflow 声明性管道功能处理和优化此数据,然后使用追加流处理将转换的数据记录流式传输到 Lakeflow 声明性管道接收器。 使用 create_sink()
函数创建此接收器。 有关 create_sink
函数的更多详细信息,请参阅 Sink API 参考。
如果您有一个管道用于创建或处理流式事件数据,并为写入准备数据记录,那么就可以使用 Lakeflow 声明式管道的接收器。
实现 Lakeflow 声明性管道接收器由两个步骤组成:
- 创建 Lakeflow 声明式管道接收器。
- 使用追加流将准备好的记录写入接收器。
创建 Lakeflow 声明性管道接收器
Databricks 支持将从流数据处理的记录写入下面三种类型的目标接收器:
- Delta 表接收器(包括 Unity Catalog 托管表和外部表)
- Apache Kafka 接收器
- Azure 事件中心接收器
下面是 Delta、Kafka 和 Azure 事件中心接收器的配置示例:
德尔塔水槽
若要按文件路径创建 Delta 接收器,请执行以下操作:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
若要使用完全限定的目录和架构路径按表名称创建 Delta 接收器,请执行以下操作:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Kafka 和 Azure 事件中心接收器
此代码适用于 Apache Kafka 和 Azure 事件中心接收器。
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
有关使用 create_sink
函数的更多详细信息,请参阅 接收器 API 参考。
创建接收器后,可以开始将处理过的记录流式传输到接收器。
使用追加流写入 Lakeflow 声明性管道接收器
创建接收器后,下一步是通过将接收器指定为追加流输出的记录的目标,向其写入处理后的记录。 为此,请将接收器指定为 target
修饰器中的 append_flow
值。
- 对于 Unity 目录托管表和外部表,请使用格式
delta
并在选项中指定路径或表名称。 Lakeflow 声明式管道必须配置为使用 Unity 目录。 - 对于 Apache Kafka 主题,请使用
kafka
格式,并在选项中指定主题名称、连接信息和身份验证信息。 这些选项与 Spark 结构化流式处理 Kafka 接收器支持的选项相同。 请参阅配置 Kafka 结构化流式处理写入器。 - 对于 Azure 事件中心,请使用
kafka
格式,并在选项中指定事件中心名称、连接信息和身份验证信息。 这些选项与使用 Kafka 接口的 Spark 结构化流式处理事件中心接收器中支持的选项相同。 请参阅使用 Microsoft Entra ID 和 Azure 事件中心进行服务主体身份验证。
以下示例演示如何设置流以写入 Delta、Kafka 和 Azure 事件中心接收器,其中包含由 Lakeflow 声明性管道处理的记录。
Delta 接收器
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Kafka 和 Azure 事件中心接收器
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
对于 Azure 事件中心接收器,value
参数是必需的。 其他参数(如 key
、partition
、headers
和 topic
)是可选的。
有关 append_flow
修饰器的更多详细信息,请参见 使用多个流写入单个目标。
限制
仅支持 Python API。 不支持 SQL。
仅支持流式处理查询。 不支持批处理查询。
只可使用
append_flow
写入到接收器。 不支持其他流,例如create_auto_cdc_flow
,不支持在 Lakeflow 声明性管道数据集定义中使用接收器。 例如,不支持以下各项:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
对于 Delta 接收器,表名称必须完全限定的。 具体而言,对于 Unity Catalog 托管的外部表,表名称必须是格式
<catalog>.<schema>.<table>
。 对于 Hive 元存储,它必须采用<schema>.<table>
格式。运行完全刷新更新不会清理接收器中以前计算的结果数据。 这意味着,任何重新处理的数据都将追加到接收器,并且不会更改现有数据。
不支持 Lakeflow 声明性管道预期。