你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

构建 Delta Lake 以支持休闲和旅行在线预订中的临时查询

Azure 事件中心
Azure Data Lake Storage
Azure Databricks
Azure Synapse Analytics

此体系结构提供一个示例 Delta Lake 用于旅行服务预订,其中会频繁生成大量原始文档。

Apache® 和 Apache Spark™ 是 Apache Software Foundation 在美国和/或其他国家/地区的商标或注册商标。 使用这些标记并不暗示获得 Apache Software Foundation 的认可。

体系结构

Diagram of Delta Lake architecture.

下载此体系结构的 Visio 文件

休闲和旅行服务预订方案可能频繁生成大量原始文档。 但是,可能不需要为这些文档的整个内容编制索引。 例如,用户可能需要按已知的交易 ID 或客户名称搜索特定日期的结果,以检索他们感兴趣的一组文档。

数据流

此体系结构背后的概念在于将可用于搜索的元数据与裸数据解耦:

  • 只有元数据将在可查询服务(例如 Spark)中编制索引,而实际数据将存储在数据湖中。
  • 数据湖中的原始文档通过其路径链接到编制索引的元数据。
  • 查询文档时,服务将搜索文档的元数据,进而通过文档的路径从数据湖中检索实际文档。

此解决方案显著降低了成本并提高了性能,因为元数据只占整个数据资产的一小部分(例如,若干 PB 的原始文档可以用数十 GB 的简洁元数据来描述)。

此外,将历史深度和实时要求混合到统一、易维护的高性能系统中是此类方案的一个典型挑战。 Delta Lake 体系结构解决了这一挑战。

组件

Azure 应用服务是用于在托管虚拟机中生成和托管应用的平台即服务 (PaaS)。 应用服务管理运行应用的底层计算基础结构,并提供资源使用配额和应用指标的监视、诊断信息的日志记录以及基于指标的警报。

Azure 数据工厂是 Azure 的云提取、转换和加载 (ETL) 服务,用于横向扩展无服务器数据集成和数据转换。 它提供了无代码的 UI,以用于直观创作和集中式监视与管理。 还可以将现有 SQL Server Integration Services (SSIS) 包直接迁移到 Azure,并在 Azure 数据工厂中以完全兼容的方式运行它们。

Azure Data Lake Storage Gen2 是一组专用于大数据分析的功能,基于 Azure Blob 存储而构建。 Data Lake Storage Gen2 囊括了 Azure Data Lake Storage Gen1 和 Azure Blob 存储的功能。 例如,Data Lake Storage Gen2 提供文件系统语义、文件级安全和缩放。 由于这些功能是在 Blob 存储的基础上构建的,因此还可以得到具有高可用性/灾难恢复功能的低成本分层存储。

Azure 事件中心是一个完全托管的实时数据引入服务,它简单、可信任且可缩放。 每秒从任何源流式传输数百万个事件,以生成动态数据管道,并立即响应业务挑战。

Azure Databricks 是基于 Apache Spark 的数据分析平台,已针对 Microsoft Azure 云服务进行优化。 Azure Databricks 提供三个用于开发数据密集型应用程序的环境:Databricks SQL、Databricks 数据科学与工程以及 Databricks 机器学习。

备选方法

如果你不想要仅为元数据编制索引,还可以为提供查询功能的服务(例如 Azure Databricks、Azure Synapse Analytics、Azure 认知搜索或 Azure 数据资源管理器)中的所有原始数据编制索引。 此方法更直接,但需要注意数据大小、性能要求和更新频率综合造成的影响,尤其是从成本的角度看。

与使用 Delta Lake 相反,使用 Lambda 体系结构可将实时数据保存在与历史数据不同的存储库中,并且客户端将运行逻辑来使异构查询对用户透明。 此解决方案的优势是可用的服务集更大(例如 Azure 流分析和 Azure SQL 数据库),但体系结构更复杂,并且代码库的维护成本更高。

Spark 与 Azure DatabricksAzure Synapse AnalyticsAzure HDInsight 一起分发。 因此,可以使用其中的任何 Azure 数据服务(最好是使用支持 Delta Lake 0.8 或 1.0 的最新 Spark 版本)来实现此体系结构。

方案详细信息

休闲和旅行服务预订方案中原始数据的可见性对于多个参与者而言非常重要。 技术支持团队需要监督实时诊断,以持续监视交易处理并对不良问题做出快速反应。 数据工程师需要监督导出数据以供利益干系人审查,并实时馈送分析数据。 客户支持团队需要获取历史和近期数据来处理客户咨询与投诉。 最后,法律团队需确保遵守法规义务并履行法律措施。 此类要求在聚集了外部提供商和管理用户购买活动的市场中很常见。 例如,休闲和旅行服务预订系统可使用户和服务提供商去中介化,以便能够搜索服务、聚集提供商的有用产品/服务,以及管理用户预订。

Diagram of a marketplace with service providers and B2B and B2C users.

可能的用例

此体系结构非常适合旅行和酒店行业。 它适用于以下场景:

  • 以原始格式快速检索实时文档(例如,用于诊断)或历史原始文档(用于合规性目的)。
  • 管理 PB 量级的数据。
  • 保证实时诊断维持秒级性能。
  • 实现统一的实时诊断、历史查询和分析数据馈送方法。
  • 馈送下游实时分析数据。
  • 控制成本。
  • 在内部将数据用作原始文档(例如 json、xml 或 csv 文件)。
  • 当提供一小部分数据就足以描述查询时。
  • 当用户想要检索完整的原始文档时。
  • 当总数据大小需要将系统扩展到超过目标价格时。

