Koneksi Apache Flink® di HDInsight di AKS dengan Azure Event Hubs untuk Apache Kafka®
Penting
Fitur ini masih dalam mode pratinjau. Ketentuan Penggunaan Tambahan untuk Pratinjau Microsoft Azure mencakup lebih banyak persyaratan hukum yang berlaku untuk fitur Azure yang dalam versi beta, dalam pratinjau, atau belum dirilis ke ketersediaan umum. Untuk informasi tentang pratinjau khusus ini, lihat Azure HDInsight pada informasi pratinjau AKS. Untuk pertanyaan atau saran fitur, kirimkan permintaan di AskHDInsight dengan detail dan ikuti kami untuk pembaruan lebih lanjut di Komunitas Azure HDInsight.
Kasus penggunaan terkenal untuk Apache Flink adalah analisis aliran. Pilihan populer oleh banyak pengguna untuk menggunakan aliran data, yang diserap menggunakan Apache Kafka. Penginstalan umum Flink dan Kafka dimulai dengan aliran peristiwa yang didorong ke Kafka, yang dapat dikonsumsi oleh pekerjaan Flink. Azure Event Hubs menyediakan titik akhir Apache Kafka di hub peristiwa, yang memungkinkan pengguna untuk terhubung ke pusat aktivitas menggunakan protokol Kafka.
Dalam artikel ini, kami menjelajahi cara menyambungkan Azure Event Hubs dengan Apache Flink di HDInsight di AKS dan membahas hal-hal berikut
- Membuat namespace layanan Pusat Aktivitas
- Membuat HDInsight pada Kluster AKS dengan Apache Flink
- Jalankan produser Flink
- Paket Jar untuk Apache Flink
- Pengajuan Pekerjaan & Validasi
Membuat namespace layanan Azure Event Hubs dan Azure Event Hubs
Untuk membuat namespace layanan Azure Event Hubs dan Azure Event Hubs, lihat di sini
Menyiapkan Kluster Flink di HDInsight di AKS
Menggunakan HDInsight yang ada pada kumpulan Kluster AKS, Anda dapat membuat kluster Flink
Jalankan produsen Flink yang menambahkan bootstrap.servers dan
producer.config
infobootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Ganti
{YOUR.EVENTHUBS.CONNECTION.STRING}
dengan string koneksi ke ruang nama Azure Event Hubs. Untuk petunjuk tentang mendapatkan string koneksi, lihat detail tentang cara mendapatkan string koneksi Azure Event Hubs.Contohnya,
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Mengemas JAR untuk Flink
Com.example.app paket;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Tambahkan cuplikan untuk menjalankan Flink Producer.
Setelah kode dijalankan, peristiwa disimpan dalam topik "topik1"
Referensi
- Situs Web Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink, dan nama proyek sumber terbuka terkait adalah merek dagang dari Apache Software Foundation (ASF).
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk