Использование Apache Kafka® в HDInsight с Apache Flink® в HDInsight в AKS
Примечание.
Мы отставим Azure HDInsight в AKS 31 января 2025 г. До 31 января 2025 г. необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого прекращения рабочих нагрузок. Оставшиеся кластеры в подписке будут остановлены и удалены из узла.
До даты выхода на пенсию будет доступна только базовая поддержка.
Внимание
Эта функция в настоящее время доступна для предварительного ознакомления. Дополнительные условия использования для предварительных версий Microsoft Azure включают более юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в статье Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за нами для получения дополнительных обновлений в сообществе Azure HDInsight.
Хорошо известный вариант использования Apache Flink — это stream analytics. Популярный выбор многих пользователей для использования потоков данных, которые используются с помощью Apache Kafka. Типичные установки Flink и Kafka начинаются с потоков событий, отправляемых в Kafka, которые могут использоваться заданиями Flink.
В этом примере hdInsight в кластерах AKS, работающих под управлением Flink 1.17.0, для обработки потоковой передачи данных и создания раздела Kafka.
Примечание.
FlinkKafkaConsumer устарел и будет удален с Flink 1.17, используйте KafkaSource вместо этого. FlinkKafkaProducer устарел и будет удален с Flink 1.15, используйте KafkaSink вместо этого.
Необходимые компоненты
Как Kafka, так и Flink должны находиться в одной виртуальной сети или между двумя кластерами должен быть пиринг между виртуальными сетями.
Создайте кластер Kafka в той же виртуальной сети. Вы можете выбрать Kafka 3.2 или 2.4 в HDInsight на основе текущего использования.
Добавьте сведения о виртуальной сети в разделе виртуальной сети.
Создайте HDInsight в пуле кластеров AKS с одной виртуальной сетью.
Создайте кластер Flink в созданном пуле кластера.
Соединитель Apache Kafka
Flink предоставляет соединитель Apache Kafka для чтения данных из и записи данных в разделы Kafka с точной гарантией.
Зависимость Maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Строительство приемника Kafka
Приемник Kafka предоставляет класс построителя для создания экземпляра KafkaSink. Мы используем то же самое для создания приемника и использования его вместе с кластером Flink, работающим в HDInsight в AKS
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Написание программы Java Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Упаковка jar-файла и отправка задания в Flink
В Webssh отправьте jar-файл и отправьте jar-файл.
Пользовательский интерфейс панели мониторинга Flink
Создание раздела — щелкает Kafka
Использование раздела — события в Kafka
Справочные материалы
- Соединитель Apache Kafka
- Apache, Apache Kafka, Kafka, Apache Flink, Flink и связанные открытый код имена проектов являются товарными знаками Apache Software Foundation (ASF).