Использование Java для отправки и получения событий в Центрах событий Azure
В этом кратком руководстве показано, как отправлять события в концентратор событий и получать события из него с помощью пакета Java azure-messaging-eventhubs.
Совет
Если вы работаете с ресурсами Центры событий Azure в приложении Spring, рекомендуется использовать Spring Cloud Azure в качестве альтернативы. Spring Cloud Azure — это проект с открытым исходным кодом, который обеспечивает простую интеграцию Spring со службами Azure. Дополнительные сведения о Spring Cloud Azure и примерах использования Центров событий см. в статье Spring Cloud Stream с Центры событий Azure.
Необходимые компоненты
Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями, прежде чем приступить к работе с этим руководством.
Для работы с данным руководством необходимо следующее:
- Подписка Microsoft Azure. Чтобы использовать службы Azure, в том числе Центры событий Azure, потребуется действующая подписка. Если у вас еще нет учетной записи Azure, зарегистрируйтесь для работы с бесплатной пробной версией или активируйте преимущества для подписчиков MSDN при создании учетной записи.
- Среда разработки Java. В рамках этого краткого руководства используется Eclipse. Необходим комплект SDK Java (JDK) версии 8 или более поздней.
- Создайте пространство имен Центров событий и концентратор событий. Первым шагом является использование портала Azure для создания пространства имен типа Центров событий и получение учетных данных управления, необходимых приложению для взаимодействия с концентратором событий. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи. Получите строку подключения для пространства имен Центров событий, следуя инструкциям из статьи Получение строки подключения. Строка подключения понадобится нам позже в рамках этого краткого руководства.
Отправка событий
Из этого раздела вы узнаете, как создать приложение Java, которое отправляет события в концентратор событий.
Добавление ссылки на библиотеку Центров событий Azure
Сначала создайте проект Maven для консольного или оболочкного приложения в любимой среде разработки Java. pom.xml
Обновите файл следующим образом. Клиентская библиотека Java для Центров событий доступна в центральном репозитории Maven.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.18.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.11.2</version>
<scope>compile</scope>
</dependency>
Примечание.
Обновите версию до последней, опубликованной в репозитории Maven.
Проверка подлинности приложения в Azure
В этом кратком руководстве показано два способа подключения к Центры событий Azure: без пароля и строка подключения. Первый вариант показывает, как использовать субъект безопасности в идентификаторе Microsoft Entra ID и управлении доступом на основе ролей (RBAC) для подключения к пространству имен Центров событий. Вам не нужно беспокоиться о наличии жестко закодированных строка подключения в коде или в файле конфигурации или в безопасном хранилище, например Azure Key Vault. Второй вариант показывает, как использовать строка подключения для подключения к пространству имен Центров событий. Если вы не знакомы с Azure, вы можете найти вариант строка подключения проще следовать. Мы рекомендуем использовать параметр без пароля в реальных приложениях и рабочих средах. Дополнительные сведения см. в разделе "Проверка подлинности и авторизация". Дополнительные сведения о проверке подлинности без пароля см. на странице обзора.
Назначение ролей пользователю Microsoft Entra
При локальной разработке убедитесь, что учетная запись пользователя, которая подключается к Центры событий Azure имеет правильные разрешения. Для отправки и получения сообщений вам потребуется роль владельца данных Центры событий Azure. Чтобы назначить себе эту роль, вам потребуется роль Администратор istrator пользователя или другую роль, которая включает Microsoft.Authorization/roleAssignments/write
действие. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Узнайте больше о доступных область для назначений ролей на странице обзора область.
В следующем примере роль назначается Azure Event Hubs Data Owner
учетной записи пользователя, которая предоставляет полный доступ к ресурсам Центры событий Azure. В реальном сценарии следуйте принципу наименьших привилегий , чтобы предоставить пользователям только минимальные разрешения, необходимые для более безопасной рабочей среды.
Встроенные роли Azure для Центров событий Azure
Для Центры событий Azure управление пространствами имен и всеми связанными ресурсами с помощью портал Azure и API управления ресурсами Azure уже защищено с помощью модели Azure RBAC. Azure предоставляет следующие встроенные роли Azure для авторизации доступа к пространству имен Центров событий:
- Центры событий Azure владелец данных: включает доступ к пространству имен Центров событий и его сущностям (очереди, разделы, подписки и фильтры)
- Центры событий Azure отправителя данных: используйте эту роль, чтобы предоставить отправителю доступ к пространству имен Центров событий и его сущностям.
- Центры событий Azure приемник данных. Используйте эту роль, чтобы предоставить получателю доступ к пространству имен Центров событий и его сущностям.
Если вы хотите создать пользовательскую роль, см. раздел "Права", необходимые для операций Центров событий.
Внимание
В большинстве случаев для распространения назначения ролей в Azure потребуется несколько минут. В редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.
В портал Azure найдите пространство имен Центров событий с помощью главной панели поиска или навигации слева.
На странице обзора выберите элемент управления доступом (IAM) в меню слева.
На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.
Выберите + Добавить в верхнем меню, а затем выберите Добавить назначение роли в появившемся раскрывающемся меню.
Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите
Azure Event Hubs Data Owner
и выберите соответствующий результат. Теперь щелкните Далее.В разделе Назначение доступа для выберите Пользователь, группа или субъект-служба и + Выбрать членов.
В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты), а затем выберите в нижней части диалогового окна.
Нажмите кнопку Проверить и назначить, чтобы перейти на последнюю страницу, а затем еще раз Проверить и назначить, чтобы завершить процесс.
Написание кода для отправки сообщений в концентратор событий
Добавьте класс с именем Sender
и добавьте в него следующий код.
Внимание
- Обновите
<NAMESPACE NAME>
имя пространства имен Центров событий. - Обновите
<EVENT HUB NAME>
имя концентратора событий.
package ehubquickstart;
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
import com.azure.identity.*;
public class SenderAAD {
// replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
// Example: private static final String namespaceName = "contosons.servicebus.windows.net";
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
// Replace <EVENT HUB NAME> with the name of your event hub.
// Example: private static final String eventHubName = "ordersehub";
private static final String eventHubName = "<EVENT HUB NAME>";
public static void main(String[] args) {
publishEvents();
}
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a token using the default Azure credential
DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
.authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
.build();
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace(namespaceName)
.eventHubName(eventHubName)
.credential(credential)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
}
Скомпилируйте программу и убедитесь в отсутствии ошибок. Эту программу вы запустите после запуска программы получателя.
Получение событий
Код в этом руководстве основан на шаблоне EventProcessorClient в GitHub. Изучите его, чтобы получить полное представление о рабочем приложении.
Следуйте этим рекомендациям при использовании Хранилище BLOB-объектов Azure в качестве хранилища проверка point:
- Используйте отдельный контейнер для каждой группы потребителей. Вы можете использовать одну и ту же учетную запись хранения, но использовать один контейнер для каждой группы.
- Не используйте контейнер для других компонентов и не используйте учетную запись хранения для других действий.
- служба хранилища учетная запись должна находиться в том же регионе, в который находится развернутое приложение. Если приложение находится в локальной среде, попробуйте выбрать ближайший регион.
На странице учетной записи служба хранилища в портал Azure в разделе службы BLOB-объектов убедитесь, что следующие параметры отключены.
- Иерархическое пространство имен
- Обратимое удаление BLOB-объекта
- Управление версиями
Создание службы хранилища Azure и контейнера больших двоичных объектов
В этом кратком руководстве показано, как настроить службу хранилища Azure (Хранилище BLOB-объектов) в качестве хранилища контрольных точек. Назначение контрольных точек — это процесс, в ходе которого обработчик событий отмечает или фиксирует положение последнего успешно обработанного события в секции. Контрольная точка обычно отмечается в функции, которая обрабатывает события. См. статью Балансировка нагрузки секций между несколькими экземплярами приложения.
Выполните указанные ниже действия, чтобы создать учетную запись хранения Azure.
- Создайте учетную запись хранения Azure
- Создание контейнера больших двоичных объектов
- Проверка подлинности в контейнере BLOB-объектов
Если вы выполняете разработку локально, убедитесь, что учетная запись пользователя, через которую осуществляется доступ к данным BLOB-объектов, имеет правильные разрешения. Вам потребуется служба хранилища участник данных BLOB-объектов для чтения и записи данных BLOB-объектов. Чтобы назначить себе эту роль, вам потребуется назначить роль Администратор istrator для доступа пользователей или другую роль, включающую действие Microsoft.Authorization/roleAssignments/write. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения о доступных областях назначения ролей можно узнать на странице обзора области.
В этом сценарии вы назначите разрешения учетной записи пользователя, которая ограничена учетной записью хранения, чтобы обеспечить соблюдение принципа минимальных привилегий. В рамках этой практики пользователям предоставляются только минимальные необходимые разрешения, что позволяет создавать более защищенные рабочие среды.
В следующем примере будет назначена роль участника данных BLOB-объектов служба хранилища учетной записи пользователя, которая предоставляет доступ для чтения и записи к данным BLOB-объектов в вашей учетной записи хранения.
Внимание
В большинстве случаев для распространения назначения ролей в Azure потребуется минута или две, но в редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.
На портале Azure найдите свою учетную запись хранения, воспользовавшись основной панелью поиска или областью навигации слева.
На странице обзора учетной записи хранения выберите Контроль доступа (IAM) в меню слева.
На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.
Выберите + Добавить в верхнем меню, а затем выберите Добавить назначение роли в появившемся раскрывающемся меню.
Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите участника данных BLOB-объектов хранилища и выберите соответствующий результат, а затем нажмите кнопку Далее.
В разделе Назначение доступа для выберите Пользователь, группа или субъект-служба и + Выбрать членов.
В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты), а затем выберите в нижней части диалогового окна.
Нажмите кнопку Проверить и назначить, чтобы перейти на последнюю страницу, а затем еще раз Проверить и назначить, чтобы завершить процесс.
Добавление библиотек Центров событий в проект Java
Добавьте следующие зависимости в файл pom.xml.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
Добавьте следующие
import
инструкции в верхней части файла Java.import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer; import com.azure.identity.*;
Создайте класс с именем
Receiver
и добавьте в него следующие строковые переменные. Замените значения заполнителей своими значениями.Внимание
Замените значения заполнителей своими значениями.
<NAMESPACE NAME>
замените именем пространства имен для Центров событий;- Замените
<EVENT HUB NAME>
именем пространства имен для концентратора событий.
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net"; private static final String eventHubName = "<EVENT HUB NAME>";
Добавьте в класс следующий метод
main
.Внимание
Замените значения заполнителей своими значениями.
<STORAGE ACCOUNT NAME>
с именем учетной записи служба хранилища Azure.<CONTAINER NAME>
с именем контейнера BLOB-объектов в учетной записи хранения
// create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD) .build(); // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .credential(credential) .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net") .containerName("<CONTAINER NAME>") .buildAsyncClient(); // Create an event processor client to receive and process events and errors. EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder() .fullyQualifiedNamespace(namespaceName) .eventHubName(eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .credential(credential) .buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process");
Добавьте два вспомогательных метода (
PARTITION_PROCESSOR
иERROR_HANDLER
), которые обрабатывают события и ошибки в классеReceiver
.public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };
Скомпилируйте программу и убедитесь в отсутствии ошибок.
Запуск приложений
Сначала запустите приложение получателя.
Затем запустите приложение отправителя.
Убедитесь, что в окне приложения получателя отображаются события, опубликованные приложением отправителя.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar
Нажмите ENTER в окне приложения получателя, чтобы прервать работу приложения.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
Следующие шаги
См. следующие примеры на сайте GitHub: