Udostępnij za pośrednictwem


Korzystanie z platformy Apache Kafka® w usłudze HDInsight z platformą Apache Flink® w usłudze HDInsight w usłudze AKS

Uwaga

Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.

Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.

Ważne

Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.

Dobrze znany przypadek użycia platformy Apache Flink to analiza strumienia. Popularny wybór przez wielu użytkowników do korzystania ze strumieni danych, które są pozyskiwane przy użyciu platformy Apache Kafka. Typowe instalacje Flink i Kafka zaczynają się od wypychania strumieni zdarzeń do platformy Kafka, które mogą być używane przez zadania Flink.

W tym przykładzie użyto usługi HDInsight w klastrach usługi AKS z systemem Flink 1.17.0 w celu przetwarzania danych przesyłanych strumieniowo i tworzenia tematu platformy Kafka.

Uwaga

FlinkKafkaConsumer jest przestarzały i zostanie usunięty z Flink 1.17, zamiast tego użyj platformy KafkaSource. FlinkKafkaProducer jest przestarzały i zostanie usunięty z Flink 1.15, zamiast tego użyj platformy KafkaSink.

Wymagania wstępne

  • Zarówno platforma Kafka, jak i Flink muszą znajdować się w tej samej sieci wirtualnej lub między tymi dwoma klastrami powinna istnieć komunikacja równorzędna między sieciami wirtualnymi.

  • Tworzenie sieci wirtualnej.

  • Utwórz klaster platformy Kafka w tej samej sieci wirtualnej. Możesz wybrać platformę Kafka 3.2 lub 2.4 w usłudze HDInsight w oparciu o bieżące użycie.

    Zrzut ekranu przedstawiający sposób tworzenia klastra platformy Kafka w tej samej sieci wirtualnej.

  • Dodaj szczegóły sieci wirtualnej w sekcji sieć wirtualna.

  • Utwórz usługę HDInsight w puli klastrów usługi AKS z tą samą siecią wirtualną.

  • Utwórz klaster Flink do utworzonej puli klastrów.

Łącznik platformy Apache Kafka

Funkcja Flink udostępnia łącznik platformy Apache Kafka do odczytywania danych z tematów platformy Kafka i zapisywania ich w tych tematach z dokładnie jednokrotnymi gwarancjami.

Zależność narzędzia Maven

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Kompilowanie ujścia platformy Kafka

Ujście platformy Kafka udostępnia klasę konstruktora do konstruowania wystąpienia platformy KafkaSink. Używamy tego samego do konstruowania ujścia i używania go wraz z klastrem Flink działającym w usłudze HDInsight w usłudze AKS

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Pisanie Event.java programu Java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

W witrynie Webssh przekaż plik jar i prześlij plik jar

Zrzut ekranu przedstawiający zadanie uruchomione na flinku.

W interfejsie użytkownika pulpitu nawigacyjnego Flink

Zrzut ekranu przedstawiający sposób przesyłania pliku JAR spakowanego tematu platformy Kafka jako zadania do Flink.

Tworzenie tematu — klika pozycję Kafka

Zrzut ekranu przedstawiający sposób tworzenia tematu platformy Kafka.

Korzystanie z tematu — zdarzenia na platformie Kafka

Zrzut ekranu przedstawiający sposób korzystania z tematu platformy Kafka.

Odwołanie