Samouczek: przetwarzanie zdarzeń platformy Apache Kafka dla usługi Event Hubs przy użyciu usługi Stream Analytics

W tym artykule pokazano, jak przesyłać strumieniowo dane do usługi Event Hubs i przetwarzać je za pomocą usługi Azure Stream Analytics. Przeprowadzi Cię przez następujące kroki:

  1. Utwórz przestrzeń nazw usługi Event Hubs.
  2. Utwórz klienta platformy Kafka, który wysyła komunikaty do centrum zdarzeń.
  3. Utwórz zadanie usługi Stream Analytics, które kopiuje dane z centrum zdarzeń do usługi Azure Blob Storage.

Nie trzeba zmieniać klientów protokołu ani uruchamiać własnych klastrów w przypadku korzystania z punktu końcowego platformy Kafka uwidocznionego przez centrum zdarzeń. Azure Event Hubs obsługuje platformę Apache Kafka w wersji 1.0 lub nowszej.

Wymagania wstępne

Aby ukończyć ten przewodnik Szybki start, upewnij się, że dysponujesz następującymi elementami:

Tworzenie przestrzeni nazw usługi Event Hubs

Podczas tworzenia przestrzeni nazw usługi Event Hubs punkt końcowy platformy Kafka dla przestrzeni nazw jest automatycznie włączony. Zdarzenia można przesyłać strumieniowo z aplikacji korzystających z protokołu Kafka do centrów zdarzeń. Postępuj zgodnie z instrukcjami krok po kroku w temacie Tworzenie centrum zdarzeń przy użyciu Azure Portal, aby utworzyć przestrzeń nazw usługi Event Hubs. Jeśli używasz dedykowanego klastra, zobacz Tworzenie przestrzeni nazw i centrum zdarzeń w dedykowanym klastrze.

Uwaga

Usługa Event Hubs dla platformy Kafka nie jest obsługiwana w warstwie Podstawowa .

Wysyłanie komunikatów za pomocą platformy Kafka w usłudze Event Hubs

  1. Sklonuj Azure Event Hubs repozytorium platformy Kafka na maszynę.

  2. Przejdź do folderu : azure-event-hubs-for-kafka/quickstart/java/producer.

  3. Zaktualizuj szczegóły konfiguracji producenta w programie src/main/resources/producer.config. Określ nazwę i parametry połączenia dla przestrzeni nazw centrum zdarzeń.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. Przejdź do azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/pliku i otwórz plik TestDataReporter.java w wybranym edytorze.

  5. Oznacz jako komentarz następujący wiersz kodu:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. Dodaj następujący wiersz kodu zamiast kodu z komentarzem:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    Ten kod wysyła dane zdarzenia w formacie JSON . Podczas konfigurowania danych wejściowych dla zadania usługi Stream Analytics należy określić format JSON dla danych wejściowych.

  7. Uruchom producenta i przesyłaj strumieniowo do usługi Event Hubs. Na maszynie z systemem Windows podczas korzystania z wiersza poleceniaNode.js przejdź do azure-event-hubs-for-kafka/quickstart/java/producer folderu przed uruchomieniem tych poleceń.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

Sprawdzanie, czy centrum zdarzeń odbiera dane

  1. Wybierz pozycję Event Hubs w obszarze JEDNOSTKI. Upewnij się, że zostanie wyświetlone centrum zdarzeń o nazwie test.

    Centrum zdarzeń — testowanie

  2. Upewnij się, że komunikaty są wyświetlane w centrum zdarzeń.

    Centrum zdarzeń — komunikaty

Przetwarzanie danych zdarzeń przy użyciu zadania usługi Stream Analytics

W tej sekcji utworzysz zadanie usługi Azure Stream Analytics. Klient platformy Kafka wysyła zdarzenia do centrum zdarzeń. Tworzysz zadanie usługi Stream Analytics, które pobiera dane zdarzeń jako dane wejściowe i wyprowadza je do magazynu obiektów blob platformy Azure. Jeśli nie masz konta usługi Azure Storage, utwórz je.

Zapytanie w zadaniu usługi Stream Analytics przechodzi przez dane bez przeprowadzania analizy. Możesz utworzyć zapytanie, które przekształca dane wejściowe w celu wygenerowania danych wyjściowych w innym formacie lub przy użyciu szczegółowych informacji.

