Sdílet prostřednictvím


Použití Apache Kafka® ve službě HDInsight s Apache Flinkem® ve službě HDInsight v AKS

Důležité

Tato funkce je aktuálně dostupná jako ukázková verze. Doplňkové podmínky použití pro Microsoft Azure Preview obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi Preview nebo ještě nejsou vydány v obecné dostupnosti. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight o službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích v komunitě Azure HDInsight.

Známý případ použití pro Apache Flink je stream analytics. Oblíbená volba mnoha uživatelů k používání datových proudů, které se ingestují pomocí Apache Kafka. Typické instalace Flinku a Kafka začínají tím, že streamy událostí se odsílají do Kafka, které můžou využívat úlohy Flink.

Tento příklad používá HDInsight v clusterech AKS se systémem Flink 1.17.0 ke zpracování streamovaných dat s využitím a vytváření tématu Kafka.

Poznámka:

FlinkKafkaConsumer je zastaralý a odebere se pomocí Flinku 1.17, použijte místo toho KafkaSource. FlinkKafkaProducer je zastaralý a bude odebrán s Flink 1.15, použijte místo toho KafkaSink.

Požadavky

  • Kafka i Flink musí být ve stejné virtuální síti nebo by mezi těmito dvěma clustery měly existovat vnet-peering.

  • Vytvoření virtuální sítě

  • Vytvořte cluster Kafka ve stejné virtuální síti. Na základě aktuálního využití můžete ve službě HDInsight zvolit Kafka 3.2 nebo 2.4.

    Snímek obrazovky znázorňující, jak vytvořit cluster Kafka ve stejné virtuální síti

  • Do části Virtuální síť přidejte podrobnosti o virtuální síti.

  • Vytvořte HDInsight ve fondu clusteru AKS se stejnou virtuální sítí.

  • Vytvořte cluster Flink s vytvořeným fondem clusteru.

Apache Kafka Připojení or

Flink poskytuje apache kafka Připojení or pro čtení dat z témat Kafka a zápis dat do témat Kafka s přesně jednou zárukou.

Závislost Mavenu

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Sestavení jímky Kafka

Jímka Kafka poskytuje třídu tvůrce pro vytvoření instance KafkaSinku. Používáme totéž k vytvoření jímky a jejímu použití společně s clusterem Flink běžícím v HDInsight v AKS.

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Psaní programu v Javě Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Na webu Webssh nahrajte soubor JAR a odešlete soubor JAR.

Snímek obrazovky zobrazující úlohu spuštěnou na Flinku

V uživatelském rozhraní řídicího panelu Flink

Snímek obrazovky znázorňující, jak odeslat soubor JAR s balíčkem Kafka jako úlohu do Flinku

Vytvoření tématu – kliknutí na Kafka

Snímek obrazovky znázorňující, jak vytvořit téma Kafka

Využívání tématu – události v systému Kafka

Snímek obrazovky znázorňující, jak využívat téma Kafka

Reference