使用数据接收器通过 Lakeflow 声明性管道将记录推送到外部服务

重要

Lakeflow 声明性管道 sink API 正在公开预览

本文介绍 Lakeflow 声明性管道 sink API,以及如何将其与 配合使用,以将管道转换的记录写入外部数据接收器。 外部数据接收器包括 Unity 目录托管表和外部表,以及 Apache Kafka 或 Azure 事件中心等事件流服务。

注释

Lakeflow 声明性管道 sink API 仅适用于 Python。

什么是 Lakeflow 声明性管道接收器?

Lakeflow 声明性管道接收器是 Lakeflow 声明性管道流的目标。 默认情况下,Lakeflow 声明性管道将数据发送至流式表或物化视图目标。 这两个表都是 Azure Databricks 托管的 Delta 表。 Lakeflow 声明性管道接收器是用于将转换后的数据写入目标的替代方案,目标包括事件流服务(如 Apache Kafka 或 Azure 事件中心)及由 Unity 目录管理的外部表。 通过使用 sinks,现在可以有更多的选项来持久化 Lakeflow 声明性管道的输出。

何时应使用 Lakeflow 声明性管道接收器?

如果需要:Databricks 建议使用 Lakeflow 声明性管道接收器:

  • 构建作用例,例如欺诈检测、实时分析和客户建议。 作用例通常从消息总线(例如 Apache Kafka 主题)读取数据,然后以低延迟处理数据,并将处理过的记录写回到消息总线。 此方法使你能够通过不从云存储写入或读取来实现较低的延迟。
  • 将转换后的数据从 Lakeflow 声明性管道流中写入由外部 Delta 实例管理的表,包括由“Unity Catalog”管理的表和外部表。
  • 将反向提取-转换-加载 (ETL) 执行到 Databricks 外部的接收器,例如 Apache Kafka 主题。 使用此方法可以有效地支持需要读取或使用 Unity 目录表或其他 Databricks 托管存储之外的数据的用例。

如何使用 Lakeflow 声明性管道接收器?

当事件数据从流式处理源引入 Lakeflow 声明性管道时,可以使用 Lakeflow 声明性管道功能处理和优化此数据,然后使用追加流处理将转换的数据记录流式传输到 Lakeflow 声明性管道接收器。 使用 create_sink() 函数创建此接收器。 有关 create_sink 函数的更多详细信息,请参阅 Sink API 参考

如果您有一个管道用于创建或处理流式事件数据,并为写入准备数据记录,那么就可以使用 Lakeflow 声明式管道的接收器。

实现 Lakeflow 声明性管道接收器由两个步骤组成:

  1. 创建 Lakeflow 声明式管道接收器。
  2. 使用追加流将准备好的记录写入接收器。

创建 Lakeflow 声明性管道接收器

Databricks 支持将从流数据处理的记录写入下面三种类型的目标接收器:

  • Delta 表接收器(包括 Unity Catalog 托管表和外部表)
  • Apache Kafka 接收器
  • Azure 事件中心接收器

下面是 Delta、Kafka 和 Azure 事件中心接收器的配置示例:

德尔塔水槽

若要按文件路径创建 Delta 接收器,请执行以下操作:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

若要使用完全限定的目录和架构路径按表名称创建 Delta 接收器,请执行以下操作:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "catalog_name.schema_name.table_name" }
)

Kafka 和 Azure 事件中心接收器

此代码适用于 Apache Kafka 和 Azure 事件中心接收器。

topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")

eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
  + f' required username="$ConnectionString" password="{connection_string}";'

dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)

有关使用 create_sink 函数的更多详细信息,请参阅 接收器 API 参考

创建接收器后,可以开始将处理过的记录流式传输到接收器。

使用追加流写入 Lakeflow 声明性管道接收器

创建接收器后,下一步是通过将接收器指定为追加流输出的记录的目标,向其写入处理后的记录。 为此,请将接收器指定为 target 修饰器中的 append_flow 值。

  • 对于 Unity 目录托管表和外部表,请使用格式 delta 并在选项中指定路径或表名称。 Lakeflow 声明式管道必须配置为使用 Unity 目录。
  • 对于 Apache Kafka 主题,请使用 kafka 格式,并在选项中指定主题名称、连接信息和身份验证信息。 这些选项与 Spark 结构化流式处理 Kafka 接收器支持的选项相同。 请参阅配置 Kafka 结构化流式处理写入器
  • 对于 Azure 事件中心,请使用 kafka 格式,并在选项中指定事件中心名称、连接信息和身份验证信息。 这些选项与使用 Kafka 接口的 Spark 结构化流式处理事件中心接收器中支持的选项相同。 请参阅使用 Microsoft Entra ID 和 Azure 事件中心进行服务主体身份验证

以下示例演示如何设置流以写入 Delta、Kafka 和 Azure 事件中心接收器,其中包含由 Lakeflow 声明性管道处理的记录。

Delta 接收器

@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
  return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

Kafka 和 Azure 事件中心接收器

@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

对于 Azure 事件中心接收器,value 参数是必需的。 其他参数(如 keypartitionheaderstopic)是可选的。

有关 append_flow 修饰器的更多详细信息,请参见 使用多个流写入单个目标

限制

  • 仅支持 Python API。 不支持 SQL。

  • 仅支持流式处理查询。 不支持批处理查询。

  • 只可使用 append_flow 写入到接收器。 不支持其他流,例如 create_auto_cdc_flow,不支持在 Lakeflow 声明性管道数据集定义中使用接收器。 例如,不支持以下各项:

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • 对于 Delta 接收器,表名称必须完全限定的。 具体而言,对于 Unity Catalog 托管的外部表,表名称必须是格式 <catalog>.<schema>.<table>。 对于 Hive 元存储,它必须采用 <schema>.<table> 格式。

  • 运行完全刷新更新不会清理接收器中以前计算的结果数据。 这意味着,任何重新处理的数据都将追加到接收器,并且不会更改现有数据。

  • 不支持 Lakeflow 声明性管道预期。

资源