Ansluta Apache Flink® på HDInsight på AKS med Azure Event Hubs för Apache Kafka®
Kommentar
Vi drar tillbaka Azure HDInsight på AKS den 31 januari 2025. Före den 31 januari 2025 måste du migrera dina arbetsbelastningar till Microsoft Fabric eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar. Återstående kluster i din prenumeration stoppas och tas bort från värden.
Endast grundläggande stöd kommer att vara tillgängligt fram till datumet för pensionering.
Viktigt!
Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.
Ett välkänt användningsfall för Apache Flink är stream analytics. Det populära valet av många användare att använda dataströmmarna, som matas in med Apache Kafka. Typiska installationer av Flink och Kafka börjar med händelseströmmar som skickas till Kafka, som kan användas av Flink-jobb. Azure Event Hubs tillhandahåller en Apache Kafka-slutpunkt på en händelsehubb som gör det möjligt för användare att ansluta till händelsehubben med hjälp av Kafka-protokollet.
I den här artikeln utforskar vi hur du ansluter Azure Event Hubs med Apache Flink i HDInsight på AKS och beskriver följande
- Skapa ett Event Hubs-namnområde
- Skapa en HDInsight på AKS-kluster med Apache Flink
- Kör Flink-producent
- Paketburk för Apache Flink
- Jobböverföring och validering
Skapa Event Hubs-namnrymd och Event Hubs
Information om hur du skapar Event Hubs-namnrymd och Event Hubs finns här
Konfigurera Flink-kluster i HDInsight på AKS
Med hjälp av befintlig HDInsight i AKS-klusterpoolen kan du skapa ett Flink-kluster
Kör Flink-producenten och lägg till bootstrap.servers och informationen
producer.config
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Ersätt
{YOUR.EVENTHUBS.CONNECTION.STRING}
med anslutningssträng för Event Hubs-namnområdet. Anvisningar om hur du hämtar anslutningssträng finns i information om hur du hämtar en Event Hubs-anslutningssträng.Ett exempel:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Paketera JAR för Flink
Paket com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Lägg till kodfragmentet för att köra Flink-producenten.
När koden har körts lagras händelserna i ämnet "topic1"
Referens
- Apache Flink-webbplats
- Apache, Apache Kafka, Kafka, Apache Flink, Flink och associerade öppen källkod projektnamn är varumärken som tillhör Apache Software Foundation (ASF).