Az Apache Kafka® használata a HDInsighton a HDInsighton futó Apache Flinkdel® az AKS-en
Fontos
Ez a szolgáltatás jelenleg előzetes kiadásban elérhető. A Microsoft Azure Előzetes verzió kiegészítő használati feltételei további jogi feltételeket tartalmaznak, amelyek a bétaverzióban, előzetes verzióban vagy egyébként még nem általánosan elérhető Azure-funkciókra vonatkoznak. Erről az adott előzetes verzióról az Azure HDInsight az AKS előzetes verziójában tájékozódhat. Ha kérdése vagy funkciójavaslata van, küldjön egy kérést az AskHDInsightban a részletekkel együtt, és kövessen minket további frissítésekért az Azure HDInsight-közösségről.
Az Apache Flink jól ismert használati esete a streamelemzés. Sok felhasználó népszerű választása az Apache Kafka használatával betöltött adatfolyamok használatára. Az Flink és a Kafka tipikus telepítései az eseménystreamek Kafkába való leküldésével kezdődnek, amelyeket a Flink-feladatok felhasználhatnak.
Ez a példa a HDInsightot használja az Flink 1.17.0-t futtató AKS-fürtökön a streamelési adatok feldolgozásához és a Kafka-témakör előállításához.
Feljegyzés
A FlinkKafkaConsumer elavult, és az Flink 1.17-gyel lesz eltávolítva, használja helyette a KafkaSource-t. A FlinkKafkaProducer elavult, és az Flink 1.15-ös verzióval lesz eltávolítva, használja helyette a KafkaSinket.
Előfeltételek
A Kafkának és az Flinknek ugyanabban a virtuális hálózatban kell lennie, vagy virtuális hálózatok közötti társviszony-létesítésnek kell lennie a két fürt között.
Virtuális hálózat létrehozása.
Hozzon létre egy Kafka-fürtöt ugyanabban a virtuális hálózatban. A HDInsighton a Kafka 3.2-t vagy a 2.4-et az aktuális használat alapján választhatja ki.
Adja hozzá a virtuális hálózat részleteit a virtuális hálózat szakaszához.
Hozzon létre egy HDInsightot az AKS-fürtkészleten ugyanazzal a virtuális hálózattal.
Hozzon létre egy Flink-fürtöt a létrehozott fürtkészlethez.
Apache Kafka Csatlakozás or
Az Flink egy Apache Kafka Csatlakozás ort biztosít, amely pontosan egyszer garantálja az adatok Kafka-témakörökből való olvasását és írását.
Maven-függőség
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Kafka Fogadó létrehozása
A Kafka-fogadó egy építőosztályt biztosít egy KafkaSink-példány létrehozásához. Ugyanezt használjuk a Fogadó felépítéséhez és használatához a HDInsighton futó Flink-fürttel együtt az AKS-en
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Java-program írása Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Csomagolja be a jart, és küldje el a feladatot az Flinknek
Webssh-en töltse fel az üveget, és küldje el a jart
A Flink Irányítópult felhasználói felületén
A témakör készítése – kattintások a Kafkára
A témakör felhasználása – kafkai események
Referencia
- Apache Kafka Csatlakozás or
- Az Apache, az Apache Kafka, a Kafka, az Apache Flink, a Flink és a kapcsolódó nyílt forráskód projektnevek az Apache Software Foundation (ASF) védjegyei.
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: