共用方式為


事件複寫工作模式

同盟概觀複寫器函數概觀說明複寫工作的原理和基本元素,建議您先熟悉這些項目再繼續閱讀本文。

在本文中,我們會詳細指導如何實作概觀一節中所強調的幾個模式。

複寫

複寫模式會將事件從某個事件中樞複製到下一個事件中樞,或從事件中樞複製到其他目的地 (例如服務匯流排佇列)。 系統會轉送這些事件,而不會對事件裝載進行任何修改。

此模式的實作方式可參考事件中樞之間的事件複寫事件中樞與服務匯流排之間的事件複寫範例,如需了解如何將 Apache Kafka 訊息代理程式中的資料複寫至事件中樞的具體案例,請參閱使用 Apache Kafka MirrorMaker 搭配事件中樞教學課程。

資料流和順序保留

複寫 (不論透過 Azure Functions 或 Azure 串流分析) 的目的並非確保將來源事件中樞的 1:1 複製品建立至目標事件中樞,而是著重在保留應用程式所需的事件相對順序。 應用程式會依照相同的分割區索引鍵來將相關事件分組,藉此傳達這個順序,而事件中樞會在相同的分割區中依序排列具有相同分割區索引鍵的訊息

重要

「位移」資訊對每個事件中樞而言都是唯一的,而且相同事件的位移在不同事件中樞執行個體之間有所不同。 若要找出複製事件資料流中的位置,請使用以時間為基礎的位移,並參考已傳播的服務指派中繼資料

以時間為基礎的位移會在特定時間點啟動接收器:

  • EventPosition.FromStart() - 再次讀取所有保留的資料。
  • EventPosition.FromEnd() - 讀取從連線開始的所有新資料。
  • EventPosition.FromEnqueuedTime(dateTime) - 從指定的日期和時間開始的所有資料。

在 EventProcessor 中,需透過 EventProcessorOptions 上的 InitialOffsetProvider 設定位置。 使用其他接收器 API 時,會透過建構函式傳遞此位置。

以 Azure Functions 為基礎的指導中所使用的預建複寫函數協助程式以範例形式提供,可確保將從原始分割區擷取且具有相同分割區索引鍵的事件資料流,作為原始資料流中的批次提交到目標事件中樞,且具有相同分割區索引鍵。

如果來源和目標事件中樞的分割區計數相同,則目標中的所有資料流都會對應到與來源相同的分割區。 如果分割區計數不同 (這對下列內容中所述的一些深入模式來說很重要),則對應會有所不同,但資料流一律會依序保持為一體。

在目標分割區中,屬於不同資料流的事件或獨立事件 (沒有分割區索引鍵) 的相對順序,可能永遠與來源分割區不同。

服務指派的中繼資料

從來源事件中樞取得的事件之服務指派中繼資料、原始的排入佇列時間、序號和位移,都會由目標事件中樞內新的服務指派值取代,但使用協助程式函數的情況下,範例所提供的複寫工作原始值保留在使用者屬性中:repl-enqueue-time (ISO8601 字串)、repl-sequencerepl-offset

這些屬性的類型為字串,而且包含各自原始屬性的 stringified 值。 如果事件經多次轉送,則會將上一個來源的服務指派中繼資料附加至已存在的屬性,並以分號分隔值。

容錯移轉

如果使用複寫是為了進行災害復原,為了防止事件中樞服務發生區域可用性事件或網路中斷,任何這類失敗案例都需要從原來的事件中樞容錯移轉到下一個事件中樞,並告知生產者和/或取用者使用次要端點。

在所有容錯移轉案例中,會假設命名空間的必要元素結構都相同,這表示事件中樞和取用者群組的名稱相同,而且以相同方式設定共用存取簽章規則和/或角色型存取控制規則。 依照移動命名空間的指引操作,並省略清除步驟,即可建立 (並更新) 次要命名空間。

若要強制生產者和取用者切換,您必須將要使用哪個命名空間的資訊放在容易存取及更新的位置以供查閱。 如果生產者或取用者遇到頻繁或持續性的錯誤,則應查閱該位置並調整其設定。 共用該設定的方式有許多種,以下點出其中兩個方法:DNS 和檔案共用。

以 DNS 為基礎的容錯移轉設定

其中一個候選方法是在受您控制的 DNS 內保存 DNS SRV 記錄中的資訊,並指向各自的事件中樞端點。

重要

請注意,事件中樞不允許其端點直接使用 CNAME 記錄為別名,這表示您需使用 DNS 作為端點位址的復原性查閱機制,而不是直接解析 IP 位址資訊。

假設您擁有網域 example.com,以及用於應用程式的區域 test.example.com。 針對兩個替代事件中樞,您現在需建立兩個額外的巢狀區域,以及為各個區域建立一筆 SRV 記錄。

SRV 記錄遵循一般慣例,首碼為 _azure_eventhubs._amqp 並保留兩個端點記錄:一個用於連接埠 5671 上透過 TLS 傳送的 AMQP,另一個用於連接埠 443 上的透過 WebSocket 傳送的 AMQP,兩者都會指向對應至該區域命名空間的事件中樞端點。

區域 SRV 記錄
eh1.test.example.com _azure_servicebus._amqp.eh1.test.example.com
1 1 5671 eh1-test-example-com.servicebus.windows.net
2 2 443 eh1-test-example-com.servicebus.windows.net
eh2.test.example.com _azure_servicebus._amqp.eh2.test.example.com
1 1 5671 eh2-test-example-com.servicebus.windows.net
2 2 443 eh2-test-example-com.servicebus.windows.net

在您應用程式的區域中,您需建立一個 CNAME 項目,指向對應至主要事件中樞的從屬區域:

CNAME 記錄 Alias
eventhub.test.example.com eh1.test.example.com

使用明確允許查詢 CNAME 和 SRV 記錄的 DNS 用戶端 (JAVA 和 .NET 的內建用戶端只允許對 IP 位址的名稱進行簡單解析),您就可以解析所需的端點。 例如使用 DnsClient.NET 時,lookup 函數為:

static string GetEventHubName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_eventhub._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

此函數會傳回目前以 CNAME 為別名之區域的連接埠 5671 所註冊的目標主機名稱,如上所示。

執行容錯移轉需要編輯 CNAME 記錄,並將其指向替代區域。

使用 DNS (尤其是 Azure DNS) 的優點是,系統會全域複寫 Azure DNS 資訊,因此可在單一區域服務中斷時復原。

此程序類似事件中樞異地複寫災害復原的運作方式,不過您擁有完整控制權,而且也適用於主動/主動案例。

以檔案共用為基礎的容錯移轉設定

使用 DNS 來共用端點資訊最簡單的替代方式,就是將主要端點的名稱放入純文字檔案中,並從可有效防止服務中斷且仍允許更新的基礎結構提供該檔案。

如果您已經透過全球可用性和內容複寫來執行高可用性的網站基礎結構,請在該處新增這類檔案,並在需要切換時重新發佈檔案。

警告

您應該只以這種方式發佈端點名稱,而不是包含秘密的完整連接字串。

容錯移轉取用者的額外考量事項

針對事件中樞取用者,容錯移轉策略的進一步考量事項取決於事件處理器的需求。

如果發生災害而需要從備份資料重建系統 (包括資料庫),而且資料庫的資料是直接從事件中樞所保存的事件饋送,或透過這類事件的中繼處理來饋送,那麼您需還原備份,然後從資料庫備份建立的時間點開始將事件重放到系統中,而非從原始系統損毀的時間點開始。

如果失敗只會影響系統的一個配量,或只有單一事件中樞變成無法連線,您可能會想從中斷處理的相同位置繼續處理事件。

若要了解這兩個案例,並使用個別 Azure SDK 的事件處理器,您需建立新的檢查點存放區,並根據您要繼續處理的時間戳記提供初始分割區位置。

如果您仍可存取要切換中事件中樞的檢查點存放區,上述傳播的中繼資料將可協助您略過已處理的事件,並準確從您上次離開的地方繼續。

合併

合併模式有一或多個複寫工作指向一個目標,一般產生者可能也同時將事件傳送至相同的目標。

這些模式的變化如下:

  • 兩個或多個複寫函數同時從不同的來源取得事件,並將事件傳送至相同的目標。
  • 另一個複寫函數從某個來源取得事件,而生產者也同時直接使用目標。
  • 第一個模式,不過在兩個或多個事件中樞之間進行鏡像處理,無論事件在哪裡產生,都會導致這些事件中樞包含相同的資料流。

前兩個模式的變化很簡單,而且與一般複寫工作並無不同。

最後一個情況則需排除已複寫的事件,避免再次複寫。 EventHubToEventHubMerge 範例示範並說明這項技巧。

編輯器

編輯器模式是以複寫模式為基礎,但會先修改訊息再加以轉送。

這類修改的範例如下:

  • 轉碼-如果事件內容 (亦稱為「主體」或「承載」) 的來源使用 Apache Avro 格式或某些專屬序列化格式進行編碼,但是擁有目標的系統預期內容是以 JSON 編碼,則轉碼複寫工作會為轉送中的事件,將 Apache Avro 的承載還原序列化為記憶體內物件圖,然後再將該圖序列化成 JSON 格式。 轉碼也包含內容壓縮和解壓縮工作。
  • 轉換-包含結構化資料的事件可能需要重新整理這些資料,以方便下游取用者取用。 這個過程可能包含壓平合併巢狀結構、剪除沒有直接關聯的資料元素,或將裝載重新整理為完全符合指定的結構描述。
  • 批次處理-以批次方式接收來自某個來源的事件 (單次傳輸多個事件),但必須單獨轉送到目標,反之亦然。 因此,一個工作可根據單一輸入事件傳輸來轉送多個事件,也可彙總一組事件再一起傳輸。
  • 驗證-若事件資料來自外部來源,通常需要先檢查是否符合一組規則,才可加以轉送。 這些規則可以使用結構描述或程式碼來表示。 不合規的事件可能會遭捨棄並在記錄中註明問題,或可能會轉送到特殊目標目的地,以待進一步處理。
  • 擴充-來自某些來源的事件資料可能需要提供進一步內容加以擴充,才能在目標系統中使用。 這個過程可能包含查閱參考資料,並透過事件內嵌這些資料,或加入複寫工作已知但不包含在事件中的來源相關資訊。
  • 篩選-可能必須根據某些規則從目標中清除來自某個來源的部分事件。 篩選條件會根據規則來測試事件,並在事件不符合規則時卸除事件。 篩選的形式為觀察特定條件並卸除具有相同值的後續事件,藉此篩選掉重複的事件。
  • 加密-複寫工作可能必須解密來自來源的內容,以及/或加密轉送至目標的內容,且/或可能必須驗證內容和中繼資料的完整性 (相對於事件中所附的簽章),或附加這類簽章。
  • 證明-複寫工作可將中繼資料 (可能受數位簽章保護) 附加至證明事件已透過特定通道或於特定時間接收的事件。
  • 鏈結-複寫工作可將簽章套用至事件的資料流,讓資料流的完整性受到保護,而且可以偵測到遺失的事件。

轉換、批次處理和擴充模式通常最適合使用 Azure 串流分析作業來實作。

您可以使用 Azure Functions,透過事件中樞觸發程序來取得事件和事件中樞輸出繫結來傳遞這些模式,以實作所有這些模式。

路由

路由模式是以複寫模式為基礎,但不是具有一個來源和一個目標,而是複寫作業具有多個目標,底下以 C# 說明:

[FunctionName("EH2EH")]
public static async Task Run(
    [EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
    [EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
    [EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
    ILogger log)
{
    foreach (EventData eventData in events)
    {
        // send to output1 and/or output2 based on criteria
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
        });
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
        });
    }
}

路由函數會考慮訊息中繼資料和/或訊息承載,然後挑選其中一個可用目的地來傳送。

在 Azure 串流分析中,定義多個輸出,然後針對每個輸出執行查詢,即可達到相同目的。

select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2

記錄投影

記錄投影模式會將事件資料流壓平合併到索引資料庫,而事件會變成資料庫中的記錄。 一般情況下,事件會新增至相同的集合或資料表中,而事件中樞分割區索引鍵會成為主要索引鍵的一部分,讓記錄變成唯一的。

記錄投影可以產生事件資料的時間序列歸檔工具或壓縮的檢視,因此每個分割區索引鍵只會保留最新的事件。 目標資料庫的圖形最終會取決於您和您應用程式的需求。 此模式也稱為「事件來源」。

提示

您可以輕鬆在 Azure 串流分析中,將記錄投影建立至 Azure SQL DatabaseAzure Cosmos DB,而且您應該偏好使用該選項。

下列 Azure 函式會將壓縮的事件中樞內容投影到 Azure CosmosDB 集合中。

[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
    [EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
    [CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
    ILogger log)
{
    foreach (var ev in input)
    {
        if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
        {
            var record = new
            {
                id = ev.SystemProperties.PartitionKey,
                data = JsonDocument.Parse(ev.Body),
                properties = ev.Properties
            };
            await output.AddAsync(record);
        }
    }
}

下一步