Udostępnij za pośrednictwem


Łączenie platformy Apache Flink® w usłudze HDInsight w usłudze AKS przy użyciu usługi Azure Event Hubs dla platformy Apache Kafka®

Uwaga

Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.

Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.

Ważne

Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.

Dobrze znany przypadek użycia platformy Apache Flink to analiza strumienia. Popularny wybór przez wielu użytkowników do korzystania ze strumieni danych, które są pozyskiwane przy użyciu platformy Apache Kafka. Typowe instalacje Flink i Kafka zaczynają się od wypychania strumieni zdarzeń do platformy Kafka, które mogą być używane przez zadania Flink. Usługa Azure Event Hubs udostępnia punkt końcowy platformy Apache Kafka w centrum zdarzeń, który umożliwia użytkownikom łączenie się z centrum zdarzeń przy użyciu protokołu Kafka.

W tym artykule dowiesz się, jak połączyć usługę Azure Event Hubs z usługą Apache Flink w usłudze HDInsight w usłudze AKS i zapoznać się z następującymi tematami

  • Tworzenie przestrzeni nazw usługi Event Hubs
  • Tworzenie usługi HDInsight w klastrze usługi AKS za pomocą narzędzia Apache Flink
  • Uruchamianie producenta Flink
  • Pakiet Jar dla platformy Apache Flink
  • Przesyłanie i walidacja zadań

Tworzenie przestrzeni nazw usługi Event Hubs i usługi Event Hubs

  1. Aby utworzyć przestrzeń nazw usługi Event Hubs i usługę Event Hubs, zobacz tutaj

    Zrzut ekranu przedstawiający konfigurację usługi Event Hubs.

  1. Korzystając z istniejącej puli klastrów usługi HDInsight w usłudze AKS, można utworzyć klaster Flink

  2. Uruchom producenta Flink, dodając plik bootstrap.servers i producer.config informacje

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Zastąp {YOUR.EVENTHUBS.CONNECTION.STRING} element parametry połączenia przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące uzyskiwania parametry połączenia, zobacz szczegółowe informacje na temat uzyskiwania parametry połączenia usługi Event Hubs.

    Na przykład:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Com.example.app pakietu;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Dodaj fragment kodu, aby uruchomić producenta Flink.

    Zrzut ekranu przedstawiający sposób testowania w usłudze Event Hubs.

  3. Po wykonaniu kodu zdarzenia są przechowywane w temacie "topic1 "

    Zrzut ekranu przedstawiający usługę Event Hubs przechowywaną w temacie.

Odwołanie