Łączenie platformy Apache Flink® w usłudze HDInsight w usłudze AKS przy użyciu usługi Azure Event Hubs dla platformy Apache Kafka®
Uwaga
Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.
Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.
Ważne
Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.
Dobrze znany przypadek użycia platformy Apache Flink to analiza strumienia. Popularny wybór przez wielu użytkowników do korzystania ze strumieni danych, które są pozyskiwane przy użyciu platformy Apache Kafka. Typowe instalacje Flink i Kafka zaczynają się od wypychania strumieni zdarzeń do platformy Kafka, które mogą być używane przez zadania Flink. Usługa Azure Event Hubs udostępnia punkt końcowy platformy Apache Kafka w centrum zdarzeń, który umożliwia użytkownikom łączenie się z centrum zdarzeń przy użyciu protokołu Kafka.
W tym artykule dowiesz się, jak połączyć usługę Azure Event Hubs z usługą Apache Flink w usłudze HDInsight w usłudze AKS i zapoznać się z następującymi tematami
- Tworzenie przestrzeni nazw usługi Event Hubs
- Tworzenie usługi HDInsight w klastrze usługi AKS za pomocą narzędzia Apache Flink
- Uruchamianie producenta Flink
- Pakiet Jar dla platformy Apache Flink
- Przesyłanie i walidacja zadań
Tworzenie przestrzeni nazw usługi Event Hubs i usługi Event Hubs
Aby utworzyć przestrzeń nazw usługi Event Hubs i usługę Event Hubs, zobacz tutaj
Konfigurowanie klastra Flink w usłudze HDInsight w usłudze AKS
Korzystając z istniejącej puli klastrów usługi HDInsight w usłudze AKS, można utworzyć klaster Flink
Uruchom producenta Flink, dodając plik bootstrap.servers i
producer.config
informacjebootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Zastąp
{YOUR.EVENTHUBS.CONNECTION.STRING}
element parametry połączenia przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące uzyskiwania parametry połączenia, zobacz szczegółowe informacje na temat uzyskiwania parametry połączenia usługi Event Hubs.Na przykład:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Pakowanie pliku JAR na potrzeby
Com.example.app pakietu;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Dodaj fragment kodu, aby uruchomić producenta Flink.
Po wykonaniu kodu zdarzenia są przechowywane w temacie "topic1 "
Odwołanie
- Witryna internetowa platformy Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink i skojarzone nazwy projektów typu open source są znakami towarowymi platformy Apache Software Foundation (ASF).