Руководство по Обработка событий Центров событий в Apache Kafka с использованием Stream Analytics
В этой статье показано, как выполняется потоковая передача данных в Центры событий и обработка этих данных с помощью Azure Stream Analytics. Здесь подробно описаны следующие действия:
- Создайте пространство имен Центров событий.
- создание клиента Kafka, который отправляет сообщения в концентратор событий;
- создание задания Stream Analytics, которое копирует данные из концентратора событий в хранилище BLOB-объектов Azure.
Вам не обязательно изменять протокол клиентов или запускать собственные кластеры при использовании конечной точки Kafka, предоставленной концентратором событий. Центры событий Azure поддерживают Apache Kafka 1.0. и более поздних версий.
Предварительные требования
Ниже указаны требования для работы с этим кратким руководством.
- Подписка Azure. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись, прежде чем начать работу.
- Комплект разработчика Java (JDK) 1.7+.
- Скачайте и установите двоичный архив Maven.
- Git;
- Учетная запись хранения Azure. Если ее у вас нет, то, прежде чем продолжить, создайте учетную запись хранения Azure. В этом пошаговом руководстве задание Stream Analytics состоит в сохранении выходных данных в хранилище BLOB-объектов Azure.
Создание пространства имен в Центрах событий
Если вы создаете пространство имен в Центрах событий, конечная точка Kafka для этого пространства имен включается автоматически. Вы можете выполнять потоковую передачу событий из приложений, использующих протокол Kafka, в концентраторы событий. Выполните пошаговые инструкции в статье Создание концентратора событий с помощью портала Azure, чтобы создать пространство имен в Центрах событий. Если вы используете выделенный кластер, ознакомьтесь с разделом Создание пространства имен и концентратора событий в выделенном кластере.
Примечание
Центры событий для Kafka не поддерживаются на уровне Базовый.
Отправка сообщений с использованием Kafka в Центрах событий
Клонируйте репозиторий Центров событий Azure для Kafka на компьютер.
Перейдите в папку
azure-event-hubs-for-kafka/quickstart/java/producer
.Обновите сведения о конфигурации для отправителя в файле по адресу
src/main/resources/producer.config
. Укажите имя и строку подключения для пространства имен концентратора событий.bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
Перейдите в
azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/
и откройте файл TestDataReporter.java в любом редакторе по своему усмотрению.Закомментируйте следующую строку кода.
//final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
Добавьте следующую строку кода вместо закомментированного кода.
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");
Этот код отправляет данные события в формате JSON. При настройке входных данных для задания Stream Analytics необходимо указать JSON как формат входных данных.
Выполните код отправителя и потоковую передачу данных в Центры событий. При использовании командной строки Node.js перед выполнением этих команд на компьютере Windows переключитесь на папку
azure-event-hubs-for-kafka/quickstart/java/producer
.mvn clean package mvn exec:java -Dexec.mainClass="TestProducer"
Проверка получения данных концентратором событий
Выберите Центры событий в разделе Сущности. Убедитесь, что вы видите концентратор событий с именем test.
Убедитесь, что вы видите сообщения, входящие в концентратор событий.
Обработка данных событий с помощью задания Stream Analytics
В этом разделе будет создано задание Azure Stream Analytics. Клиент Kafka отправит события в концентратор событий. Вы создадите задание Stream Analytics, которое принимает данные о событиях в качестве входных данных и выводит их в хранилище BLOB-объектов Azure. Если у вас нет учетной записи службы хранилища Azure, то потребуется ее создать.
Запрос в задании Stream Analytics проходит через данные без выполнения какой-либо аналитики. Можно создать запрос, преобразующий входные данные в выходные данные в другом формате или с полученными аналитическими сведениями.
Создание задания Stream Analytics
- Выберите + Создать ресурс на портале Azure.
- Выберите Analytics в меню Azure Marketplace, и затем выберите задание Stream Analytics.
- На странице Новое задание Stream Analytics выполните следующие действия.
Введите имя для задания.
Выберите свою подписку.
Выберите Создать новую для группы ресурсов и введите имя. Вы также можете использовать существующую группу ресурсов.
Выберите расположение для задания.
Щелкните Создать, чтобы создать задание.
Настройка входных данных для задания
В сообщении уведомления, щелкните Перейти к ресурсу, чтобы открыть страницу задания Stream Analytics.
Выберите Входные данные в разделе Топология задания в меню слева.
Выберите Добавить потоковый вход, а затем выберите Концентратор событий.
На странице конфигурации входных данных концентратора событий выполните следующие действия.
Укажите псевдоним для входных данных.
Выберите свою подписку Azure.
Выберите пространство имен концентратора событий созданное ранее.
Выберите тестирование для концентратора событий.
Щелкните Сохранить.
Настройка выходных данных для задания
- Выберите Выходные данные в меню раздела Топология задания.
- Выберите + Добавить на панели инструментов и выберите Хранилище BLOB-объектов
- На странице параметров выходных данных хранилища BLOB-объектов выполните следующие действия.
Укажите псевдоним для выходных данных.
Выберите свою подписку Azure.
Выберите свою учетную запись службы хранилища Azure.
Введите имя для контейнера, в котором хранятся выходные данные из запроса Stream Analytics.
Щелкните Сохранить.
Определение запроса
Настроив задание Stream Analytics для считывания входящего потока данных, необходимо создать преобразование, которое анализирует данные в реальном времени. Определите запрос преобразования с помощью языка запросов Stream Analytics. В этом пошаговом руководстве необходимо определить запрос, который проходит через данные без выполнения какого-либо преобразования.
Выберите Запрос.
В окне запроса замените
[YourOutputAlias]
псевдонимом выходных данных, созданным ранее.Замените
[YourInputAlias]
псевдонимом входных данных, созданным ранее.На панели инструментов щелкните Сохранить.
Выполнение задания Stream Analytics
В меню слева выберите Обзор.
Щелкните Запуск.
На странице Запуск задания выберите Запуск.
Подождите, пока состояние задания не изменится с запуска на выполняется.
Тестирование сценария
Запустите еще раз отправитель Kafka, чтобы отправить события в концентратор событий.
mvn exec:java -Dexec.mainClass="TestProducer"
Убедитесь, что вы видите выходные данные сгенерированные в хранилище BLOB-объектов Azure. Вы увидите JSON-файл в контейнере с сотней строк, которые выглядят следующим образом.
{"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
Задание Azure Stream Analytics получило входные данные из концентратора событий и в этом сценарии сохранило их в хранилище BLOB-объектов Azure.
Дальнейшие действия
Из этой статьи вы узнали, как выполнять потоковую передачу данных в Центры событий без необходимости менять клиенты протоколов или запускать собственные кластеры. Сведения о Центрах событий Azure для Apache Kafka см. в этом руководстве для разработчиков.