Azure 事件中樞 .NET 的事件處理器用戶端程式庫 - 5.8.1 版

Azure 事件中樞是可高度調整的發佈訂閱服務,每秒可以擷取數百萬個事件,並將其串流至多個取用者。 這可讓您處理和分析連線裝置和應用程式所產生的大量資料。 一旦事件中樞收集資料,您就可以使用任何即時分析提供者或批次處理/儲存體配接器來擷取、轉換及儲存資料。 如果您想要深入瞭解Azure 事件中樞,您可以檢閱:什麼是事件中樞

事件處理器用戶端程式庫是Azure 事件中樞用戶端程式庫的隨附專案,提供獨立用戶端,以強固、持久且可調整的方式取用事件,適用于大部分的生產案例。 建議使用 Azure 儲存體 Blob 所建置的具意見實作,建議使用事件處理器:

  • 大規模讀取和處理事件中樞的所有分割區,並復原暫時性失敗和間歇性網路問題。

  • 合作處理事件,其中多個處理器會動態散發並共用取用者群組內容中的責任,以正常方式管理負載,因為處理器會新增並移除群組。

  • 使用 Azure 儲存體 Blob 作為基礎資料存放區,管理以持久方式處理的檢查點和狀態。

| 原始程式碼套件 (NuGet) | API 參考檔 | 產品檔 | 疑難排解指南

開始使用

Prerequisites

  • Azure 訂用帳戶:若要使用 Azure 服務,包括Azure 事件中樞,您需要訂用帳戶。 如果您沒有現有的 Azure 帳戶,您可以在建立帳戶時註冊免費試用或使用Visual Studio 訂用帳戶權益。

  • 具有事件中樞的事件中樞命名空間:若要與Azure 事件中樞互動,您也必須提供命名空間和事件中樞。 如果您不熟悉建立 Azure 資源,建議您遵循使用 Azure 入口網站 建立事件中樞的逐步指南。 您也可以在該處找到使用 Azure CLI、Azure PowerShell或 Azure Resource Manager (ARM) 範本來建立事件中樞的詳細指示。

  • 具有 Blob 儲存體的 Azure 儲存體帳戶: 若要保存檢查點並控管 Azure 儲存體中的擁有權,您必須擁有具有 Blob 的 Azure 儲存體帳戶。 如果您不熟悉 Azure 儲存體帳戶,建議您遵循使用Azure 入口網站 建立儲存體帳戶的逐步指南。 您也可以在該處找到使用 Azure CLI、Azure PowerShell或 Azure Resource Manager (ARM) 範本來建立儲存體帳戶的詳細指示。

  • Azure 儲存體 Blob 容器: Azure 儲存體中的檢查點和擁有權資料將會寫入特定容器中的 Blob。 EventProcessorClient需要現有的容器,而且不會隱含建立容器,以協助防止意外設定錯誤。 如果您不熟悉 Azure 儲存體容器,建議您參閱 管理容器的檔。 您可以在該處找到使用 .NET、Azure CLI 或Azure PowerShell來建立容器的詳細指示。

  • C# 8.0:Azure 事件中樞用戶端程式庫會使用 C# 8.0 中引進的新功能。 若要利用 C# 8.0 語法,建議您使用 .NET Core SDK 3.0 或更新版本編譯, 語言版本latest

    想要充分利用 C# 8.0 語法的 Visual Studio 使用者必須使用 Visual Studio 2019 或更新版本。 Visual Studio 2019 (包括免費的 Community 版) 可以在這裡下載。 Visual Studio 2017 的使用者可以利用 C# 8 語法,方法是使用 Microsoft.Net.Compilers NuGet 套件 和設定語言版本,但編輯體驗可能不理想。

    您仍然可以搭配舊版 C# 語言使用程式庫,但必須手動管理非同步可列舉和非同步可處置成員,而不是受益于新的語法。 您仍然可以以 .NET Core SDK 支援的任何架構版本為目標,包括舊版 .NET Core 或 .NET Framework。 如需詳細資訊,請參閱: 如何指定目標架構

    重要注意事項: 若要在不修改的情況下建置或執行 範例範例 ,必須使用 C# 11.0。 如果您決定調整其他語言版本,您仍然可以執行範例。

若要在 Azure 中快速建立所需的資源,並接收其連接字串,您可以按一下下列命令來部署我們的範例範本:

部署至 Azure

安裝套件

使用NuGet安裝適用于 .NET 的 Azure 事件中樞 事件處理器用戶端程式庫:

dotnet add package Azure.Messaging.EventHubs.Processor

驗證用戶端

取得事件中樞連接字串

若要讓事件中樞用戶端程式庫與事件中樞互動,必須瞭解如何與其連線和授權。 最簡單的方式是使用連接字串,這會在建立事件中樞命名空間時自動建立。 如果您不熟悉搭配事件中樞使用連接字串,您可以依照逐步指南取得 事件中樞連接字串

取得 Azure 儲存體連接字串

