Partager via


Connecter Apache Flink® sur HDInsight sur AKS avec Azure Event Hubs pour Apache Kafka®

Important

Cette fonctionnalité est disponible actuellement en mode Aperçu. L’Avenant aux conditions d’utilisation pour les préversions de Microsoft Azure contient des conditions légales en plus qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez informations sur Azure HDInsight sur AKS en préversion. Pour toute question ou tout envoi de suggestions sur la fonctionnalité, veuillez soumettre une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur la Communauté Azure HDInsight.

Un cas d’usage bien connu d’Apache Flink est l’analyse de flux. Le choix populaire de nombreux utilisateurs d’utiliser les flux de données ingérés à l’aide d’Apache Kafka. Les installations typiques de Flink et Kafka commencent par le transfert de flux d’événements vers Kafka, qui peuvent être consommés par les tâches Flink. Azure Event Hubs fournit un point de terminaison Apache Kafka sur un hub d’événements, qui permet aux utilisateurs de se connecter au hub d’événements à l’aide du protocole Kafka.

Dans cet article, nous allons découvrir comment connecter Azure Event Hubs avec Apache Flink sur HDInsight sur AKS et couvrir les éléments suivants

  • Créer un espace de noms Event Hubs
  • Créer un cluster HDInsight sur AKS avec Apache Flink
  • Exécuter le producteur de Flink
  • Empaqueter un fichier Jar pour Apache Flink
  • Soumission de tâche et validation

Créer un espace de noms Event Hubs et des Event Hubs

  1. Pour créer un espace de noms Event Hubs et des Event Hubs, consultez cet article

    Capture d’écran montrant la configuration d’Event Hubs.

  1. À l’aide du pool de clusters HDInsight existant sur AKS, vous pouvez créer un cluster Flink.

  2. Exécutez le producteur Flink en ajoutant les bootstrap.servers et les informations producer.config.

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Remplacez {YOUR.EVENTHUBS.CONNECTION.STRING} par la chaîne de connexion de votre espace de noms Event Hubs. Pour savoir comment obtenir la chaîne de connexion, consultez les détails sur la façon d’obtenir une chaîne de connexion Event Hubs.

    Par exemple,

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Empaqueter com.example.app ;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Ajoutez l’extrait de code pour exécuter le producteur de Flink.

    Capture d’écran montrant comment tester Flink dans Event Hubs.

  3. Une fois le code exécuté, les événements sont stockés dans la rubrique "topic1"

    Capture d’écran montrant Event Hubs stocké dans la rubrique.

Référence