Freigeben über


Verbinden von Apache Flink® auf HDInsight auf AKS mit Azure Event Hubs für Apache Kafka®

Wichtig

Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bedingungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight in AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.

Ein bekannter Anwendungsfall für Apache Flink ist Stream Analytics. Dies ist eine beliebte Wahl vieler Benutzer*innen, um die Datenströme zu verwenden, die mit Apache Kafka erfasst werden. Typische Installationen von Flink und Kafka beginnen mit Ereignisstreams, die an Kafka übertragen werden und von Flink-Aufträgen genutzt werden können. Azure Event Hubs stellt einen Apache Kafka-Endpunkt auf einem Event Hub bereit, mit dem Benutzer mithilfe des Kafka-Protokolls eine Verbindung mit dem Event Hub herstellen können.

In diesem Artikel erkunden wir, wie man Azure Event Hubs mit Apache Flink auf HDInsight auf AKS verbindet und es wird Folgendes behandelt

  • Erstellen eines Event Hubs-Namespace
  • Erstellen eines HDInsight on AKS-Clusters mit Apache Flink
  • Ausführen des Flink-Producers
  • Packen der JAR-Datei für Apache Flink
  • Auftragsübermittlung und Überprüfung

Erstellen eines Azure Event Hubs-Namespace und einer Event Hubs-Instanz

  1. Informationen zum Erstellen eines Event Hubs-Namespace und einer Event Hubs-Instanz finden Sie hier.

    Screenshot der Event Hubs-Einrichtung.

  1. Mithilfe eines vorhandenen HDInsight on AKS-Clusterpools können Sie einen Flink-Cluster erstellen.

  2. Führen Sie den Flink-Producer aus, der bootstrap.servers und die producer.config-Informationen hinzufügt.

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Ersetzen Sie {YOUR.EVENTHUBS.CONNECTION.STRING} durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace. Detaillierte Anweisungen zum Abrufen der Verbindungszeichenfolge finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge.

    Ein auf ein Objekt angewendeter

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Packen Sie „com.example.app“:

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Fügen Sie den Codeschnipsel hinzu, um den Flink-Producer auszuführen:

    Screenshot des Testens von Flink in Event Hubs.

  3. Nach dem Ausführen des Codes werden die Ereignisse im Thema topic1 gespeichert.

    Screenshot von im Thema gespeichertem Event Hubs.

Verweis