Tworzenie zadania usługi Stream Analytics

  1. Wybierz pozycję + Utwórz zasób w Azure Portal.
  2. Wybierz pozycję Analiza w menu Azure Marketplace i wybierz pozycję Zadanie usługi Stream Analytics.
  3. Na stronie Nowa usługa Stream Analytics wykonaj następujące czynności:
    1. Wprowadź nazwę zadania.

    2. Wybierz subskrypcję.

    3. Wybierz pozycję Utwórz nową dla grupy zasobów i wprowadź nazwę. Możesz również użyć istniejącej grupy zasobów.

    4. Wybierz lokalizację zadania.

    5. Wybierz pozycję Utwórz , aby utworzyć zadanie.

      Nowe zadanie usługi Stream Analytics

Konfigurowanie danych wejściowych zadania

  1. W komunikacie z powiadomieniem wybierz pozycję Przejdź do zasobu, aby wyświetlić stronę zadania usługi Stream Analytics .

  2. Wybierz pozycję Dane wejściowe w sekcji TOPOLOGIA ZADANIA w menu po lewej stronie.

  3. Wybierz pozycję Dodaj dane wejściowe strumienia, a następnie wybierz pozycję Centrum zdarzeń.

    Dodawanie centrum zdarzeń jako danych wejściowych

  4. Na stronie Konfiguracja danych wejściowych centrum zdarzeń wykonaj następujące czynności:

    1. Określ alias dla danych wejściowych.

    2. Wybierz subskrypcję platformy Azure.

    3. Wybierz utworzoną wcześniej przestrzeń nazw centrum zdarzeń .

    4. Wybierz pozycję test dla centrum zdarzeń.

    5. Wybierz pozycję Zapisz.

      Konfiguracja danych wejściowych centrum zdarzeń

Konfigurowanie danych wyjściowych zadania

  1. Wybierz pozycję Dane wyjściowe w sekcji TOPOLOGIA ZADANIA w menu.
  2. Wybierz pozycję + Dodaj na pasku narzędzi i wybierz pozycję Blob Storage
  3. Na stronie Ustawienia danych wyjściowych usługi Blob Storage wykonaj następujące czynności:
    1. Określ alias dla danych wyjściowych.

    2. Wybierz subskrypcję platformy Azure.

    3. Wybierz konto usługi Azure Storage.

    4. Wprowadź nazwę kontenera , który przechowuje dane wyjściowe z zapytania usługi Stream Analytics.

    5. Wybierz pozycję Zapisz.

      Konfiguracja danych wyjściowych usługi Blob Storage

Definiowanie zapytania

Po skonfigurowaniu zadania usługi Stream Analytics do odczytu przychodzącego strumienia danych następnym krokiem jest utworzenie przekształcenia, które analizuje dane w czasie rzeczywistym. Zapytanie przekształcenia należy zdefiniować przy użyciu języka zapytań usługi Stream Analytics. W tym przewodniku zdefiniujesz zapytanie, które przechodzi przez dane bez wykonywania żadnych przekształceń.

  1. Wybierz pozycję Zapytanie.

  2. W oknie zapytania zastąp [YourOutputAlias] element aliasem wyjściowym utworzonym wcześniej.

  3. Zastąp [YourInputAlias] element aliasem wejściowym utworzonym wcześniej.

  4. Wybierz pozycję Zapisz na pasku narzędzi.

    Zrzut ekranu przedstawia okno zapytania z wartościami zmiennych wejściowych i wyjściowych.

Uruchamianie zadania usługi Stream Analytics

  1. Wybierz pozycję Przegląd w menu po lewej stronie.

  2. Wybierz pozycję Uruchom.

    Menu Start

  3. Na stronie Uruchamianie zadania wybierz pozycję Uruchom.

    Strona uruchamiania zadania

  4. Poczekaj, aż stan zadania zmieni się z Uruchamianie na uruchomione.

    Stan zadania — uruchomione

Testowanie scenariusza

  1. Uruchom ponownie producenta platformy Kafka , aby wysyłać zdarzenia do centrum zdarzeń.

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. Upewnij się, że dane wyjściowe są generowane w usłudze Azure Blob Storage. W kontenerze zostanie wyświetlony plik JSON zawierający 100 wierszy, które wyglądają podobnie do następujących przykładowych wierszy:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    Zadanie usługi Azure Stream Analytics odebrało dane wejściowe z centrum zdarzeń i przechowywało je w magazynie obiektów blob platformy Azure w tym scenariuszu.

Następne kroki

W tym artykule przedstawiono sposób przesyłania strumieniowego do usługi Event Hubs bez zmieniania klientów protokołu ani uruchamiania własnych klastrów. Aby dowiedzieć się więcej na temat usługi Event Hubs dla platformy Apache Kafka, zobacz Przewodnik dewelopera platformy Apache Kafka dotyczący platformy Azure Event Hubs.