將執行複雜處理的工作分解成一系列可重複使用的不同元素。 這樣做可以藉由允許獨立部署和調整處理的工作元素,來改善初始步驟的效能、延展性和重複使用性。 管道和篩選模式支援高階模組化。
內容和問題
您有需要處理的循序工作管線。 實作此應用程式的簡單但不靈活方法是在整合型模組中執行此處理。 不過,這種方法可能會減少重構程式代碼、優化程式代碼的機會,或在應用程式中其他地方需要相同處理部分時重複使用程序代碼。
下圖說明使用整合型方法處理數據的其中一個問題,即無法跨多個管線重複使用程序代碼。 在此範例中,應用程式會接收及處理來自兩個來源的數據。 個別的模組會執行一系列工作來轉換數據,再將結果傳遞至應用程式的商業規則,以處理每個來源的數據。
整合型模組執行的一些工作在功能上很類似,但程式代碼必須在模組中重複,而且很可能在其模組內緊密結合。 除了無法重複使用邏輯之外,此方法也會在需求變更時帶來風險。 您必須記得在這兩個地方更新程序代碼。
整合型實作與多個管線無關或重複使用還有其他挑戰。 使用整合型時,您無法在不同的環境中執行特定工作,或獨立調整它們。 某些工作可能會需要大量計算,並受益於在功能強大的硬體上執行,或平行執行多個實例。 其他工作可能沒有相同的需求。 此外,使用整合型,重新排序工作或在管線中插入新工作是一項挑戰。 這些變更需要重新測試整個管線。
解決方案
將每個數據流所需的處理分解成一組個別元件(或篩選),每個元件都會執行單一工作。 複合工作應該使用多個篩選,而不是一個篩選。 篩選條件會透過使用管道連接篩選來組成管線。 篩選條件是獨立、獨立且通常無狀態。 篩選會從輸入管道接收訊息,並將訊息發佈至不同的輸出管道。 篩選條件可以轉換訊息,或針對一或多個準則進行測試,以包含條件式邏輯。 管道不會執行路由或任何其他邏輯。 它們只會連接篩選,將輸出訊息從一個篩選條件傳遞為輸入至下一個篩選。
篩選會獨立運作,且不知道其他篩選。 他們只會知道其輸入和輸出架構。 因此,只要任何篩選的輸入架構符合前一個篩選的輸出架構,就可以依任何順序排列篩選條件。 針對所有篩選使用標準化架構可增強重新排序篩選的能力。 管道和篩選架構鼓勵重複使用組合。
篩選條件的鬆散結合可讓您輕鬆:
- 建立由現有篩選所組成的新管線
- 更新或取代個別篩選中的邏輯
- 必要時重新排序篩選
- 視需要對不同硬體執行篩選
- 平行執行篩選
下圖顯示使用管道和篩選實作的解決方案:
處理單一要求所需的時間取決於管線中最慢的篩選條件速度。 一或多個篩選可能是瓶頸,特別是當來自特定數據源的數據流中出現大量要求時。 執行慢速篩選器平行實例的能力可讓系統分散負載並改善輸送量。
在不同計算實例上執行篩選的能力,可讓它們獨立調整,並利用許多雲端環境所提供的彈性。 需要大量計算的篩選器可以在高效能硬體上執行,而其他需求較低的篩選器則可裝載在成本較低的商品硬體上。 篩選甚至不需要位於相同的數據中心或地理位置,讓管線中的每個元素都能在接近所需資源的環境中執行。 這些工作需要特定的設計技術,例如傳訊、多線程等等,以最大化每個管道或篩選的彈性。 此圖顯示從來源 1 套用至資料管線的範例:
如果篩選的輸入和輸出結構化為數據流,您可以平行執行每個篩選的處理。 管線中的第一個篩選可以啟動其工作並輸出其結果,其結果會在第一個篩選完成其工作之前,直接傳遞至序列中的下一個篩選。
使用管道和篩選模式與 補償交易模式 是實作分散式交易的替代方法。 您可以將分散式交易分成個別的可補償工作,每個工作都可以透過實作補償交易模式的篩選來實作。 您可以將管線中的篩選實作為個別裝載的工作,這些工作會接近其維護的數據。
問題和考量
當您決定如何實作此模式時,請考慮下列幾點:
單體性質。 此模式通常會實作為整合型管線,因此對於任何變更,整個篩選鏈結都應該以端對端測試。 此外,必須考慮整個程式的容錯;如果篩選或管道失敗,整個管線可能會失敗。
複雜度。 此模式所提供的彈性增加也可能會帶來複雜度,尤其是在管線中的篩選分散到不同的伺服器時。
可靠性。 使用基礎結構,確保管線中的篩選之間流動的數據不會遺失。
等冪性。 如果管線中的篩選在收到訊息後失敗,並將工作重新排程至篩選的另一個實例,部分工作可能已經完成。 如果工作更新全域狀態的某些層面(例如儲存在資料庫中的資訊),可能會重複單一更新。 如果篩選在將結果張貼到下一個篩選之後失敗,但在它指出它順利完成其工作之前,可能會發生類似的問題。 在這些情況下,篩選的另一個實例可能會重複這項工作,導致相同的結果張貼兩次。 此案例可能會導致管線中的後續篩選處理相同數據兩次。 因此,管線中的篩選應該設計成等冪。 如需詳細資訊,請參閱 喬納森·奧利弗部落格上的等冪模式 。
重複的訊息。 如果管線中的篩選在將訊息張貼至管線的下一個階段之後失敗,則可能會執行篩選的另一個實例,並將相同訊息的複本張貼至管線。 此案例可能會導致相同訊息的兩個實例傳遞至下一個篩選。 若要避免這個問題,管線應該偵測並排除重複的訊息。
注意
如果您使用消息佇列實作管線(例如 Azure 服務匯流排 佇列),消息佇列基礎結構可能會提供自動重複的訊息偵測和移除。
內容和狀態。 在管線中,每個篩選基本上都會以隔離方式執行,而且不應該對叫用它的方式進行任何假設。 因此,每個篩選都應該提供足夠的內容來執行其工作。 此內容可能包含大量的狀態資訊。 如果篩選條件使用外部狀態,例如資料庫或外部記憶體中的數據,則必須考慮對效能的影響。 每個篩選條件必須載入、操作及保存該狀態,這會增加單次載入外部狀態之解決方案的額外負荷。
訊息容錯。 篩選條件必須能夠容忍傳入訊息中不會運作的數據。 它們會處理與其相關的數據,並忽略其他數據,並在輸出訊息中保持不變地傳遞數據。
錯誤處理 - 每個篩選都必須判斷在發生中斷錯誤時要執行的動作。 篩選條件必須判斷它是否失敗管線或傳播例外狀況。
使用此模式的時機
當下列情況時,請使用此模式:
應用程式所需的處理可以輕鬆地細分成一組獨立的步驟。
應用程式所執行之處理步驟有不同的延展性需求。
注意
您可以將應該在同一個程式中一起調整的篩選分組。 如需詳細資訊,請參閱 計算資源匯總模式。
您需要彈性,以允許重新排序應用程式所執行之處理步驟,或允許新增和移除步驟的功能。
系統可以受益於將處理分散到不同伺服器的步驟。
您需要可靠的解決方案,以在處理數據時將步驟中的失敗影響降到最低。
當下列情況時,此模式可能不實用:
應用程式遵循要求-回應模式。
工作處理必須完成為初始要求的一部分,例如要求/回應案例。
應用程式所執行之處理步驟並不獨立,或必須在單一交易中一起執行。
步驟所需的內容或狀態信息數量讓此方法效率不佳。 您可能能夠將狀態資訊保存到資料庫,但如果資料庫的額外負載造成過多爭用,則請勿使用此策略。
工作負載設計
架構設計人員應該評估管線和篩選模式如何用於其工作負載的設計,以解決 Azure 良好架構架構支柱中涵蓋的目標和原則。 例如:
要素 | 此模式如何支援支柱目標 |
---|---|
可靠性設計決策可協助工作負載復原到故障,並確保它會在發生失敗后復原到完全正常運作的狀態。 | 每個階段的單一責任可引起專注的注意力,並避免干擾交集數據處理。 - RE:01 簡單 - RE:07 背景工作 |
如同任何設計決策,請考慮對其他可能以此模式導入之目標的任何取捨。
範例
您可以使用一連串的訊息佇列來提供實作管線所需的基礎結構。 初始消息佇列會接收未處理的訊息,成為管道和篩選模式實作的開頭。 實作為篩選工作的元件會接聽此佇列上的訊息、執行其工作,然後將新的或已轉換的訊息張貼到序列中的下一個佇列。 另一個篩選工作可以接聽此佇列上的訊息、處理訊息、將結果張貼至另一個佇列等等,直到結束管道和篩選程式的最後一個步驟為止。 下圖說明使用消息佇列的管線:
您可以使用此模式來實作影像處理管線。 如果您的工作負載取得映射,映射可能會通過一系列基本上獨立且可重新排序的篩選條件,以執行下列動作:
- 內容仲裁
- 調整大小
- 浮浮水印
- 調整
- Exif 元數據移除
- 內容傳遞網路 (CDN) 發行集
在此範例中,篩選條件可以實作為個別部署的 Azure Functions,甚至是包含每個篩選作為隔離部署的單一 Azure 函式應用程式。 使用 Azure Function 觸發程式、輸入系結和輸出系結可以簡化篩選程式代碼,並使用宣告檢查映像來處理的佇列型管道。
以下範例顯示一個篩選實作為 Azure 函式,從具有映射宣告檢查的佇列記憶體管道觸發,並將新的宣告檢查寫入另一個佇列記憶體管道可能看起來像這樣。 我們已將 實作取代為批注中的虛擬程式碼,以求簡潔。 如需更多類似程式代碼,請參閱 GitHub 上提供的管道和篩選模式 示範。
// This is the "Resize" filter. It handles claim checks from input pipe, performs the
// resize work, and places a claim check in the next pipe for anther filter to handle.
[Function(nameof(ResizeFilter))]
[QueueOutput("pipe-fjur", Connection = "pipe")] // Destination pipe claim check
public async Task<string> RunAsync(
[QueueTrigger("pipe-xfty", Connection = "pipe")] string imageFilePath, // Source pipe claim check
[BlobInput("{QueueTrigger}", Connection = "pipe")] BlockBlobClient imageBlob) // Image to process
{
_logger.LogInformation("Processing image {uri} for resizing.", imageBlob.Uri);
// Idempotency checks
// ...
// Download image based on claim check in queue message body
// ...
// Resize the image
// ...
// Write resized image back to storage
// ...
// Create claim check for image and place in the next pipe
// ...
_logger.LogInformation("Image resizing done or not needed. Adding image {filePath} into the next pipe.", imageFilePath);
return imageFilePath;
}
注意
Spring Integration Framework 具有管道和篩選模式的實作。
下一步
當您實作此模式時,可能會發現下列資源很有説明:
相關資源
當您實作此模式時,下列模式也可能相關:
- 宣告檢查模式。 使用佇列實作的管線可能不會保存透過篩選傳送的實際專案,而是需要處理之數據的指標。 此範例會針對儲存在 Azure Blob 儲存體 中的映像,使用 Azure 佇列記憶體中的宣告檢查。
- 競爭取用者模式。 管線可以包含一或多個篩選條件的多個實例。 此方法適用於執行緩慢篩選的平行實例。 它可讓系統分散負載並改善輸送量。 篩選的每個實例都會與其他實例競爭輸入,但篩選條件的兩個實例不應該能夠處理相同的數據。 本文說明方法。
- 計算資源匯總模式。 您可以將應該一起調整為單一進程的篩選分組。 本文提供有關此策略優點和取捨的詳細資訊。
- 補償交易模式。 您可以將篩選實作為可反轉的作業,或具有補償作業,如果發生失敗,會將狀態還原至舊版。 本文說明如何實作此模式,以維護或達成最終一致性。
- 管道和篩選 - 企業整合模式。