Korzystanie z platformy Apache Kafka® w usłudze HDInsight z platformą Apache Flink® w usłudze HDInsight w usłudze AKS
Uwaga
Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.
Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.
Ważne
Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.
Dobrze znany przypadek użycia platformy Apache Flink to analiza strumienia. Popularny wybór przez wielu użytkowników do korzystania ze strumieni danych, które są pozyskiwane przy użyciu platformy Apache Kafka. Typowe instalacje Flink i Kafka zaczynają się od wypychania strumieni zdarzeń do platformy Kafka, które mogą być używane przez zadania Flink.
W tym przykładzie użyto usługi HDInsight w klastrach usługi AKS z systemem Flink 1.17.0 w celu przetwarzania danych przesyłanych strumieniowo i tworzenia tematu platformy Kafka.
Uwaga
FlinkKafkaConsumer jest przestarzały i zostanie usunięty z Flink 1.17, zamiast tego użyj platformy KafkaSource. FlinkKafkaProducer jest przestarzały i zostanie usunięty z Flink 1.15, zamiast tego użyj platformy KafkaSink.
Wymagania wstępne
Zarówno platforma Kafka, jak i Flink muszą znajdować się w tej samej sieci wirtualnej lub między tymi dwoma klastrami powinna istnieć komunikacja równorzędna między sieciami wirtualnymi.
Tworzenie sieci wirtualnej.
Utwórz klaster platformy Kafka w tej samej sieci wirtualnej. Możesz wybrać platformę Kafka 3.2 lub 2.4 w usłudze HDInsight w oparciu o bieżące użycie.
Dodaj szczegóły sieci wirtualnej w sekcji sieć wirtualna.
Utwórz usługę HDInsight w puli klastrów usługi AKS z tą samą siecią wirtualną.
Utwórz klaster Flink do utworzonej puli klastrów.
Łącznik platformy Apache Kafka
Funkcja Flink udostępnia łącznik platformy Apache Kafka do odczytywania danych z tematów platformy Kafka i zapisywania ich w tych tematach z dokładnie jednokrotnymi gwarancjami.
Zależność narzędzia Maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Kompilowanie ujścia platformy Kafka
Ujście platformy Kafka udostępnia klasę konstruktora do konstruowania wystąpienia platformy KafkaSink. Używamy tego samego do konstruowania ujścia i używania go wraz z klastrem Flink działającym w usłudze HDInsight w usłudze AKS
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Pisanie Event.java programu Java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Spakuj plik jar i prześlij zadanie do Flink
W witrynie Webssh przekaż plik jar i prześlij plik jar
W interfejsie użytkownika pulpitu nawigacyjnego Flink
Tworzenie tematu — klika pozycję Kafka
Korzystanie z tematu — zdarzenia na platformie Kafka
Odwołanie
- Łącznik platformy Apache Kafka
- Apache, Apache Kafka, Kafka, Apache Flink, Flink i skojarzone nazwy projektów typu open source są znakami towarowymi platformy Apache Software Foundation (ASF).