若要讓事件處理器用戶端使用 Azure 儲存體 Blob 進行檢查點檢查,您必須瞭解如何連線到儲存體帳戶並對其授權。 最簡單的做法是使用連接字串,這是在建立儲存體帳戶時產生的連接字串。 如果您不熟悉 Azure 中的儲存體帳戶連接字串授權,建議您遵循逐步指南來 設定 Azure 儲存體連接字串

重要概念

  • 事件處理器是一種建構,用來管理與連線到指定事件中樞相關聯的責任,並在特定取用者群組的內容中處理其每個分割區的事件。 處理從分割區讀取的事件以及處理事件處理器所委派的任何錯誤動作,會委派給您提供的程式碼,讓您的邏輯專注于傳遞商業價值,而處理器處理與讀取事件相關聯的工作、管理分割區,以及允許狀態以檢查點的形式保存。

  • 檢查點處理 是一個程式,可讓讀取器標記並保存已針對分割區處理的事件位置。 檢查點是取用者的責任,而且發生在個別分割區上,通常是在特定取用者群組的內容中。 EventProcessorClient對於 ,這表示對於取用者群組和資料分割組合,處理器必須追蹤其在事件資料流程中的目前位置。 如果您想要詳細資訊,請參閱事件中樞產品檔中的 檢查點

    當事件處理器連線時,它會在先前由該取用者群組中該分割區的最後一個處理器保存的檢查點上開始讀取事件。如果有的話。 當事件處理器讀取並處理分割區中的事件時,它應該定期建立檢查點,讓下游應用程式將事件標示為「完整」,以及提供復原能力,應事件處理器或裝載失敗的環境。 如有必要,您可以透過這個檢查點程式指定先前的位移,以重新處理先前標示為「完成」的事件。

  • 資料分割是經過排序且保存在事件中樞內的事件序列。 資料分割是與事件取用者所需平行處理原則相關聯的資料組織方式。 Azure 事件中樞透過分割取用者模式來提供訊息串流,每位取用者只會讀取訊息串流的特定子集 (即資料分割)。 當較新的事件送達時,系統會將它們加入序列的結尾。 建立事件中樞時會指定資料分割數目,且無法加以變更。

  • 取用者群組可讓您檢視整個事件中樞。 取用者群組能讓多個取用應用程式擁有自己的事件串流檢視,以及按照自己的步調和從自己的位置自行讀取串流。 每一取用者群組的一個資料分割上最多可以有 5 個並行讀取器;不過,建議給定的資料分割和取用者群組配對只有一個作用中的取用者。 每個作用中的讀者都會從其資料分割收到所有事件;如果相同資料分割上有多個讀者,則其會收到重複的事件。

如需更多概念和更深入的討論,請參閱: 事件中樞功能

用戶端存留期

EventProcessorClient安全快取並使用 作為應用程式存留期的單一,這是定期讀取事件時的最佳做法。 用戶端負責有效率地管理網路、CPU 和記憶體使用量,以在閒置期間保持低使用量。 需要呼叫 StopProcessingAsyncStopProcessing 在處理器上,以確保已正確清除網路資源和其他 Unmanaged 物件。

執行緒安全

我們保證所有用戶端實例方法都是安全線程,且彼此獨立 (指導方針) 。 這可確保重複使用用戶端實例的建議一律是安全的,即使是跨執行緒也一樣。

資料模型類型,例如 EventDataEventDataBatch 不是安全線程。 它們不應該跨執行緒共用,也不應該與用戶端方法同時使用。

其他概念

用戶端選項 | 事件處理常式 | 處理失敗 | 診斷 | 模擬 (處理器) | 模擬 (用戶端類型)

範例

建立事件處理器用戶端

由於 EventProcessorClient 相依於 Azure 儲存體 Blob 以維持其狀態,因此您必須為處理器提供 BlobContainerClient,而其已針對應使用的儲存體帳戶和容器進行設定。 用來設定 的 EventProcessorClient 容器必須存在。

EventProcessorClient由於 無法知道指定不存在之容器的意圖,因此不會隱含建立容器。 這可作為防止設定錯誤的容器,導致惡意處理器無法共用擁有權,並干擾取用者群組中的其他處理器。

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

設定事件和錯誤處理常式

若要使用 EventProcessorClient ,必須提供事件處理和錯誤的處理常式。 這些處理常式會被視為獨立式,而開發人員負責確保處理常式程式碼內的例外狀況會納入考慮。

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

啟動和停止處理

會在 EventProcessorClient 明確啟動背景後執行其處理,並繼續執行此動作,直到明確停止為止。 雖然這可讓應用程式程式碼執行其他工作,但如果沒有執行其他工作,它也會負責確保進程不會在處理期間終止。

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

搭配事件處理器用戶端使用 Active Directory 主體

Azure 身分識別程式庫提供 Azure Active Directory 驗證支援,可用於 Azure 用戶端程式庫,包括事件中樞和 Azure 儲存體。