对于以下情况,可能不适合使用此体系结构:

  • 在内部将数据用作记录集。
  • 用户需要运行分析。
  • 用户愿意使用自己打包的 BI 工具。
  • 从成本角度看,数据的大小并不是一个挑战。

不一定需要原始文档。

注意事项

这些注意事项实施 Azure 架构良好的框架的支柱原则,即一套可用于改善工作负载质量的指导原则。 有关详细信息,请参阅 Microsoft Azure 架构良好的框架

性能效率

性能效率是指工作负载能够以高效的方式扩展以满足用户对它的需求。 有关详细信息,请参阅性能效率要素概述

用户将执行两次跳跃来访问数据。 他们首先查询元数据,然后检索所需的文档集。 重用现有或打包的客户端资产可能有难度。

Azure Data Lake Storage Gen2 提供三个访问层:热、冷和存档层。 在只是偶尔检索文档的方案中,冷性能层可以保证与热性能层相似的性能,但具有成本优势。 在检索文档概率较高且包含较新数据的方案中,请考虑混合使用冷层和热层。 使用存档层存储还能提供替代硬删除的方案,并通过仅保留有意义的信息或聚合程度更高的数据来减少数据大小。

数据湖可能会管理 PB 量级的数据,因此通常适用数据保留策略。 应采用数据治理解决方案来管理数据生命周期,例如何时在热存储层和冷存储层之间移动旧数据、何时删除或存档旧数据,以及何时将信息聚合到下游分析解决方案中。

考虑如何对下游分析方案使用此方法。 尽管此示例工作负载不适用于分析,但它适用于馈送下游实时分析数据,而对于批处理方案,可以改为从数据湖馈送数据。

可伸缩性

在解耦从诊断和合规系统生成原始文档的事务系统方面,Azure 事件中心具有很高的通用性;易于在已建立的体系结构中实现;最终易于使用。 但是,事务系统可能已使用流式处理模式来处理传入的文档。 在这种情况下,可能需要将用于管理诊断和合规性的逻辑作为子流集成到流式处理应用程序中。

DevOps

若要自动部署此示例工作负载中使用的服务,最好使用连续集成和持续部署 (CI/CD) 过程。 考虑使用解决方案,例如 Azure DevOps 或 GitHub Actions。

成本优化

成本优化是关于寻找减少不必要的费用和提高运营效率的方法。 有关详细信息,请参阅成本优化支柱概述

通常,使用 Azure 定价计算器来估算成本。 若要了解其他注意事项,请参阅 Microsoft Azure 架构良好的框架中的成本部分。

部署此方案

在以下示例体系结构中,我们假设一个或多个 Azure 事件中心命名空间将包含结构化的原始文档(例如 json 或 xml 文件)。 但是,文档和源服务的实际类型与格式及其集成类型严重依赖于具体的方案和体系结构。

流式处理

使用 Spark 结构化流时,原始数据将被拉取、解压缩、分析并转换为流数据帧中的表格数据。

以下 PySpark 代码片段用于从事件中心加载流数据帧:

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

以下代码片段用于处理流数据帧。 如有必要,它会首先解压缩事件中心消息,然后将其 json 结构分析为表格格式。 此代码仅为示例,应根据具体的方案对其进行改写:

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

实际数据处理包括两个步骤。 首先是提取元数据,以帮助在处理后搜索原始文档。 实际元数据取决于用例,但概括性的示例包括相关日期和标识符、文档类型、源服务和任何类型的类别:

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

第二个处理步骤是生成 Azure Data Lake Storage Gen2 的路径,你将在其中存储原始文档:

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

Delta Lake 中的元数据引入

元数据将写入到启用实时查询功能的增量表中。 写入将在缓冲区中流式处理,对表的查询可将缓冲区中的结果与表历史部分中的结果合并。

以下代码片段演示如何在元存储中定义增量表,并按日期将该表分区:

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

请注意,transactionId 字段是数字。 通过分布式系统的典型消息可能改用 GUID 来唯一标识事务。 但是,数字数据类型可以在大部分数据平台中提高查询性能。

鉴于云数据平台(例如 Spark)的分布性,分配唯一的事务标识符可能有挑战性。 一种有用的方法是将此类事务标识符基于分区标识符(例如事件中心分区编号)和分区内部的递增编号。 此方法的一个示例是 Azure Databricks 中的 monotonically_increasing_id()

以下代码片段演示了如何将带有原始文档元数据的流追加到增量表中:

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

请注意,分区是在根据表架构写入流时进行管理的。

数据湖中的数据引入

实际原始文档将写入到 Azure Data Lake Gen2 中的相应存储性能层。

以下代码片段演示一个将文件上传到 Azure Data Lake Store Gen2 的简单函数;使用 DataStreamWriter 类中的 foreach 方法,可以上传托管在流数据帧的每条记录中的文件:

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

客户端

客户端可以是自定义 Web 应用程序,它使用元数据通过标准 SQL 语句从增量表中检索文档路径,进而通过标准 Azure Data Lake Storage Gen2 API 从数据湖中检索实际文档。

例如,以下代码片段演示如何检索特定事务中所有文档的路径:

select * from metadata where transactionId = '123456'

后续步骤

参阅相关的体系结构指南:

参阅以下相关体系结构: