Bagikan melalui


Koneksi Apache Flink® di HDInsight di AKS dengan Azure Event Hubs untuk Apache Kafka®

Penting

Fitur ini masih dalam mode pratinjau. Ketentuan Penggunaan Tambahan untuk Pratinjau Microsoft Azure mencakup lebih banyak persyaratan hukum yang berlaku untuk fitur Azure yang dalam versi beta, dalam pratinjau, atau belum dirilis ke ketersediaan umum. Untuk informasi tentang pratinjau khusus ini, lihat Azure HDInsight pada informasi pratinjau AKS. Untuk pertanyaan atau saran fitur, kirimkan permintaan di AskHDInsight dengan detail dan ikuti kami untuk pembaruan lebih lanjut di Komunitas Azure HDInsight.

Kasus penggunaan terkenal untuk Apache Flink adalah analisis aliran. Pilihan populer oleh banyak pengguna untuk menggunakan aliran data, yang diserap menggunakan Apache Kafka. Penginstalan umum Flink dan Kafka dimulai dengan aliran peristiwa yang didorong ke Kafka, yang dapat dikonsumsi oleh pekerjaan Flink. Azure Event Hubs menyediakan titik akhir Apache Kafka di hub peristiwa, yang memungkinkan pengguna untuk terhubung ke pusat aktivitas menggunakan protokol Kafka.

Dalam artikel ini, kami menjelajahi cara menyambungkan Azure Event Hubs dengan Apache Flink di HDInsight di AKS dan membahas hal-hal berikut

  • Membuat namespace layanan Pusat Aktivitas
  • Membuat HDInsight pada Kluster AKS dengan Apache Flink
  • Jalankan produser Flink
  • Paket Jar untuk Apache Flink
  • Pengajuan Pekerjaan & Validasi

Membuat namespace layanan Azure Event Hubs dan Azure Event Hubs

  1. Untuk membuat namespace layanan Azure Event Hubs dan Azure Event Hubs, lihat di sini

    Cuplikan layar memperlihatkan penyiapan Azure Event Hubs.

  1. Menggunakan HDInsight yang ada pada kumpulan Kluster AKS, Anda dapat membuat kluster Flink

  2. Jalankan produsen Flink yang menambahkan bootstrap.servers dan producer.config info

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Ganti {YOUR.EVENTHUBS.CONNECTION.STRING} dengan string koneksi ke ruang nama Azure Event Hubs. Untuk petunjuk tentang mendapatkan string koneksi, lihat detail tentang cara mendapatkan string koneksi Azure Event Hubs.

    Contohnya,

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Com.example.app paket;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Tambahkan cuplikan untuk menjalankan Flink Producer.

    Cuplikan layar memperlihatkan cara menguji Flink di Azure Event Hubs.

  3. Setelah kode dijalankan, peristiwa disimpan dalam topik "topik1"

    Cuplikan layar memperlihatkan Azure Event Hubs yang disimpan dalam topik.

Referensi