在本快速入門中,您將瞭解如何使用 azure-messaging-eventhubs Java 套件,將事件傳送至 Azure 事件中樞並從中接收事件。
小提示
如果您在 Spring 應用程式中使用 Azure 事件中樞資源,建議您將 Spring Cloud Azure 視為替代方案。 Spring Cloud Azure 是開放原始碼專案,可將 Spring 和 Azure 服務無縫整合。 若要深入瞭解 Spring Cloud Azure,以及查看使用事件中樞的範例,請參閱 使用 Azure 事件中樞的 Spring Cloud Stream。
先決條件
如果您對 Azure 事件中樞並不熟悉,在進行此快速入門之前,請先參閱事件中樞概述。
若要完成本快速入門,您需要下列必要條件:
- Microsoft Azure 訂用帳戶。 若要使用 Azure 服務 (包括 Azure 事件中樞),您需要訂用帳戶。 如果您沒有現有的 Azure 帳戶,您可以申請免費試用,或是在建立帳戶時使用 MSDN 訂閱者權益。
- Java 開發環境。 本快速入門使用 Eclipse。 需要具有 8 版或更新版本的 Java 開發工具套件 (JDK)。
- 建立事件中樞命名空間和事件中樞。 第一個步驟是使用 Azure 入口網站 來建立事件中樞類型的命名空間,並取得應用程式與事件中樞通訊所需的管理認證。 若要建立命名空間和事件中樞,請依照這篇文章中的程序操作。 然後,依照下列文章中的指示取得 事件中樞命名空間的連接字串 : 取得連接字串。 稍後您會在本快速入門中使用此連接字串。
傳送事件
本節說明如何建立Java應用程式來傳送事件到事件集線器。
新增 Azure 事件中樞連結庫的參考
首先,在慣用的 Java 開發環境中,為主控台/殼層應用程式建立新的 Maven 專案。 請更新pom.xml
檔案,如下所示。 適用於事件中樞的 Java 用戶端連結庫可在 Maven 中央存放庫中取得。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.20.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.16.1</version>
<scope>compile</scope>
</dependency>
備註
將版本更新為發行至 Maven 存放庫的最新版本。
向 Azure 驗證應用程式
本快速入門顯示兩種連線到 Azure 事件中樞的方式:無密碼和連接字串。 第一個選項如何使用 Microsoft Entra ID 和顯示角色型存取控制 (RBAC) 中的安全性主體來連線到事件中樞命名空間。 您不需要擔心在程式碼或設定檔或 Azure Key Vault 等安全儲存體中,有硬式編碼連接字串。 第二個選項顯示如何使用連接字串來連線到事件中樞命名空間。 如果您不熟悉 Azure,則連接字串選項可能會更容易遵循。 建議在真實世界應用程式和實際執行環境中使用無密碼選項。 如需詳細資訊,請參閱驗證與授權。 您也可以在概觀頁面上,深入了解無密碼驗證。
將角色指派給 Microsoft Entra 使用者
在本機開發時,請確定連線到 Azure 事件中樞的使用者帳戶具有正確的權限。 您需要 Azure 事件中樞資料擁有者角色,才能傳送和接收訊息。 若要將此角色指派給您自己,您需要使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write
動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上,深入了解角色指派的可用範圍。
下列範例會將 Azure Event Hubs Data Owner
角色指派給您的使用者帳戶,該角色提供對 Azure 事件中樞資源的完整存取權。 在實際案例中,遵循最低權限原則,只為使用者提供更安全實際執行環境所需的最低權限。
Azure 事件中樞的 Azure 內建角色
對於 Azure 事件中樞來說,透過 Azure 入口網站和 Azure 資源管理 API 來管理的命名空間和所有相關資源,皆已使用 Azure RBAC 模型來加以保護。 Azure 提供下列 Azure 內建角色,以授權存取事件中樞命名空間:
- Azure 事件中樞資料擁有者:允許資料存取事件中樞命名空間及其實體 (佇列、主題、訂用帳戶和篩選)
- Azure 事件中樞資料傳送者:使用此角色可讓傳送者存取事件中樞命名空間及其實體。
- Azure 事件中樞資料接收者:使用此角色可讓接收者存取事件中樞命名空間及其實體。
如果您想要建立自訂角色,請參閱事件中樞作業所需的權限。
這很重要
在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘。 在少數情況下,可能需要長達八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。
在 Azure 入口網站中,使用主要搜尋列或左側導覽找出您的事件中樞命名空間。
在概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]。
在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。
從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]。
使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋
Azure Event Hubs Data Owner
並選取相符的結果。 接著,選擇 [下一步]。在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]。
在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]。
選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。
撰寫程式代碼以將訊息傳送至事件中樞
新增名為 Sender
的類別,並將下列程式代碼新增至 類別:
這很重要
- 將
<NAMESPACE NAME>
更新為事件中樞命名空間的名稱。 - 使用事件中樞的名稱進行更新
<EVENT HUB NAME>
。
package ehubquickstart;
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
import com.azure.identity.*;
public class SenderAAD {
// replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
// Example: private static final String namespaceName = "contosons.servicebus.windows.net";
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
// Replace <EVENT HUB NAME> with the name of your event hub.
// Example: private static final String eventHubName = "ordersehub";
private static final String eventHubName = "<EVENT HUB NAME>";
public static void main(String[] args) {
publishEvents();
}
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a token using the default Azure credential
DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
.authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
.build();
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace(namespaceName)
.eventHubName(eventHubName)
.credential(credential)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
}
建置程式,並確定沒有任何錯誤。 執行接收者程序之後,您將執行此程式。
接收事件
本教學課程中的程式代碼是以 GitHub 上的 EventProcessorClient 範例為基礎,您可以檢查此範例以查看完整的工作應用程式。
使用 Azure Blob 記憶體做為檢查點存放區時,請遵循這些建議:
- 針對每個取用者群組使用不同的容器。 您可以使用相同的儲存體帳戶,但每個群組各使用一個容器。
- 請勿將容器用於其他任何項目,也不會將儲存體帳戶用於其他任何項目。
- 儲存體帳戶應位於與已部署應用程式所在的相同區域中。 如果應用程式是內部部署,請嘗試選擇最接近的區域。
在 Azure 入口網站的 [儲存體帳戶] 頁面上,於 [Blob 服務] 區段中,確定已停用下列設定。
- 階層式命名空間
- Blob 虛刪除
- 版本控制
建立 Azure 記憶體和 Blob 容器
在本快速入門中,您會使用 Azure 記憶體(特別是 Blob 記憶體)作為檢查點存放區。 檢查點是事件處理器標記或認可數據分割中最後一個成功處理事件位置的程式。 標記檢查點通常是在處理事件的函式內完成。 若要深入瞭解檢查點,請參閱 事件處理器。
請遵循這些步驟來建立 Azure 儲存體帳戶。
- 建立 Azure 儲存體帳戶
- 建立 Blob 容器
- 對 Blob 容器進行驗證
在本機開發時,請確定存取 Blob 資料的使用者帳戶具有正確的權限。 您需要儲存體 Blob 資料參與者才能讀取和寫入 Blob 資料。 若要指派此角色給您自己,您需要被指派使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write 動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上深入了解角色指派的可用範圍。
在此案例中,您會將權限指派給使用者帳戶 (以儲存體帳戶為範圍),以遵循最低權限原則。 此做法只為使用者提供所需的最低權限,並建立更安全的實際執行環境。
下列範例將儲存體 Blob 資料參與者角色指派給使用者帳戶,以針對儲存體帳戶中的 Blob 資料提供讀取和寫入存取權。
這很重要
在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘,但在罕見情況下,可能需要長達八分鐘。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。
在 Azure 入口網站中,使用主要搜尋列或左側導覽找出您的儲存體帳戶。
在儲存體帳戶概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]。
在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。
從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]。
使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋「儲存體 Blob 資料參與者」,選取相符的結果,然後選擇 [下一步]。
在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]。
在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]。
選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。
將事件中樞連結庫新增至 Java 專案
在 pom.xml 檔案中新增下列相依性。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.20.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.20.6</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.16.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
在 Java 檔案頂端新增下列
import
語句。import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer; import com.azure.identity.*;
建立名為
Receiver
的類別,並將下列字串變數新增至 類別。 使用正確值替換佔位符。這很重要
將佔位符替換為正確的值。
- 以事件中樞命名空間名稱取代
<NAMESPACE NAME>
。 -
<EVENT HUB NAME>
使用命名空間中的事件中樞名稱。
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net"; private static final String eventHubName = "<EVENT HUB NAME>";
- 以事件中樞命名空間名稱取代
將下列
main
方法新增至 類別。這很重要
用正確的值取代佔位符。
- 將
<STORAGE ACCOUNT NAME>
取代為 Azure 儲存帳戶的名稱。 -
<CONTAINER NAME>
記憶體帳戶中 Blob 容器的名稱
// create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD) .build(); // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .credential(credential) .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net") .containerName("<CONTAINER NAME>") .buildAsyncClient(); // Create an event processor client to receive and process events and errors. EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder() .fullyQualifiedNamespace(namespaceName) .eventHubName(eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .credential(credential) .buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process");
- 將
將處理事件和錯誤的兩個協助程式方法 (
PARTITION_PROCESSOR
和ERROR_HANDLER
) 新增至Receiver
類別。public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };
建置程式,並確定沒有任何錯誤。
執行應用程式
請先執行 接收者 應用程式。
然後,執行 Sender 應用程式。
在 [ 接收者 應用程式] 視窗中,確認您看到傳送者應用程式所發行的事件。
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar
在接收者應用程式視窗中按 ENTER 以停止應用程式。
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
相關內容
請參閱 GitHub 上的下列範例: