Samouczek: przetwarzanie zdarzeń platformy Apache Kafka dla usługi Event Hubs przy użyciu usługi Stream Analytics
W tym artykule pokazano, jak przesyłać strumieniowo dane do usługi Event Hubs i przetwarzać je za pomocą usługi Azure Stream Analytics. Przeprowadzi Cię przez następujące kroki:
- Utwórz przestrzeń nazw usługi Event Hubs.
- Utwórz klienta platformy Kafka, który wysyła komunikaty do centrum zdarzeń.
- Utwórz zadanie usługi Stream Analytics, które kopiuje dane z centrum zdarzeń do usługi Azure Blob Storage.
Nie trzeba zmieniać klientów protokołu ani uruchamiać własnych klastrów w przypadku korzystania z punktu końcowego platformy Kafka uwidocznionego przez centrum zdarzeń. Azure Event Hubs obsługuje platformę Apache Kafka w wersji 1.0 lub nowszej.
Wymagania wstępne
Aby ukończyć ten przewodnik Szybki start, upewnij się, że dysponujesz następującymi elementami:
- Subskrypcja platformy Azure. Jeśli nie masz subskrypcji, przed rozpoczęciem utwórz bezpłatne konto.
- Zestaw Java Development Kit (JDK) 1.7+
- Pobierz i zainstaluj archiwum binarne Maven.
- Usługa Git
- Konto usługi Azure Storage. Jeśli go nie masz, utwórz go przed kontynuowaniem. Zadanie usługi Stream Analytics w tym przewodniku przechowuje dane wyjściowe w magazynie obiektów blob platformy Azure.
Tworzenie przestrzeni nazw usługi Event Hubs
Podczas tworzenia przestrzeni nazw usługi Event Hubs punkt końcowy platformy Kafka dla przestrzeni nazw jest automatycznie włączony. Zdarzenia można przesyłać strumieniowo z aplikacji korzystających z protokołu Kafka do centrów zdarzeń. Postępuj zgodnie z instrukcjami krok po kroku w temacie Tworzenie centrum zdarzeń przy użyciu Azure Portal, aby utworzyć przestrzeń nazw usługi Event Hubs. Jeśli używasz dedykowanego klastra, zobacz Tworzenie przestrzeni nazw i centrum zdarzeń w dedykowanym klastrze.
Uwaga
Usługa Event Hubs dla platformy Kafka nie jest obsługiwana w warstwie Podstawowa .
Wysyłanie komunikatów za pomocą platformy Kafka w usłudze Event Hubs
Sklonuj Azure Event Hubs repozytorium platformy Kafka na maszynę.
Przejdź do folderu :
azure-event-hubs-for-kafka/quickstart/java/producer
.Zaktualizuj szczegóły konfiguracji producenta w programie
src/main/resources/producer.config
. Określ nazwę i parametry połączenia dla przestrzeni nazw centrum zdarzeń.bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
Przejdź do
azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/
pliku i otwórz plik TestDataReporter.java w wybranym edytorze.Oznacz jako komentarz następujący wiersz kodu:
//final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
Dodaj następujący wiersz kodu zamiast kodu z komentarzem:
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");
Ten kod wysyła dane zdarzenia w formacie JSON . Podczas konfigurowania danych wejściowych dla zadania usługi Stream Analytics należy określić format JSON dla danych wejściowych.
Uruchom producenta i przesyłaj strumieniowo do usługi Event Hubs. Na maszynie z systemem Windows podczas korzystania z wiersza poleceniaNode.js przejdź do
azure-event-hubs-for-kafka/quickstart/java/producer
folderu przed uruchomieniem tych poleceń.mvn clean package mvn exec:java -Dexec.mainClass="TestProducer"
Sprawdzanie, czy centrum zdarzeń odbiera dane
Wybierz pozycję Event Hubs w obszarze JEDNOSTKI. Upewnij się, że zostanie wyświetlone centrum zdarzeń o nazwie test.
Upewnij się, że komunikaty są wyświetlane w centrum zdarzeń.
Przetwarzanie danych zdarzeń przy użyciu zadania usługi Stream Analytics
W tej sekcji utworzysz zadanie usługi Azure Stream Analytics. Klient platformy Kafka wysyła zdarzenia do centrum zdarzeń. Tworzysz zadanie usługi Stream Analytics, które pobiera dane zdarzeń jako dane wejściowe i wyprowadza je do magazynu obiektów blob platformy Azure. Jeśli nie masz konta usługi Azure Storage, utwórz je.
Zapytanie w zadaniu usługi Stream Analytics przechodzi przez dane bez przeprowadzania analizy. Możesz utworzyć zapytanie, które przekształca dane wejściowe w celu wygenerowania danych wyjściowych w innym formacie lub przy użyciu szczegółowych informacji.
Tworzenie zadania usługi Stream Analytics
- Wybierz pozycję + Utwórz zasób w Azure Portal.
- Wybierz pozycję Analiza w menu Azure Marketplace i wybierz pozycję Zadanie usługi Stream Analytics.
- Na stronie Nowa usługa Stream Analytics wykonaj następujące czynności:
Wprowadź nazwę zadania.
Wybierz subskrypcję.
Wybierz pozycję Utwórz nową dla grupy zasobów i wprowadź nazwę. Możesz również użyć istniejącej grupy zasobów.
Wybierz lokalizację zadania.
Wybierz pozycję Utwórz , aby utworzyć zadanie.
Konfigurowanie danych wejściowych zadania
W komunikacie z powiadomieniem wybierz pozycję Przejdź do zasobu, aby wyświetlić stronę zadania usługi Stream Analytics .
Wybierz pozycję Dane wejściowe w sekcji TOPOLOGIA ZADANIA w menu po lewej stronie.
Wybierz pozycję Dodaj dane wejściowe strumienia, a następnie wybierz pozycję Centrum zdarzeń.
Na stronie Konfiguracja danych wejściowych centrum zdarzeń wykonaj następujące czynności:
Określ alias dla danych wejściowych.
Wybierz subskrypcję platformy Azure.
Wybierz utworzoną wcześniej przestrzeń nazw centrum zdarzeń .
Wybierz pozycję test dla centrum zdarzeń.
Wybierz pozycję Zapisz.
Konfigurowanie danych wyjściowych zadania
- Wybierz pozycję Dane wyjściowe w sekcji TOPOLOGIA ZADANIA w menu.
- Wybierz pozycję + Dodaj na pasku narzędzi i wybierz pozycję Blob Storage
- Na stronie Ustawienia danych wyjściowych usługi Blob Storage wykonaj następujące czynności:
Określ alias dla danych wyjściowych.
Wybierz subskrypcję platformy Azure.
Wybierz konto usługi Azure Storage.
Wprowadź nazwę kontenera , który przechowuje dane wyjściowe z zapytania usługi Stream Analytics.
Wybierz pozycję Zapisz.
Definiowanie zapytania
Po skonfigurowaniu zadania usługi Stream Analytics do odczytu przychodzącego strumienia danych następnym krokiem jest utworzenie przekształcenia, które analizuje dane w czasie rzeczywistym. Zapytanie przekształcenia należy zdefiniować przy użyciu języka zapytań usługi Stream Analytics. W tym przewodniku zdefiniujesz zapytanie, które przechodzi przez dane bez wykonywania żadnych przekształceń.
Wybierz pozycję Zapytanie.
W oknie zapytania zastąp
[YourOutputAlias]
element aliasem wyjściowym utworzonym wcześniej.Zastąp
[YourInputAlias]
element aliasem wejściowym utworzonym wcześniej.Wybierz pozycję Zapisz na pasku narzędzi.
Uruchamianie zadania usługi Stream Analytics
Wybierz pozycję Przegląd w menu po lewej stronie.
Wybierz pozycję Uruchom.
Na stronie Uruchamianie zadania wybierz pozycję Uruchom.
Poczekaj, aż stan zadania zmieni się z Uruchamianie na uruchomione.
Testowanie scenariusza
Uruchom ponownie producenta platformy Kafka , aby wysyłać zdarzenia do centrum zdarzeń.
mvn exec:java -Dexec.mainClass="TestProducer"
Upewnij się, że dane wyjściowe są generowane w usłudze Azure Blob Storage. W kontenerze zostanie wyświetlony plik JSON zawierający 100 wierszy, które wyglądają podobnie do następujących przykładowych wierszy:
{"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
Zadanie usługi Azure Stream Analytics odebrało dane wejściowe z centrum zdarzeń i przechowywało je w magazynie obiektów blob platformy Azure w tym scenariuszu.
Następne kroki
W tym artykule przedstawiono sposób przesyłania strumieniowego do usługi Event Hubs bez zmieniania klientów protokołu ani uruchamiania własnych klastrów. Aby dowiedzieć się więcej na temat usługi Event Hubs dla platformy Apache Kafka, zobacz Przewodnik dewelopera platformy Apache Kafka dotyczący platformy Azure Event Hubs.