若要使用 Active Directory 主體,建立事件中樞用戶端時會指定連結 Azure.Identity 庫的其中一個可用認證。 此外,會提供完整的事件中樞命名空間和所需事件中樞的名稱,而不是事件中樞連接字串。

若要搭配 Azure 儲存體 Blob 容器使用 Active Directory 主體,必須在建立儲存體用戶端時提供容器的完整 URL。 如需存取 Blob 儲存體之有效 URI 格式的詳細資料,請參閱 命名和參考容器、Blob 和中繼資料

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

搭配事件中樞使用 Azure Active Directory 時,您的主體必須獲指派一個角色,以允許從事件中樞讀取,例如 Azure Event Hubs Data Receiver 角色。 如需搭配事件中樞使用 Azure Active Directory 授權的詳細資訊,請參閱 相關聯的檔

搭配 Azure 儲存體使用 Azure Active Directory 時,您的主體必須獲指派一個角色,以允許讀取、寫入和刪除 Blob 的存取權,例如 Storage Blob Data Contributor 角色。 如需搭配 Azure 儲存體使用 Active Directory 授權的詳細資訊,請參閱 相關聯的檔和Azure 儲存體授權範例

疑難排解

如需詳細的疑難排解資訊,請參閱 事件中樞疑難排解指南

例外狀況處理

事件處理器用戶端例外狀況

事件處理器用戶端會在發生例外狀況時嘗試復原,並採取必要的動作以繼續處理,除非不可能這麼做。 不需要開發人員採取任何動作 ,才能進行此動作;它是處理器行為的原生部分。

為了讓開發人員有機會檢查並回應事件處理器用戶端作業內發生的例外狀況,它們會透過 ProcessError 事件呈現。 此事件的引數提供例外狀況及其觀察到內容的詳細資料。 開發人員可以從這個事件處理常式內對事件處理器用戶端執行正常作業,例如停止和/或重新開機它以回應錯誤,但可能不會影響處理器的例外狀況行為。

如需實作錯誤處理常式的基本範例,請參閱範例: 事件處理常式

事件處理常式中的例外狀況

由於事件處理器用戶端缺少適當的內容來瞭解開發人員提供的事件處理常式內例外狀況的嚴重性,因此無法假設哪些動作是合理的回應。 因此,開發人員會被視為負責在事件處理常式內發生的例外狀況,這些例外狀況會使用 try/catch 區塊和其他標準語言建構。

事件處理器用戶端不會嘗試偵測開發人員程式碼中的例外狀況,也不會明確呈現這些例外狀況。 產生的行為將取決於處理器的裝載環境,以及呼叫事件處理常式的內容。 由於這在不同案例之間可能會有所不同,因此強烈建議開發人員以防禦方式撰寫事件處理常式的程式碼,並考慮潛在的例外狀況。

記錄和診斷

事件處理器用戶端程式庫會使用 .NET EventSource 發出資訊,完整檢測各種詳細層級的資訊。 記錄會針對每個作業執行,並遵循標記作業起點、完成和發生任何例外狀況的模式。 可能提供深入解析的其他資訊也會記錄在相關聯作業的內容中。

事件處理器用戶端記錄可供任何 EventListener 使用,方法是加入宣告名為 「Azure-Messaging-EventHubs-Processor-EventProcessorClient」 的來源,或加入宣告具有 「AzureEventSource」 特性的所有來源。 為了更輕鬆地從 Azure 用戶端程式庫擷取記錄, Azure.Core 事件中樞所使用的程式庫會提供 AzureEventSourceListener 。 如需詳細資訊,請參閱 使用 AzureEventSourceListener 擷取事件中樞記錄

事件處理器程式庫也會使用 Application Insights 或 OpenTelemetry 來檢測分散式追蹤。 如需詳細資訊,請參閱 Azure.Core 診斷範例

下一步

除了討論的案例之外,Azure 事件中樞處理器程式庫還提供其他案例的支援,以協助利用 的完整功能集 EventProcessorClient 。 為了協助探索其中一些案例,事件中樞處理器用戶端程式庫提供範例專案,以作為常見案例的圖例。 如需詳細資訊,請參閱自 述檔 範例。

參與

此專案歡迎參與和提供建議。 大部分的參與都要求您同意「參與者授權合約 (CLA)」,宣告您有權且確實授與我們使用投稿的權利。 如需詳細資料,請前往 https://cla.microsoft.com

當您提交提取要求時,CLA Bot 會自動判斷您是否需要提供 CLA,並適當地裝飾 PR (例如標籤、註解)。 請遵循 bot 提供的指示。 您只需要使用我們的 CLA 在所有存放庫上執行此動作一次。

此專案採用 Microsoft Open Source Code of Conduct (Microsoft 開放原始碼管理辦法)。 如需詳細資訊,請參閱管理辦法常見問題集,如有任何其他問題或意見請連絡 opencode@microsoft.com

如需詳細資訊,請參閱我們的 參與指南

曝光數