建置 Delta Lake 以支援在線休閒和旅遊預訂中的臨機操作查詢

Azure 事件中樞
Azure Data Lake 儲存體
Azure Databricks
Azure Synapse Analytics

此架構提供旅行預約的範例 Delta Lake,其中會以高頻率產生大量的原始檔。

Apache 和 Apache® Spark™ 是 美國 和/或其他國家/地區的 Apache Software Foundation 註冊商標或商標。 使用這些標記不會隱含 Apache Software Foundation 的背書。

架構

Delta Lake 架構的圖表。

下載此架構的 Visio 檔案

休閒和旅遊預約案例可能會以高頻率產生大量的原始檔。 不過,您可能不需要為這些文件的整個內容編製索引。 例如,使用者可能需要依已知的交易標識碼或特定日期的客戶名稱來搜尋,以擷取一組對他們感興趣的檔。

資料流程

此架構背後的概念包含將元數據與裸機數據搜尋很有用的元數據分離:

  • 只有元數據會在可查詢的服務中編製索引(例如Spark),而實際數據會儲存在數據湖中。
  • Data Lake 中的原始檔會依其路徑連結到已編製索引的元數據。
  • 查詢檔時,服務會搜尋檔的元數據,然後依其路徑從 Data Lake 擷取實際檔。

此解決方案可大幅降低成本並提升效能,因為元數據包含整個數據資產的一小部分(例如,數 PB 的原始檔可由數十 GB 的精簡元數據來描述)。

此外,管理將歷史深度和即時需求混合成統一、易於維護、高效能的系統,是這類案例的典型挑戰。 Delta Lake 架構可回答這項挑戰。

元件

Azure App 服務 是一種平臺即服務(PaaS),用於在受控虛擬機中建置和裝載應用程式。 App Service 會管理應用程式執行所在的基礎計算基礎結構,並根據計量來監視資源使用量配額和應用程式計量、診斷信息的記錄,以及警示。

Azure Data Factory 是 Azure 的雲端擷取、轉換和載入 (ETL) 服務,可用於向外延展無伺服器數據整合和數據轉換。 它提供無程式碼的 UI,可用於直覺撰寫和單一虛擬管理平台的監視與管理。 您也可以將現有的 SQL Server Integration Services (SSIS) 套件隨即轉移至 Azure,並在 Azure Data Factory 中以完整相容性執行這些套件。

Azure Data Lake 儲存體 Gen2 是一組專用於巨量數據分析的功能,以 Azure Blob 儲存體 為基礎。 Data Lake Storage Gen2 包含 Azure Data Lake Storage Gen1 的功能和 Azure Blob 儲存體。 例如,Data Lake Storage Gen2 會提供檔案系統語法、檔案層級安全性和規模調整。 由於這些功能是以 Blob 儲存體 為基礎所建置,因此您也會獲得低成本、分層式記憶體,以及高可用性/災害復原功能。

Azure 事件中樞 是完全受控的實時數據擷取服務,其簡單、受信任且可調整。 每秒從任何來源串流數百萬個事件,以建立動態資料管道,並立即回應商務挑戰。

Azure Databricks 是以 Apache Spark 為基礎的數據分析平臺,已針對 Microsoft Azure 雲端服務 優化。 Azure Databricks 提供三個環境來開發需要大量數據的應用程式:Databricks SQL、Databricks 資料科學 和 Engineering,以及 Databricks 機器學習。

替代項目

除了只編製元數據的索引,您也可以為服務中的所有原始數據編製索引,以提供查詢功能,例如 Azure Databricks、Azure Synapse Analytics、Azure 認知搜尋 或 Azure 數據總管。 這種方法比較直接,但請注意數據大小、效能需求和更新頻率的結合效果,特別是從成本的觀點來看。

與使用差異湖相反,使用 Lambda 架構 會將即時數據保留在與歷程記錄數據不同的存放庫中,而您的用戶端會執行邏輯,讓異質查詢對使用者保持透明。 此解決方案的優點是您可以使用的較大型服務集合(例如 Azure 串流分析和 Azure SQL 資料庫),但架構會變得更複雜,而且程式代碼基底更耗費成本來維護。

Spark 會與 Azure Databricks、Azure Synapse AnalyticsAzure HDInsight 一起散發。 因此,此架構可以使用上述任何 Azure 數據服務來實作,最好是支援 Delta Lake 0.8 或 1.0 的最新 Spark 版本。

案例詳細資料

休閒和旅遊預約案例中原始數據的可見度對多個執行者而言很重要。 技術支援小組會監督即時診斷,以持續監視事務處理,並快速回應不想要的問題。 數據工程師會監督導出數據,以便利害關係人檢閱及即時摘要分析。 客戶支援小組需要歷程記錄和最近的數據來處理客戶查詢和投訴。 最後,法律小組可確保遵守合規性職責,並執行法律行動。 這些類型的需求在匯總外部提供者和管理用戶購買的市集中很常見。 例如,休閒和旅遊預訂系統會解除對使用者和服務提供者進行搜尋服務、匯總提供者有意義的供應專案,以及管理用戶預訂。

具有服務提供者和 B2B 和 B2C 使用者的市集圖表。

潛在使用案例

此架構非常適合旅遊和酒店業。 適用於下列案例:

  • 以原始格式快速擷取即時(例如診斷)或歷程記錄(適用於合規性)原始檔。
  • 管理數 PB 的數據。
  • 保證即時診斷的秒範圍效能。
  • 達成即時診斷、歷程記錄查詢和饋送分析的統一方法。
  • 提供下游即時分析。
  • 控制成本。
  • 以原始檔形式提供數據(例如 json、xml 或 csv 檔案)。
  • 當一小部分的數據足以描述查詢時。
  • 當使用者想要擷取完整的原始檔時。
  • 當總數據大小需要調整系統高於您的目標價格時。

此架構在下列情況下可能不適合:

  • 數據會以記錄集的形式提供資源。
  • 用戶必須執行分析。
  • 用戶願意使用自己的封裝 BI 工具。
  • 從成本的觀點來看,數據大小並不是一項挑戰。

未經處理的檔不一定是必要的。

考量

這些考量能實作 Azure Well-Architected Framework 的要素,其為一組指導原則,可以用來改善工作負載的品質。 如需詳細資訊,請參閱 Microsoft Azure Well-Architected Framework (部分機器翻譯)。

效能效益

效能效率可讓您的工作負載進行調整,以有效率的方式符合使用者對其放置的需求。 如需詳細資訊,請參閱效能效率要件概觀

使用者將會執行雙躍點來存取數據。 他們會先查詢元數據,然後擷取所需的檔集。 可能很難重複使用現有的或封裝的客戶端資產。

Azure Data Lake 儲存體 Gen2 提供三存取層:經常性存取層、非經常性存取層和封存層。 在偶爾擷取檔時,非經常性存取效能層級應該保證與經常性效能層級類似的效能,但成本較低。 在具有較新數據擷取機率較高的案例中,請考慮混合非經常性存取層。 使用封存層記憶體也可以提供硬式刪除的替代方案,以及只保留有意義的資訊或更多匯總數據來減少數據的大小。

Data Lake 可能會管理數 PB 的數據,因此通常會套用數據保留原則。 應採用數據控管解決方案來管理數據生命週期,例如何時在經常性存取和非經常性存取儲存層之間移動舊數據、何時刪除或封存舊數據,以及何時將資訊匯總至下游分析解決方案。

請考慮此方法如何與下游分析案例搭配使用。 雖然此範例工作負載不適用於分析,但適合用於饋送下游即時分析,而批次案例可改為從 Data Lake 提供。

延展性

Azure 事件中樞 在分離交易式系統時,從診斷和合規性系統分離產生原始檔;很容易在已建立的架構中實作;而且最終很容易使用。 不過,交易系統可能已經使用串流模式來處理傳入的檔。 在此情況下,您可能需要將用來管理診斷和合規性的邏輯整合到串流應用程式中做為子數據流。

DevOps

若要自動部署此範例工作負載中的已使用服務,最好使用 持續整合和持續部署 (CI/CD) 程式。 請考慮使用 Azure DevOps 或 GitHub Actions 等解決方案。

成本最佳化

成本最佳化是關於考慮如何減少不必要的費用,並提升營運效率。 如需詳細資訊,請參閱成本最佳化要素的概觀

一般而言,使用 Azure 定價計算機 來預估成本。 請參閱 Microsoft Azure 良好架構架構中的成本一節,以瞭解其他考慮。

部署此案例

在下列範例架構中,我們假設一或多個 Azure 事件中樞 命名空間將包含結構化的原始檔(例如 json 或 xml 檔案)。 不過,檔和來源服務的實際類型和格式及其整合類型高度相依於特定案例和架構。

串流

使用 Spark 結構化串流時,會提取、解壓縮、剖析和轉譯為串流數據框架中的表格式數據。

下列 PySpark 代碼段可用來從事件中樞載入串流資料框架:

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

下列代碼段是用來處理串流 DataFrame。 它會在必要時先解壓縮事件中樞訊息,然後將其 json 結構剖析為表格式格式。 此程式代碼是範例,應調整為您的特定案例:

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

實際數據處理是由兩個步驟所組成。 第一個是擷取元數據,以協助在處理之後搜尋原始檔。 實際元數據取決於使用案例,但一般化範例會是相關的日期和標識碼、檔類型、來源服務,以及任何類型的類別:

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

第二個處理步驟是產生 Azure Data Lake 儲存體 Gen2 的路徑,您將在其中儲存源檔:

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

差異湖中的元數據擷取

元數據會寫入差異數據表,以啟用即時查詢功能。 寫入會串流處理在緩衝區中,而對數據表的查詢可以將緩衝區的結果與數據表歷程記錄部分的結果合併。

下列代碼段示範如何在中繼存放區中定義差異數據表,並依日期進行分割:

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

請注意,transactionId 欄位是數值。 傳遞分散式系統的一般訊息可能會使用 GUID 來唯一識別交易。 不過,數值數據類型在大部分的數據平台中啟用更高的查詢效能。

指派唯一的交易標識碼可能具有挑戰性,因為雲端數據平臺的分散式本質(例如 Spark)。 實用的方法是將這類交易標識符基底在分割區標識符上(例如事件中樞數據分割編號)和數據分割內累加編號。 此方法的範例是 Azure Databricks 中的monotonically_increasing_id()。

下列代碼段示範如何將原始檔的元資料附加至差異資料表的數據流:

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

請注意,根據數據表架構撰寫數據流時,會管理數據分割。

數據湖中的數據擷取

實際的原始檔會寫入 Azure Data Lake Gen2 中適當的記憶體效能層。

下列代碼段顯示將檔案上傳至 Azure Data Lake Store Gen2 的簡單函式;在 類別中使用 DataStreamWriter foreach 方法,可讓您上傳串流 DataFrame 中每個記錄中裝載的檔案:

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

用戶端

用戶端可以是使用元數據從差異數據表擷取具有標準 SQL 語句之文件路徑的自定義 Web 應用程式,然後透過標準 Azure Data Lake 儲存體 Gen2 API 從 Data Lake 擷取實際檔。

例如,下列代碼段示範如何擷取特定交易中所有文件的路徑:

select * from metadata where transactionId = '123456'

下一步

請參閱相關的架構指引:

請參閱下列相關架構: