Connessione Apache Flink® in HDInsight nel servizio Azure Kubernetes con Hub eventi di Azure per Apache Kafka®
Importante
Questa funzionalità è attualmente disponibile solo in anteprima. Le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure disponibili in versione beta, in anteprima o non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire microsoft per altri aggiornamenti nella community di Azure HDInsight.
Un caso d'uso noto per Apache Flink è l'analisi di flusso. Scelta comune da parte di molti utenti per l'uso dei flussi di dati, inseriti con Apache Kafka. Le installazioni tipiche di Flink e Kafka iniziano con il push dei flussi di eventi in Kafka, che possono essere utilizzati dai processi Flink. Hub eventi di Azure fornisce un endpoint Apache Kafka in un hub eventi, che consente agli utenti di connettersi all'hub eventi usando il protocollo Kafka.
Questo articolo illustra come connettersi Hub eventi di Azure con Apache Flink in HDInsight nel servizio Azure Kubernetes e illustra quanto segue
- Creare uno spazio dei nomi di Hub eventi
- Creare un cluster HDInsight nel servizio Azure Kubernetes con Apache Flink
- Eseguire il producer Flink
- Jar del pacchetto per Apache Flink
- Invio e convalida dei processi
Creare uno spazio dei nomi di Hub eventi e Hub eventi
Per creare uno spazio dei nomi di Hub eventi e Hub eventi, vedere qui
Configurare il cluster Flink in HDInsight nel servizio Azure Kubernetes
Usando HDInsight esistente nel pool di cluster del servizio Azure Kubernetes, è possibile creare un cluster Flink
Eseguire il producer Flink aggiungendo bootstrap.servers e le
producer.config
informazionibootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Sostituire
{YOUR.EVENTHUBS.CONNECTION.STRING}
con la stringa di connessione per lo spazio dei nomi di Hub eventi. Per istruzioni su come ottenere il stringa di connessione, vedere i dettagli su come ottenere un stringa di connessione di Hub eventi.ad esempio:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Creazione del pacchetto JAR per Flink
Com.example.app pacchetto;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Aggiungere il frammento di codice per eseguire Flink Producer.
Dopo l'esecuzione del codice, gli eventi vengono archiviati nell'argomento "topic1"
Riferimento
- Sito Web Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).
Commenti e suggerimenti
https://aka.ms/ContentUserFeedback.
Presto disponibile: Nel corso del 2024 verranno gradualmente disattivati i problemi di GitHub come meccanismo di feedback per il contenuto e ciò verrà sostituito con un nuovo sistema di feedback. Per altre informazioni, vedereInvia e visualizza il feedback per