管道和篩選模式

Azure Blob 儲存體
Azure Functions
Azure 佇列儲存體

將執行複雜處理的工作分解成一系列可重複使用的不同元素。 這樣做可以藉由允許獨立部署和調整處理的工作元素,來改善初始步驟的效能、延展性和重複使用性。 管道和篩選模式支援高階模組化。

內容和問題

您有需要處理的循序工作管線。 實作此應用程式的簡單但不靈活方法是在整合型模組中執行此處理。 不過,這種方法可能會減少重構程式代碼、優化程式代碼的機會,或在應用程式中其他地方需要相同處理部分時重複使用程序代碼。

下圖說明使用整合型方法處理數據的其中一個問題,即無法跨多個管線重複使用程序代碼。 在此範例中,應用程式會接收及處理來自兩個來源的數據。 個別的模組會執行一系列工作來轉換數據,再將結果傳遞至應用程式的商業規則,以處理每個來源的數據。

此圖顯示使用整合型模組實作的解決方案。

整合型模組執行的一些工作在功能上很類似,但程式代碼必須在模組中重複,而且很可能在其模組內緊密結合。 除了無法重複使用邏輯之外,此方法也會在需求變更時帶來風險。 您必須記得在這兩個地方更新程序代碼。

整合型實作與多個管線無關或重複使用還有其他挑戰。 使用整合型時,您無法在不同的環境中執行特定工作,或獨立調整它們。 某些工作可能會需要大量計算,並受益於在功能強大的硬體上執行,或平行執行多個實例。 其他工作可能沒有相同的需求。 此外,使用整合型,重新排序工作或在管線中插入新工作是一項挑戰。 這些變更需要重新測試整個管線。

解決方案

將每個數據流所需的處理分解成一組個別元件(或篩選),每個元件都會執行單一工作。 複合工作應該使用多個篩選,而不是一個篩選。 篩選條件會透過使用管道連接篩選來組成管線。 篩選條件是獨立、獨立且通常無狀態。 篩選會從輸入管道接收訊息,並將訊息發佈至不同的輸出管道。 篩選條件可以轉換訊息,或針對一或多個準則進行測試,以包含條件式邏輯。 管道不會執行路由或任何其他邏輯。 它們只會連接篩選,將輸出訊息從一個篩選條件傳遞為輸入至下一個篩選。

篩選會獨立運作,且不知道其他篩選。 他們只會知道其輸入和輸出架構。 因此,只要任何篩選的輸入架構符合前一個篩選的輸出架構,就可以依任何順序排列篩選條件。 針對所有篩選使用標準化架構可增強重新排序篩選的能力。 管道和篩選架構鼓勵重複使用組合。

篩選條件的鬆散結合可讓您輕鬆:

  • 建立由現有篩選所組成的新管線
  • 更新或取代個別篩選中的邏輯
  • 必要時重新排序篩選
  • 視需要對不同硬體執行篩選
  • 平行執行篩選

下圖顯示使用管道和篩選實作的解決方案:

此圖顯示使用管道和篩選實作的解決方案。

處理單一要求所需的時間取決於管線中最慢的篩選條件速度。 一或多個篩選可能是瓶頸,特別是當來自特定數據源的數據流中出現大量要求時。 執行慢速篩選器平行實例的能力可讓系統分散負載並改善輸送量。

在不同計算實例上執行篩選的能力,可讓它們獨立調整,並利用許多雲端環境所提供的彈性。 需要大量計算的篩選器可以在高效能硬體上執行,而其他需求較低的篩選器則可裝載在成本較低的商品硬體上。 篩選甚至不需要位於相同的數據中心或地理位置,讓管線中的每個元素都能在接近所需資源的環境中執行。 這些工作需要特定的設計技術,例如傳訊、多線程等等,以最大化每個管道或篩選的彈性。 此圖顯示從來源 1 套用至資料管線的範例:

此圖顯示從來源 1 套用至數據管線的範例。

如果篩選的輸入和輸出結構化為數據流,您可以平行執行每個篩選的處理。 管線中的第一個篩選可以啟動其工作並輸出其結果,其結果會在第一個篩選完成其工作之前,直接傳遞至序列中的下一個篩選。

