Megosztás a következőn keresztül:


Csatlakozás Apache Flink® a HDInsighton az AKS-en az Azure Event Hubs for Apache Kafkával®

Fontos

Ez a szolgáltatás jelenleg előzetes kiadásban elérhető. A Microsoft Azure Előzetes verzió kiegészítő használati feltételei további jogi feltételeket tartalmaznak, amelyek a bétaverzióban, előzetes verzióban vagy egyébként még nem általánosan elérhető Azure-funkciókra vonatkoznak. Erről az adott előzetes verzióról az Azure HDInsight az AKS előzetes verziójában tájékozódhat. Ha kérdése vagy funkciójavaslata van, küldjön egy kérést az AskHDInsightban a részletekkel együtt, és kövessen minket további frissítésekért az Azure HDInsight-közösségről.

Az Apache Flink jól ismert használati esete a streamelemzés. Sok felhasználó népszerű választása az Apache Kafka használatával betöltött adatfolyamok használatára. Az Flink és a Kafka tipikus telepítései az eseménystreamek Kafkába való leküldésével kezdődnek, amelyeket a Flink-feladatok felhasználhatnak. Az Azure Event Hubs egy Apache Kafka-végpontot biztosít egy eseményközponton, amely lehetővé teszi a felhasználók számára, hogy a Kafka protokoll használatával csatlakozzanak az eseményközponthoz.

Ebből a cikkből megtudhatja, hogyan csatlakoztathatja az Azure Event Hubsot az Apache Flinkhez a HDInsighton az AKS-en , és az alábbiakat ismertetjük

  • Event Hubs-névtér létrehozása
  • HDInsight létrehozása AKS-fürtön Apache Flink használatával
  • Flink-gyártó futtatása
  • Package Jar for Apache Flink
  • Feladatbeküldés & érvényesítés

Event Hubs-névtér és Event Hubs létrehozása

  1. Az Event Hubs-névtér és az Event Hubs létrehozásához lásd itt

    Képernyőkép az Event Hubs beállításról.

  1. A meglévő HDInsight AKS-fürtkészleten való használatával Flink-fürtöt hozhat létre

  2. Futtassa az Flink-gyártót a bootstrap.servers és az producer.config információk hozzáadásával

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Cserélje le {YOUR.EVENTHUBS.CONNECTION.STRING} az Event Hubs-névtér kapcsolati sztring. Az kapcsolati sztring beszerzésére vonatkozó utasításokért tekintse meg az Event Hubs kapcsolati sztring beszerzésének részleteit.

    Például:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Csomag com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Adja hozzá a kódrészletet a Flink Producer futtatásához.

    Képernyőkép az Flink Event Hubsban való teszteléséről.

  3. A kód végrehajtása után az események a "topic1" témakörben lesznek tárolva

    Képernyőkép a témakörben tárolt Event Hubsról.

Referencia