Integrowanie obsługi narzędzia Apache Kafka Connect w usłudze Azure Event Hubs

Apache Kafka Połączenie to struktura umożliwiająca łączenie i importowanie/eksportowanie danych z/do dowolnego systemu zewnętrznego, takiego jak MySQL, HDFS i system plików za pośrednictwem klastra Platformy Kafka. Ten samouczek przeprowadzi Cię przez proces korzystania z platformy Kafka Połączenie z usługą Event Hubs.

Ten samouczek przeprowadzi Cię przez proces integrowania platformy Kafka Połączenie z centrum zdarzeń i wdrażania podstawowych łączników FileStreamSource i FileStreamSink. Chociaż te łączniki nie są przeznaczone do użytku produkcyjnego, przedstawiają kompleksowe scenariusze platformy Kafka Połączenie, w którym usługa Azure Event Hubs działa jako broker platformy Kafka.

Uwaga

Ten przykład jest dostępny w witrynie GitHub.

W tym samouczku wykonasz następujące kroki:

  • Tworzenie przestrzeni nazw usługi Event Hubs
  • Klonowanie projektu przykładowego
  • Konfigurowanie narzędzia Kafka Connect dla usługi Event Hubs
  • Uruchamianie narzędzia Kafka Connect
  • Tworzenie łączników

Wymagania wstępne

Aby ukończyć ten przewodnik, upewnij się, że dysponujesz następującymi elementami:

Tworzenie przestrzeni nazw usługi Event Hubs

Przestrzeń nazw usługi Event Hubs jest wymagana do wysyłania i odbierania zdarzeń z dowolnej usługi Event Hubs. Aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń, zobacz Tworzenie centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Get an Event Hubs connection string (Pobieranie parametrów połączenia usługi Event Hubs).

Klonowanie projektu przykładowego

Sklonuj repozytorium usługi Azure Event Hubs i przejdź do podfolderu tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Konfigurowanie narzędzia Kafka Connect dla usługi Event Hubs

Przekierowywanie przepływności narzędzia Kafka Connect z platformy Kafka do usługi Event Hubs wymaga minimalnej rekonfiguracji. W poniższym przykładowym pliku connect-distributed.properties pokazano, jak skonfigurować narzędzie Connect do uwierzytelnienia i komunikowania się z punktem końcowym platformy Kafka w usłudze Event Hubs:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Ważne

Zastąp {YOUR.EVENTHUBS.CONNECTION.STRING} element parametry połączenia przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące uzyskiwania parametry połączenia, zobacz Pobieranie parametry połączenia usługi Event Hubs. Oto przykładowa konfiguracja: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Uruchamianie narzędzia Kafka Connect

W tym kroku proces roboczy narzędzia Kafka Connect został uruchomiony lokalnie w trybie rozproszonym przy użyciu usługi Event Hubs w celu zachowania stanu klastra.

  1. Zapisz powyższy plik connect-distributed.properties lokalnie. Zamień wszystkie wartości w nawiasach klamrowych.
  2. Przejdź do lokalizacji platformy Kafka w maszynie.
  3. Uruchom program ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. Pojawienie się tekstu 'INFO Finished starting connectors and tasks' oznacza, że interfejs API REST procesu roboczego narzędzia Connect jest gotowy do interakcji.

Uwaga

Platforma Kafka Połączenie używa interfejsu API platformy Kafka Administracja Client do automatycznego tworzenia tematów z zalecanymi konfiguracjami, w tym kompaktowaniem. Z szybkiego sprawdzenia przestrzeni nazw w witrynie Azure Portal wynika, że tematy wewnętrzne procesu roboczego narzędzia Connect zostały utworzone automatycznie.

Platforma Kafka Połączenie tematy wewnętrzne muszą używać kompaktowania. Zespół usługi Event Hubs nie jest odpowiedzialny za naprawianie nieprawidłowych konfiguracji, jeśli wewnętrzne Połączenie tematy są niepoprawnie skonfigurowane.

Tworzenie łączników

Ta sekcja przeprowadzi Cię przez proces tworzenia łączników FileStreamSource i FileStreamSink.

  1. Utwórz katalog dla plików danych wejściowych i wyjściowych.

    mkdir ~/connect-quickstart
    
  2. Utwórz dwa pliki: plik z danymi inicjatora, które odczytuje łącznik FileStreamSource, i plik, w którym zapisuje dane łącznik FileStreamSink.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Utwórz łącznik FileStreamSource. Zamień wartości w nawiasach klamrowych na ścieżkę do katalogu głównego.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Centrum zdarzeń connect-quickstart powinno zostać wyświetlone w wystąpieniu usługi Event Hubs po uruchomieniu powyższego polecenia.

  4. Sprawdź stan łącznika źródła.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Opcjonalnie możesz użyć Eksploratora usługi Service Bus, aby sprawdzić, czy zdarzenia zostały dostarczone do tematu connect-quickstart.

  5. Utwórz łącznik FileStreamSink. Ponownie zamień wartości w nawiasach klamrowych na ścieżkę do katalogu głównego.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Sprawdź stan łącznika ujścia.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Sprawdź, czy dane zostały zreplikowane między plikami oraz czy dane są takie same w obu plikach.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Czyszczenie

Platforma Kafka Połączenie tworzy tematy usługi Event Hubs w celu przechowywania konfiguracji, przesunięć i stanu, które utrzymują się nawet po usunięciu klastra Połączenie. Jeśli ta trwałość nie jest wymagana, zaleca się usunięcie tych tematów. Możesz również usunąć usługę connect-quickstart Event Hubs, która została utworzona podczas tego przewodnika.

Następne kroki

Aby dowiedzieć się więcej o usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły: