Dela via


Anslut Apache Flink® på HDInsight på AKS med Azure Event Hubs för Apache Kafka®

Viktigt!

Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.

Ett välkänt användningsfall för Apache Flink är stream analytics. Det populära valet av många användare att använda dataströmmarna, som matas in med Apache Kafka. Typiska installationer av Flink och Kafka börjar med händelseströmmar som skickas till Kafka, som kan användas av Flink-jobb. Azure Event Hubs tillhandahåller en Apache Kafka-slutpunkt på en händelsehubb som gör det möjligt för användare att ansluta till händelsehubben med hjälp av Kafka-protokollet.

I den här artikeln utforskar vi hur du ansluter Azure Event Hubs med Apache Flink i HDInsight på AKS och beskriver följande

  • Skapa ett Event Hubs-namnområde
  • Skapa en HDInsight på AKS-kluster med Apache Flink
  • Kör Flink-producent
  • Paketburk för Apache Flink
  • Jobböverföring och validering

Skapa Event Hubs-namnrymd och Event Hubs

  1. Information om hur du skapar Event Hubs-namnrymd och Event Hubs finns här

    Skärmbild som visar installation av Event Hubs.

  1. Med hjälp av befintlig HDInsight i AKS-klusterpoolen kan du skapa ett Flink-kluster

  2. Kör Flink-producenten och lägg till bootstrap.servers och informationen producer.config

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Ersätt {YOUR.EVENTHUBS.CONNECTION.STRING} med anslutningssträng för Event Hubs-namnområdet. Anvisningar om hur du hämtar anslutningssträng finns i information om hur du hämtar en Event Hubs-anslutningssträng.

    Exempel:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Paket com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Lägg till kodfragmentet för att köra Flink-producenten.

    Skärmbild som visar hur du testar Flink i Event Hubs.

  3. När koden har körts lagras händelserna i ämnet "topic1"

    Skärmbild som visar Event Hubs som lagras i ämnet.

Referens

  • Apache Flink-webbplats
  • Apache, Apache Kafka, Kafka, Apache Flink, Flink och associerade öppen källkod projektnamn är varumärkensom tillhör Apache Software Foundation (ASF).