Integrowanie obsługi narzędzia Apache Kafka Connect w usłudze Azure Event Hubs
Apache Kafka Połączenie to struktura umożliwiająca łączenie i importowanie/eksportowanie danych z/do dowolnego systemu zewnętrznego, takiego jak MySQL, HDFS i system plików za pośrednictwem klastra Platformy Kafka. Ten samouczek przeprowadzi Cię przez proces korzystania z platformy Kafka Połączenie z usługą Event Hubs.
Ten samouczek przeprowadzi Cię przez proces integrowania platformy Kafka Połączenie z centrum zdarzeń i wdrażania podstawowych łączników FileStreamSource i FileStreamSink. Chociaż te łączniki nie są przeznaczone do użytku produkcyjnego, przedstawiają kompleksowe scenariusze platformy Kafka Połączenie, w którym usługa Azure Event Hubs działa jako broker platformy Kafka.
Uwaga
Ten przykład jest dostępny w witrynie GitHub.
W tym samouczku wykonasz następujące kroki:
- Tworzenie przestrzeni nazw usługi Event Hubs
- Klonowanie projektu przykładowego
- Konfigurowanie narzędzia Kafka Connect dla usługi Event Hubs
- Uruchamianie narzędzia Kafka Connect
- Tworzenie łączników
Wymagania wstępne
Aby ukończyć ten przewodnik, upewnij się, że dysponujesz następującymi elementami:
- Subskrypcja platformy Azure. Jeśli jej nie masz, utwórz bezpłatne konto.
- Usługa Git
- Linux/MacOS
- Najnowsza wersja platformy Kafka dostępna w kafka.apache.org
- Przeczytaj artykuł z wprowadzeniem Usługa Event Hubs dla platformy Apache Kafka
Tworzenie przestrzeni nazw usługi Event Hubs
Przestrzeń nazw usługi Event Hubs jest wymagana do wysyłania i odbierania zdarzeń z dowolnej usługi Event Hubs. Aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń, zobacz Tworzenie centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Get an Event Hubs connection string (Pobieranie parametrów połączenia usługi Event Hubs).
Klonowanie projektu przykładowego
Sklonuj repozytorium usługi Azure Event Hubs i przejdź do podfolderu tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Konfigurowanie narzędzia Kafka Connect dla usługi Event Hubs
Przekierowywanie przepływności narzędzia Kafka Connect z platformy Kafka do usługi Event Hubs wymaga minimalnej rekonfiguracji. W poniższym przykładowym pliku connect-distributed.properties
pokazano, jak skonfigurować narzędzie Connect do uwierzytelnienia i komunikowania się z punktem końcowym platformy Kafka w usłudze Event Hubs:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Ważne
Zastąp {YOUR.EVENTHUBS.CONNECTION.STRING}
element parametry połączenia przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące uzyskiwania parametry połączenia, zobacz Pobieranie parametry połączenia usługi Event Hubs. Oto przykładowa konfiguracja: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Uruchamianie narzędzia Kafka Connect
W tym kroku proces roboczy narzędzia Kafka Connect został uruchomiony lokalnie w trybie rozproszonym przy użyciu usługi Event Hubs w celu zachowania stanu klastra.
- Zapisz powyższy plik
connect-distributed.properties
lokalnie. Zamień wszystkie wartości w nawiasach klamrowych. - Przejdź do lokalizacji platformy Kafka w maszynie.
- Uruchom program
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. Pojawienie się tekstu'INFO Finished starting connectors and tasks'
oznacza, że interfejs API REST procesu roboczego narzędzia Connect jest gotowy do interakcji.
Uwaga
Platforma Kafka Połączenie używa interfejsu API platformy Kafka Administracja Client do automatycznego tworzenia tematów z zalecanymi konfiguracjami, w tym kompaktowaniem. Z szybkiego sprawdzenia przestrzeni nazw w witrynie Azure Portal wynika, że tematy wewnętrzne procesu roboczego narzędzia Connect zostały utworzone automatycznie.
Platforma Kafka Połączenie tematy wewnętrzne muszą używać kompaktowania. Zespół usługi Event Hubs nie jest odpowiedzialny za naprawianie nieprawidłowych konfiguracji, jeśli wewnętrzne Połączenie tematy są niepoprawnie skonfigurowane.
Tworzenie łączników
Ta sekcja przeprowadzi Cię przez proces tworzenia łączników FileStreamSource i FileStreamSink.
Utwórz katalog dla plików danych wejściowych i wyjściowych.
mkdir ~/connect-quickstart
Utwórz dwa pliki: plik z danymi inicjatora, które odczytuje łącznik FileStreamSource, i plik, w którym zapisuje dane łącznik FileStreamSink.
seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Utwórz łącznik FileStreamSource. Zamień wartości w nawiasach klamrowych na ścieżkę do katalogu głównego.
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Centrum zdarzeń
connect-quickstart
powinno zostać wyświetlone w wystąpieniu usługi Event Hubs po uruchomieniu powyższego polecenia.Sprawdź stan łącznika źródła.
curl -s http://localhost:8083/connectors/file-source/status
Opcjonalnie możesz użyć Eksploratora usługi Service Bus, aby sprawdzić, czy zdarzenia zostały dostarczone do tematu
connect-quickstart
.Utwórz łącznik FileStreamSink. Ponownie zamień wartości w nawiasach klamrowych na ścieżkę do katalogu głównego.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Sprawdź stan łącznika ujścia.
curl -s http://localhost:8083/connectors/file-sink/status
Sprawdź, czy dane zostały zreplikowane między plikami oraz czy dane są takie same w obu plikach.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Czyszczenie
Platforma Kafka Połączenie tworzy tematy usługi Event Hubs w celu przechowywania konfiguracji, przesunięć i stanu, które utrzymują się nawet po usunięciu klastra Połączenie. Jeśli ta trwałość nie jest wymagana, zaleca się usunięcie tych tematów. Możesz również usunąć usługę connect-quickstart
Event Hubs, która została utworzona podczas tego przewodnika.
Następne kroki
Aby dowiedzieć się więcej o usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły:
- Dublowanie brokera platformy Kafka w centrum zdarzeń
- Połączenie platformy Apache Spark do centrum zdarzeń
- Połączenie apache Flink do centrum zdarzeń
- Eksplorowanie przykładów w witrynie GitHub
- Połączenie Strumienie Akka do centrum zdarzeń
- Przewodnik dla deweloperów platformy Apache Kafka dotyczący usługi Azure Event Hubs