Condividi tramite


Connessione Apache Flink® in HDInsight nel servizio Azure Kubernetes con Hub eventi di Azure per Apache Kafka®

Importante

Questa funzionalità è attualmente disponibile solo in anteprima. Le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure disponibili in versione beta, in anteprima o non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire microsoft per altri aggiornamenti nella community di Azure HDInsight.

Un caso d'uso noto per Apache Flink è l'analisi di flusso. Scelta comune da parte di molti utenti per l'uso dei flussi di dati, inseriti con Apache Kafka. Le installazioni tipiche di Flink e Kafka iniziano con il push dei flussi di eventi in Kafka, che possono essere utilizzati dai processi Flink. Hub eventi di Azure fornisce un endpoint Apache Kafka in un hub eventi, che consente agli utenti di connettersi all'hub eventi usando il protocollo Kafka.

Questo articolo illustra come connettersi Hub eventi di Azure con Apache Flink in HDInsight nel servizio Azure Kubernetes e illustra quanto segue

  • Creare uno spazio dei nomi di Hub eventi
  • Creare un cluster HDInsight nel servizio Azure Kubernetes con Apache Flink
  • Eseguire il producer Flink
  • Jar del pacchetto per Apache Flink
  • Invio e convalida dei processi

Creare uno spazio dei nomi di Hub eventi e Hub eventi

  1. Per creare uno spazio dei nomi di Hub eventi e Hub eventi, vedere qui

    Screenshot che mostra la configurazione di Hub eventi.

  1. Usando HDInsight esistente nel pool di cluster del servizio Azure Kubernetes, è possibile creare un cluster Flink

  2. Eseguire il producer Flink aggiungendo bootstrap.servers e le producer.config informazioni

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Sostituire {YOUR.EVENTHUBS.CONNECTION.STRING} con la stringa di connessione per lo spazio dei nomi di Hub eventi. Per istruzioni su come ottenere il stringa di connessione, vedere i dettagli su come ottenere un stringa di connessione di Hub eventi.

    ad esempio:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Com.example.app pacchetto;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Aggiungere il frammento di codice per eseguire Flink Producer.

    Screenshot che mostra come testare Flink in Hub eventi.

  3. Dopo l'esecuzione del codice, gli eventi vengono archiviati nell'argomento "topic1"

    Screenshot che mostra Gli hub eventi archiviati nell'argomento.

Riferimento