Подключение Apache Flink® в HDInsight в AKS с Центры событий Azure для Apache Kafka®

Важно!

Эта функция в настоящее время доступна для предварительного ознакомления. Дополнительные условия использования для предварительных версий Microsoft Azure включают более юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в статье Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за нами для получения дополнительных обновлений в сообществе Azure HDInsight.

Хорошо известный вариант использования Apache Flink — это stream analytics. Популярный выбор многих пользователей для использования потоков данных, которые используются с помощью Apache Kafka. Типичные установки Flink и Kafka начинаются с потоков событий, отправляемых в Kafka, которые могут использоваться заданиями Flink. Центры событий Azure предоставляет конечную точку Apache Kafka в концентраторе событий, которая позволяет пользователям подключаться к концентратору событий с помощью протокола Kafka.

В этой статье мы рассмотрим, как подключить Центры событий Azure с Apache Flink в HDInsight в AKS и рассмотрим следующее.

  • Создание пространства имен в Центрах событий Azure
  • Создание HDInsight в кластере AKS с помощью Apache Flink
  • Запуск производителя Flink
  • Jar-пакет для Apache Flink
  • Отправка заданий и проверка

Создание пространства имен Центров событий и Центров событий

  1. Сведения о создании пространства имен Центров событий и Центров событий см . здесь

    Снимок экрана: настройка Центров событий.

  1. С помощью существующего кластера HDInsight в пуле кластеров AKS можно создать кластер Flink

  2. Запустите производитель Flink, добавив bootstrap.servers и producer.config сведения

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Замените {YOUR.EVENTHUBS.CONNECTION.STRING} строками подключения для вашего пространства имен Центров событий. Инструкции по получению строка подключения см. в сведениях о том, как получить строка подключения Центров событий.

    Например,

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Com.example.app пакета;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Добавьте фрагмент кода для запуска Flink Producer.

    Снимок экрана: проверка Flink в Центрах событий.

  3. После выполнения кода события хранятся в разделе "тема1 "

    Снимок экрана: центры событий, хранящиеся в разделе.

Справочные материалы