Подключение Apache Flink® к HDInsight в AKS с помощью Центров событий Azure для Apache Kafka®
Важный
Azure HDInsight на AKS было прекращено 31 января 2025 г. Узнайте больше из этого объявления.
Необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого завершения рабочих нагрузок.
Важный
Эта функция сейчас доступна в предварительной версии. Дополнительные условия использования для предварительных версий Microsoft Azure включают дополнительные юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в Azure HDInsight в предварительной версии AKS. Для вопросов или предложений по функциональности отправьте запрос на AskHDInsight, включив подробности, и подписывайтесь на обновления в Azure HDInsight Community.
Хорошо известный вариант использования Apache Flink — это stream analytics. Популярный выбор среди многих пользователей — использовать потоки данных, поступающие через Apache Kafka. Типичные инсталляции Flink и Kafka начинаются отправкой потоков событий в Kafka, после чего они могут использоваться заданиями Flink. Центры событий Azure предоставляют конечную точку Apache Kafka в концентраторе событий, которая позволяет пользователям подключаться к концентратору событий с помощью протокола Kafka.
В этой статье мы рассмотрим, как подключить Центры событий Azure к Apache Flink на HDInsight на AKS и рассмотрим следующие аспекты.
- Создание пространства имен Центров событий
- Создание HDInsight в кластере AKS с помощью Apache Flink
- Запуск производителя Flink
- Jar-пакет для Apache Flink
- Проверка & подачи задания
Создание пространства имен и центров событий в Event Hubs
Сведения о создании пространства имен Центров событий и Центров событий см. здесь
Настройка кластера Flink на HDInsight в AKS
Используя существующий HDInsight в пуле кластеров AKS, вы можете создать кластер Flink
Запустите производитель Flink, добавив bootstrap.servers и сведения
producer.config
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Замените
{YOUR.EVENTHUBS.CONNECTION.STRING}
строкой подключения для пространства имен Event Hubs. Инструкции по получению строки подключения см. в сведениях о том, как получить строку подключения Центров событий.Например
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Упаковка JAR-файла для Flink
Пакет com.example.app
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Добавьте фрагмент кода для запуска Flink Producer.
После выполнения кода события хранятся в разделе "раздел1"
Ссылка
- веб-сайт Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink и связанные имена проектов с открытым кодом являются товарными знакамиApache Software Foundation (ASF).