Použití Apache Kafka® ve službě HDInsight s Apache Flinkem® ve službě HDInsight v AKS
Důležité
Tato funkce je aktuálně dostupná jako ukázková verze. Doplňkové podmínky použití pro Microsoft Azure Preview obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi Preview nebo ještě nejsou vydány v obecné dostupnosti. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight o službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích v komunitě Azure HDInsight.
Známý případ použití pro Apache Flink je stream analytics. Oblíbená volba mnoha uživatelů k používání datových proudů, které se ingestují pomocí Apache Kafka. Typické instalace Flinku a Kafka začínají tím, že streamy událostí se odsílají do Kafka, které můžou využívat úlohy Flink.
Tento příklad používá HDInsight v clusterech AKS se systémem Flink 1.17.0 ke zpracování streamovaných dat s využitím a vytváření tématu Kafka.
Poznámka:
FlinkKafkaConsumer je zastaralý a odebere se pomocí Flinku 1.17, použijte místo toho KafkaSource. FlinkKafkaProducer je zastaralý a bude odebrán s Flink 1.15, použijte místo toho KafkaSink.
Požadavky
Kafka i Flink musí být ve stejné virtuální síti nebo by mezi těmito dvěma clustery měly existovat vnet-peering.
Vytvořte cluster Kafka ve stejné virtuální síti. Na základě aktuálního využití můžete ve službě HDInsight zvolit Kafka 3.2 nebo 2.4.
Do části Virtuální síť přidejte podrobnosti o virtuální síti.
Vytvořte HDInsight ve fondu clusteru AKS se stejnou virtuální sítí.
Vytvořte cluster Flink s vytvořeným fondem clusteru.
Apache Kafka Připojení or
Flink poskytuje apache kafka Připojení or pro čtení dat z témat Kafka a zápis dat do témat Kafka s přesně jednou zárukou.
Závislost Mavenu
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Sestavení jímky Kafka
Jímka Kafka poskytuje třídu tvůrce pro vytvoření instance KafkaSinku. Používáme totéž k vytvoření jímky a jejímu použití společně s clusterem Flink běžícím v HDInsight v AKS.
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Psaní programu v Javě Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Zabalte soubor JAR a odešlete úlohu do Flinku.
Na webu Webssh nahrajte soubor JAR a odešlete soubor JAR.
V uživatelském rozhraní řídicího panelu Flink
Vytvoření tématu – kliknutí na Kafka
Využívání tématu – události v systému Kafka
Reference
- Apache Kafka Připojení or
- Názvy apache, Apache Kafka, Kafka, Apache Flink, Flink a přidružených opensourcových projektů jsou ochranné známky Apache Software Foundation (ASF).
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro