Condividi tramite


Connettere Apache Flink® in HDInsight su AKS con Hub eventi di Azure per Apache Kafka®

Nota

Azure HDInsight su AKS verrà ritirato il 31 gennaio 2025. Prima del 31 gennaio 2025, sarà necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare interruzioni improvvise dei carichi di lavoro. I cluster rimanenti nella sottoscrizione verranno arrestati e rimossi dall’host.

Solo il supporto di base sarà disponibile fino alla data di ritiro.

Importante

Questa funzionalità è attualmente disponibile solo in anteprima. Le Condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire Microsoft per altri aggiornamenti nella Community di Azure HDInsight.

Un caso d'uso noto per Apache Flink è l'analisi di flusso. Scelta comune da parte di molti utenti per l'uso dei flussi di dati, inseriti con Apache Kafka. Le installazioni tipiche di Flink e Kafka iniziano con il push dei flussi di eventi in Kafka, che possono essere utilizzati dai processi Flink. Hub eventi di Azure fornisce un endpoint Apache Kafka in un hub eventi, che consente agli utenti di connettersi all'hub eventi usando il protocollo Kafka.

Questo articolo illustra come connettere hub eventi di Azure con Apache Flink in HDInsight su AKS e illustra quanto segue

  • Creare uno spazio dei nomi di Hub eventi
  • Creare un cluster HDInsight su AKS con Apache Flink
  • Eseguire il producer Flink
  • Jar del pacchetto per Apache Flink
  • Invio e convalida dei processi

Creare uno spazio dei nomi di Hub eventi e hub eventi

  1. Per creare uno spazio dei nomi di Hub eventi e hub eventi vedere qui

    Screenshot che mostra la configurazione degli hub eventi.

  1. Usando il pool di cluster HDInsight su AKS esistente, è possibile creare un cluster Flink

  2. Eseguire il produttore Flink aggiungendo il bootstrap.servers e le informazioni producer.config

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Sostituire {YOUR.EVENTHUBS.CONNECTION.STRING} con la stringa di connessione per lo spazio dei nomi di Hub eventi. Per istruzioni su come ottenere la stringa di connessione, vedere dettagli su come Ottenere una stringa di connessione ad Hub eventi.

    ad esempio:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Pacchetto com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Aggiungere il frammento di codice per eseguire Flink Producer.

    Screenshot che mostra come testare Flink in Hub eventi.

  3. Dopo l'esecuzione del codice, gli eventi vengono archiviati nell'argomento "topic1"

    Screenshot che mostra gli hub eventi archiviati nell'argomento.

Riferimento