Csatlakozás Apache Flink® a HDInsighton az AKS-en az Azure Event Hubs for Apache Kafkával®
Fontos
Ez a szolgáltatás jelenleg előzetes kiadásban elérhető. A Microsoft Azure Előzetes verzió kiegészítő használati feltételei további jogi feltételeket tartalmaznak, amelyek a bétaverzióban, előzetes verzióban vagy egyébként még nem általánosan elérhető Azure-funkciókra vonatkoznak. Erről az adott előzetes verzióról az Azure HDInsight az AKS előzetes verziójában tájékozódhat. Ha kérdése vagy funkciójavaslata van, küldjön egy kérést az AskHDInsightban a részletekkel együtt, és kövessen minket további frissítésekért az Azure HDInsight-közösségről.
Az Apache Flink jól ismert használati esete a streamelemzés. Sok felhasználó népszerű választása az Apache Kafka használatával betöltött adatfolyamok használatára. Az Flink és a Kafka tipikus telepítései az eseménystreamek Kafkába való leküldésével kezdődnek, amelyeket a Flink-feladatok felhasználhatnak. Az Azure Event Hubs egy Apache Kafka-végpontot biztosít egy eseményközponton, amely lehetővé teszi a felhasználók számára, hogy a Kafka protokoll használatával csatlakozzanak az eseményközponthoz.
Ebből a cikkből megtudhatja, hogyan csatlakoztathatja az Azure Event Hubsot az Apache Flinkhez a HDInsighton az AKS-en , és az alábbiakat ismertetjük
- Event Hubs-névtér létrehozása
- HDInsight létrehozása AKS-fürtön Apache Flink használatával
- Flink-gyártó futtatása
- Package Jar for Apache Flink
- Feladatbeküldés & érvényesítés
Event Hubs-névtér és Event Hubs létrehozása
Az Event Hubs-névtér és az Event Hubs létrehozásához lásd itt
Flink-fürt beállítása a HDInsighton az AKS-en
A meglévő HDInsight AKS-fürtkészleten való használatával Flink-fürtöt hozhat létre
Futtassa az Flink-gyártót a bootstrap.servers és az
producer.config
információk hozzáadásávalbootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Cserélje le
{YOUR.EVENTHUBS.CONNECTION.STRING}
az Event Hubs-névtér kapcsolati sztring. Az kapcsolati sztring beszerzésére vonatkozó utasításokért tekintse meg az Event Hubs kapcsolati sztring beszerzésének részleteit.Például:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
A JAR for Flink csomagolása
Csomag com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Adja hozzá a kódrészletet a Flink Producer futtatásához.
A kód végrehajtása után az események a "topic1" témakörben lesznek tárolva
Referencia
- Apache Flink webhely
- Az Apache, az Apache Kafka, a Kafka, az Apache Flink, a Flink és a kapcsolódó nyílt forráskód projektnevek az Apache Software Foundation (ASF) védjegyei.
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: