Подключение Apache Flink® в HDInsight в AKS с Центры событий Azure для Apache Kafka®
Примечание.
Мы отставим Azure HDInsight в AKS 31 января 2025 г. До 31 января 2025 г. необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого прекращения рабочих нагрузок. Оставшиеся кластеры в подписке будут остановлены и удалены из узла.
До даты выхода на пенсию будет доступна только базовая поддержка.
Внимание
Эта функция в настоящее время доступна для предварительного ознакомления. Дополнительные условия использования для предварительных версий Microsoft Azure включают более юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в статье Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за нами для получения дополнительных обновлений в сообществе Azure HDInsight.
Хорошо известный вариант использования Apache Flink — это stream analytics. Популярный выбор многих пользователей для использования потоков данных, которые используются с помощью Apache Kafka. Типичные установки Flink и Kafka начинаются с потоков событий, отправляемых в Kafka, которые могут использоваться заданиями Flink. Центры событий Azure предоставляет конечную точку Apache Kafka в концентраторе событий, которая позволяет пользователям подключаться к концентратору событий с помощью протокола Kafka.
В этой статье мы рассмотрим, как подключить Центры событий Azure с Apache Flink в HDInsight в AKS и рассмотрим следующее.
- Создание пространства имен в Центрах событий Azure
- Создание HDInsight в кластере AKS с помощью Apache Flink
- Запуск производителя Flink
- Jar-пакет для Apache Flink
- Отправка заданий и проверка
Создание пространства имен Центров событий и Центров событий
Сведения о создании пространства имен Центров событий и Центров событий см . здесь
Настройка кластера Flink в HDInsight в AKS
С помощью существующего кластера HDInsight в пуле кластеров AKS можно создать кластер Flink
Запустите производитель Flink, добавив bootstrap.servers и
producer.config
сведенияbootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Замените
{YOUR.EVENTHUBS.CONNECTION.STRING}
строками подключения для вашего пространства имен Центров событий. Инструкции по получению строка подключения см. в сведениях о том, как получить строка подключения Центров событий.Например,
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Упаковка JAR-файла для Flink
Com.example.app пакета;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Добавьте фрагмент кода для запуска Flink Producer.
После выполнения кода события хранятся в разделе "тема1 "
Справочные материалы
- Веб-сайт Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink и связанные открытый код имена проектов являются товарными знаками Apache Software Foundation (ASF).