管道和篩選模式

Azure Blob 儲存體
Azure Functions
Azure 佇列儲存體

將執行複雜處理的工作分解成一系列可重複使用的不同元素。 這樣做可以藉由允許獨立部署和調整處理的工作元素,來改善效能、延展性和可重複使用性。

內容和問題

您有需要處理的循序工作管線。 實作此應用程式的簡單但不靈活方法是在整合型模組中執行此處理。 不過,這種方法可能會減少重構程式代碼、優化程式代碼的機會,或在應用程式中其他地方需要相同處理部分時重複使用程序代碼。

下圖說明使用整合型方法處理數據的其中一個問題,即無法跨多個管線重複使用程序代碼。 在此範例中,應用程式會接收及處理來自兩個來源的數據。 個別的模組會執行一系列工作來轉換數據,再將結果傳遞至應用程式的商業規則,以處理每個來源的數據。

此圖顯示使用整合型模組實作的解決方案。

整合型模組執行的一些工作在功能上很類似,但程式代碼必須在模組中重複,而且很可能在其模組內緊密結合。 除了無法重複使用邏輯之外,此方法也會在需求變更時帶來風險。 您必須記得在這兩個地方更新程序代碼。

整合型實作與多個管線無關或重複使用還有其他挑戰。 使用整合型時,您無法在不同的環境中執行特定工作,或獨立調整它們。 某些工作可能會需要大量計算,並受益於在功能強大的硬體上執行,或平行執行多個實例。 其他工作可能沒有相同的需求。 此外,使用整合型,重新排序工作或在管線中插入新工作是一項挑戰。 這些變更需要重新測試整個管線。

解決方案

將每個數據流所需的處理分解成一組個別元件(或篩選),每個元件都會執行單一工作。 篩選條件會透過使用管道連接篩選來組成管線。 篩選會從輸入管道接收訊息,並將訊息發佈至不同的輸出管道。 管道不會執行路由或任何其他邏輯。 它們只會連接篩選,將輸出訊息從一個篩選條件傳遞為輸入至下一個篩選。

篩選會獨立運作,且不知道其他篩選。 他們只會知道其輸入和輸出架構。 因此,只要任何篩選的輸入架構符合前一個篩選的輸出架構,就可以依任何順序排列篩選條件。 針對所有篩選使用標準化架構可增強重新排序篩選的能力。

篩選條件的鬆散結合可讓您輕鬆:

  • 建立由現有篩選所組成的新管線
  • 更新或取代個別篩選中的邏輯
  • 必要時重新排序篩選
  • 視需要對不同硬體執行篩選
  • 平行執行篩選

下圖顯示使用管道和篩選實作的解決方案:

此圖顯示使用管道和篩選實作的解決方案。

處理單一要求所需的時間取決於管線中最慢的篩選條件速度。 一或多個篩選可能是瓶頸,特別是當來自特定數據源的數據流中出現大量要求時。 執行慢速篩選器平行實例的能力可讓系統分散負載並改善輸送量。

在不同計算實例上執行篩選的能力,可讓它們獨立調整,並利用許多雲端環境所提供的彈性。 需要大量計算的篩選器可以在高效能硬體上執行,而其他需求較低的篩選器則可裝載在成本較低的商品硬體上。 篩選甚至不需要位於相同的數據中心或地理位置,讓管線中的每個元素都能在接近所需資源的環境中執行。 此圖顯示從來源 1 套用至資料管線的範例:

此圖顯示從來源 1 套用至數據管線的範例。

如果篩選的輸入和輸出結構化為數據流,您可以平行執行每個篩選的處理。 管線中的第一個篩選可以啟動其工作並輸出其結果,其結果會在第一個篩選完成其工作之前,直接傳遞至序列中的下一個篩選。

使用管道和篩選模式與 補償交易模式 是實作分散式交易的替代方法。 您可以將分散式交易分成個別的可補償工作,每個工作都可以透過實作補償交易模式的篩選來實作。 您可以將管線中的篩選實作為個別裝載的工作,這些工作會接近其維護的數據。

問題和考量

當您決定如何實作此模式時,請考慮下列幾點:

  • 複雜度。 此模式所提供的彈性增加也可能會帶來複雜度,尤其是在管線中的篩選分散到不同的伺服器時。

  • 可靠性。 使用基礎結構,確保管線中的篩選之間流動的數據不會遺失。

  • 等冪性。 如果管線中的篩選在收到訊息後失敗,並將工作重新排程至篩選的另一個實例,部分工作可能已經完成。 如果工作更新全域狀態的某些層面(例如儲存在資料庫中的資訊),可能會重複單一更新。 如果篩選在將結果張貼到下一個篩選之後失敗,但在它指出它順利完成其工作之前,可能會發生類似的問題。 在這些情況下,篩選的另一個實例可能會重複這項工作,導致相同的結果張貼兩次。 此案例可能會導致管線中的後續篩選處理相同數據兩次。 因此,管線中的篩選應該設計成等冪。 如需詳細資訊,請參閱 喬納森·奧利弗部落格上的等冪模式

  • 重複的訊息。 如果管線中的篩選在將訊息張貼至管線的下一個階段之後失敗,則可能會執行篩選的另一個實例,並將相同訊息的複本張貼至管線。 此案例可能會導致相同訊息的兩個實例傳遞至下一個篩選。 若要避免這個問題,管線應該偵測並排除重複的訊息。

    注意

    如果您使用消息佇列實作管線(例如 Azure 服務匯流排 佇列),消息佇列基礎結構可能會提供自動重複的訊息偵測和移除。

  • 內容和狀態。 在管線中,每個篩選基本上都會以隔離方式執行,而且不應該對叫用它的方式進行任何假設。 因此,每個篩選都應該提供足夠的內容來執行其工作。 此內容可能包含大量的狀態資訊。 如果篩選條件使用外部狀態,例如資料庫或外部記憶體中的數據,則必須考慮對效能的影響。 每個篩選條件必須載入、操作及保存該狀態,這會增加單次載入外部狀態之解決方案的額外負荷。

  • 訊息容錯。 篩選條件必須能夠容忍傳入訊息中不會運作的數據。 它們會處理與其相關的數據,並忽略其他數據,並在輸出訊息中保持不變地傳遞數據。

  • 錯誤處理 - 每個篩選都必須判斷在發生中斷錯誤時要執行的動作。 篩選條件必須判斷它是否失敗管線或傳播例外狀況。

使用此模式的時機

當下列情況時,請使用此模式:

  • 應用程式所需的處理可以輕鬆地細分成一組獨立的步驟。

  • 應用程式所執行之處理步驟有不同的延展性需求。

    注意

    您可以將應該在同一個程式中一起調整的篩選分組。 如需詳細資訊,請參閱 計算資源匯總模式

  • 您需要彈性,以允許重新排序應用程式所執行之處理步驟,或允許新增和移除步驟的功能。

  • 系統可以受益於將處理分散到不同伺服器的步驟。

  • 您需要可靠的解決方案,以在處理數據時將步驟中的失敗影響降到最低。

當下列情況時,此模式可能不實用:

  • 應用程式遵循要求-回應模式。

  • 工作處理必須完成為初始要求的一部分,例如要求/回應案例。

  • 應用程式所執行之處理步驟並不獨立,或必須在單一交易中一起執行。

  • 步驟所需的內容或狀態信息數量讓此方法效率不佳。 您可能能夠將狀態資訊保存到資料庫,但如果資料庫的額外負載造成過多爭用,則請勿使用此策略。

工作負載設計

架構設計人員應該評估管線和篩選模式如何用於其工作負載的設計,以解決 Azure 良好架構架構支柱涵蓋的目標和原則。 例如:

要素 此模式如何支援支柱目標
可靠性設計決策可協助工作負載復原到故障,並確保它會在發生失敗后復原到完全正常運作的狀態。 每個階段的單一責任可引起專注的注意力,並避免干擾交集數據處理。

- RE:01 簡單
- RE:07 背景工作

如同任何設計決策,請考慮對其他可能以此模式導入之目標的任何取捨。

範例

您可以使用一連串的訊息佇列來提供實作管線所需的基礎結構。 初始消息佇列會接收未處理的訊息,成為管道和篩選模式實作的開頭。 實作為篩選工作的元件會接聽此佇列上的訊息、執行其工作,然後將新的或已轉換的訊息張貼到序列中的下一個佇列。 另一個篩選工作可以接聽此佇列上的訊息、處理訊息、將結果張貼至另一個佇列等等,直到結束管道和篩選程式的最後一個步驟為止。 下圖說明使用消息佇列的管線:

顯示使用消息佇列之管線的圖表。

您可以使用此模式來實作影像處理管線。 如果您的工作負載取得映射,映射可能會通過一系列基本上獨立且可重新排序的篩選條件,以執行下列動作:

  • 內容仲裁
  • 調整大小
  • 浮浮水印
  • 調整
  • Exif 元數據移除
  • 內容傳遞網路 (CDN) 發行集

在此範例中,篩選條件可以實作為個別部署的 Azure Functions,甚至是包含每個篩選作為隔離部署的單一 Azure 函式應用程式。 使用 Azure Function 觸發程式、輸入系結和輸出系結可以簡化篩選程式代碼,並使用宣告檢查映像來處理的佇列型管道

此圖顯示一系列 Azure Functions 之間使用 Azure 佇列 儲存體 的影像處理管線。

以下是一個實作為 Azure 函式、從佇列 儲存體 管線觸發的範例,其中包含映射的宣告檢查,並將新的宣告檢查寫入另一個佇列 儲存體 管道可能看起來。 我們已將 實作取代為批注中的虛擬程式碼,以求簡潔。 如需更多類似程式代碼,請參閱 GitHub 上提供的管道和篩選模式 示範。

// This is the "Resize" filter. It handles claim checks from input pipe, performs the
// resize work, and places a claim check in the next pipe for anther filter to handle.
[Function(nameof(ResizeFilter))]
[QueueOutput("pipe-fjur", Connection = "pipe")]  // Destination pipe claim check
public async Task<string> RunAsync(
  [QueueTrigger("pipe-xfty", Connection = "pipe")] string imageFilePath,  // Source pipe claim check
  [BlobInput("{QueueTrigger}", Connection = "pipe")] BlockBlobClient imageBlob)  // Image to process
{
  _logger.LogInformation("Processing image {uri} for resizing.", imageBlob.Uri);

  // Idempotency checks
  // ...

  // Download image based on claim check in queue message body
  // ...
  
  // Resize the image
  // ...

  // Write resized image back to storage
  // ...

  // Create claim check for image and place in the next pipe
  // ...
  
  _logger.LogInformation("Image resizing done or not needed. Adding image {filePath} into the next pipe.", imageFilePath);
  return imageFilePath;
}

注意

Spring Integration Framework 具有管道和篩選模式的實作。

下一步

當您實作此模式時,可能會發現下列資源很有説明:

當您實作此模式時,下列模式也可能相關:

  • 宣告檢查模式。 使用佇列實作的管線可能不會保存透過篩選傳送的實際專案,而是需要處理之數據的指標。 此範例會針對儲存在 Azure Blob 儲存體 中的映像,使用 Azure 佇列中的宣告檢查 儲存體。
  • 競爭取用者模式。 管線可以包含一或多個篩選條件的多個實例。 此方法適用於執行緩慢篩選的平行實例。 它可讓系統分散負載並改善輸送量。 篩選的每個實例都會與其他實例競爭輸入,但篩選條件的兩個實例不應該能夠處理相同的數據。 本文說明方法。
  • 計算資源匯總模式。 您可以將應該一起調整為單一進程的篩選分組。 本文提供有關此策略優點和取捨的詳細資訊。
  • 補償交易模式。 您可以將篩選實作為可反轉的作業,或具有補償作業,如果發生失敗,會將狀態還原至舊版。 本文說明如何實作此模式,以維護或達成最終一致性。
  • 管道和篩選 - 企業整合模式