使用管道和篩選模式與 補償交易模式 是實作分散式交易的替代方法。 您可以將分散式交易分成個別的可補償工作,每個工作都可以透過實作補償交易模式的篩選來實作。 您可以將管線中的篩選實作為個別裝載的工作,這些工作會接近其維護的數據。

問題和考量

當您決定如何實作此模式時,請考慮下列幾點:

  • 單體性質。 此模式通常會實作為整合型管線,因此對於任何變更,整個篩選鏈結都應該以端對端測試。 此外,必須考慮整個程式的容錯;如果篩選或管道失敗,整個管線可能會失敗。

  • 複雜度。 此模式所提供的彈性增加也可能會帶來複雜度,尤其是在管線中的篩選分散到不同的伺服器時。

  • 可靠性。 使用基礎結構,確保管線中的篩選之間流動的數據不會遺失。

  • 等冪性。 如果管線中的篩選在收到訊息後失敗,並將工作重新排程至篩選的另一個實例,部分工作可能已經完成。 如果工作更新全域狀態的某些層面(例如儲存在資料庫中的資訊),可能會重複單一更新。 如果篩選在將結果張貼到下一個篩選之後失敗,但在它指出它順利完成其工作之前,可能會發生類似的問題。 在這些情況下,篩選的另一個實例可能會重複這項工作,導致相同的結果張貼兩次。 此案例可能會導致管線中的後續篩選處理相同數據兩次。 因此,管線中的篩選應該設計成等冪。 如需詳細資訊,請參閱 喬納森·奧利弗部落格上的等冪模式

  • 重複的訊息。 如果管線中的篩選在將訊息張貼至管線的下一個階段之後失敗,則可能會執行篩選的另一個實例,並將相同訊息的複本張貼至管線。 此案例可能會導致相同訊息的兩個實例傳遞至下一個篩選。 若要避免這個問題,管線應該偵測並排除重複的訊息。

    注意

    如果您使用消息佇列實作管線(例如 Azure 服務匯流排 佇列),消息佇列基礎結構可能會提供自動重複的訊息偵測和移除。

  • 內容和狀態。 在管線中,每個篩選基本上都會以隔離方式執行,而且不應該對叫用它的方式進行任何假設。 因此,每個篩選都應該提供足夠的內容來執行其工作。 此內容可能包含大量的狀態資訊。 如果篩選條件使用外部狀態,例如資料庫或外部記憶體中的數據,則必須考慮對效能的影響。 每個篩選條件必須載入、操作及保存該狀態,這會增加單次載入外部狀態之解決方案的額外負荷。

  • 訊息容錯。 篩選條件必須能夠容忍傳入訊息中不會運作的數據。 它們會處理與其相關的數據,並忽略其他數據,並在輸出訊息中保持不變地傳遞數據。

  • 錯誤處理 - 每個篩選都必須判斷在發生中斷錯誤時要執行的動作。 篩選條件必須判斷它是否失敗管線或傳播例外狀況。

使用此模式的時機

當下列情況時,請使用此模式:

  • 應用程式所需的處理可以輕鬆地細分成一組獨立的步驟。

  • 應用程式所執行之處理步驟有不同的延展性需求。

    注意

    您可以將應該在同一個程式中一起調整的篩選分組。 如需詳細資訊,請參閱 計算資源匯總模式

  • 您需要彈性,以允許重新排序應用程式所執行之處理步驟,或允許新增和移除步驟的功能。

  • 系統可以受益於將處理分散到不同伺服器的步驟。

  • 您需要可靠的解決方案,以在處理數據時將步驟中的失敗影響降到最低。

當下列情況時,此模式可能不實用:

  • 應用程式遵循要求-回應模式。

  • 工作處理必須完成為初始要求的一部分,例如要求/回應案例。

  • 應用程式所執行之處理步驟並不獨立,或必須在單一交易中一起執行。

  • 步驟所需的內容或狀態信息數量讓此方法效率不佳。 您可能能夠將狀態資訊保存到資料庫,但如果資料庫的額外負載造成過多爭用,則請勿使用此策略。

工作負載設計

架構設計人員應該評估管線和篩選模式如何用於其工作負載的設計,以解決 Azure 良好架構架構支柱涵蓋的目標和原則。 例如:

要素 此模式如何支援支柱目標
可靠性設計決策可協助工作負載復原到故障,並確保它會在發生失敗后復原到完全正常運作的狀態。 每個階段的單一責任可引起專注的注意力,並避免干擾交集數據處理。

- RE:01 簡單
- RE:07 背景工作

如同任何設計決策,請考慮對其他可能以此模式導入之目標的任何取捨。

範例

您可以使用一連串的訊息佇列來提供實作管線所需的基礎結構。 初始消息佇列會接收未處理的訊息,成為管道和篩選模式實作的開頭。 實作為篩選工作的元件會接聽此佇列上的訊息、執行其工作,然後將新的或已轉換的訊息張貼到序列中的下一個佇列。 另一個篩選工作可以接聽此佇列上的訊息、處理訊息、將結果張貼至另一個佇列等等,直到結束管道和篩選程式的最後一個步驟為止。 下圖說明使用消息佇列的管線:

顯示使用消息佇列之管線的圖表。

您可以使用此模式來實作影像處理管線。 如果您的工作負載取得映射,映射可能會通過一系列基本上獨立且可重新排序的篩選條件,以執行下列動作:

  • 內容仲裁
  • 調整大小
  • 浮浮水印
  • 調整
  • Exif 元數據移除
  • 內容傳遞網路 (CDN) 發行集

在此範例中,篩選條件可以實作為個別部署的 Azure Functions,甚至是包含每個篩選作為隔離部署的單一 Azure 函式應用程式。 使用 Azure Function 觸發程式、輸入系結和輸出系結可以簡化篩選程式代碼,並使用宣告檢查映像來處理的佇列型管道

此圖顯示一系列 Azure Functions 之間使用 Azure 佇列記憶體的影像處理管線。

以下範例顯示一個篩選實作為 Azure 函式,從具有映射宣告檢查的佇列記憶體管道觸發,並將新的宣告檢查寫入另一個佇列記憶體管道可能看起來像這樣。 我們已將 實作取代為批注中的虛擬程式碼,以求簡潔。 如需更多類似程式代碼,請參閱 GitHub 上提供的管道和篩選模式 示範。

// This is the "Resize" filter. It handles claim checks from input pipe, performs the
// resize work, and places a claim check in the next pipe for anther filter to handle.
[Function(nameof(ResizeFilter))]
[QueueOutput("pipe-fjur", Connection = "pipe")]  // Destination pipe claim check
public async Task<string> RunAsync(
  [QueueTrigger("pipe-xfty", Connection = "pipe")] string imageFilePath,  // Source pipe claim check
  [BlobInput("{QueueTrigger}", Connection = "pipe")] BlockBlobClient imageBlob)  // Image to process
{
  _logger.LogInformation("Processing image {uri} for resizing.", imageBlob.Uri);

  // Idempotency checks
  // ...

  // Download image based on claim check in queue message body
  // ...
  
  // Resize the image
  // ...

  // Write resized image back to storage
  // ...

  // Create claim check for image and place in the next pipe
  // ...
  
  _logger.LogInformation("Image resizing done or not needed. Adding image {filePath} into the next pipe.", imageFilePath);
  return imageFilePath;
}

注意

Spring Integration Framework 具有管道和篩選模式的實作。

下一步

當您實作此模式時,可能會發現下列資源很有説明:

當您實作此模式時,下列模式也可能相關:

  • 宣告檢查模式。 使用佇列實作的管線可能不會保存透過篩選傳送的實際專案,而是需要處理之數據的指標。 此範例會針對儲存在 Azure Blob 儲存體 中的映像,使用 Azure 佇列記憶體中的宣告檢查。
  • 競爭取用者模式。 管線可以包含一或多個篩選條件的多個實例。 此方法適用於執行緩慢篩選的平行實例。 它可讓系統分散負載並改善輸送量。 篩選的每個實例都會與其他實例競爭輸入,但篩選條件的兩個實例不應該能夠處理相同的數據。 本文說明方法。
  • 計算資源匯總模式。 您可以將應該一起調整為單一進程的篩選分組。 本文提供有關此策略優點和取捨的詳細資訊。
  • 補償交易模式。 您可以將篩選實作為可反轉的作業,或具有補償作業,如果發生失敗,會將狀態還原至舊版。 本文說明如何實作此模式,以維護或達成最終一致性。
  • 管道和篩選 - 企業整合模式