Бөлісу құралы:


Подключение Apache Flink® к HDInsight в AKS с помощью Центров событий Azure для Apache Kafka®

Важный

Azure HDInsight на AKS было прекращено 31 января 2025 г. Узнайте больше из этого объявления.

Необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого завершения рабочих нагрузок.

Важный

Эта функция сейчас доступна в предварительной версии. Дополнительные условия использования для предварительных версий Microsoft Azure включают дополнительные юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в Azure HDInsight в предварительной версии AKS. Для вопросов или предложений по функциональности отправьте запрос на AskHDInsight, включив подробности, и подписывайтесь на обновления в Azure HDInsight Community.

Хорошо известный вариант использования Apache Flink — это stream analytics. Популярный выбор среди многих пользователей — использовать потоки данных, поступающие через Apache Kafka. Типичные инсталляции Flink и Kafka начинаются отправкой потоков событий в Kafka, после чего они могут использоваться заданиями Flink. Центры событий Azure предоставляют конечную точку Apache Kafka в концентраторе событий, которая позволяет пользователям подключаться к концентратору событий с помощью протокола Kafka.

В этой статье мы рассмотрим, как подключить Центры событий Azure к Apache Flink на HDInsight на AKS и рассмотрим следующие аспекты.

  • Создание пространства имен Центров событий
  • Создание HDInsight в кластере AKS с помощью Apache Flink
  • Запуск производителя Flink
  • Jar-пакет для Apache Flink
  • Проверка & подачи задания

Создание пространства имен и центров событий в Event Hubs

  1. Сведения о создании пространства имен Центров событий и Центров событий см. здесь

    снимок экрана: настройка Центров событий.

  1. Используя существующий HDInsight в пуле кластеров AKS, вы можете создать кластер Flink

  2. Запустите производитель Flink, добавив bootstrap.servers и сведения producer.config

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Замените {YOUR.EVENTHUBS.CONNECTION.STRING} строкой подключения для пространства имен Event Hubs. Инструкции по получению строки подключения см. в сведениях о том, как получить строку подключения Центров событий.

    Например

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Пакет com.example.app

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Добавьте фрагмент кода для запуска Flink Producer.

    снимок экрана, показывающий, как протестировать Flink в Центрах событий.

  3. После выполнения кода события хранятся в разделе "раздел1"

    снимок экрана, показывающий Event Hubs, хранящиеся в теме.

Ссылка