Sdílet prostřednictvím


Integrace podpory připojení Apache Kafka ve službě Azure Event Hubs

Apache Kafka Connect je architektura pro připojení a import/export dat z/do libovolného externího systému, jako je MySQL, HDFS a systém souborů prostřednictvím clusteru Kafka. Tento článek vás provede používáním architektury Kafka Connect se službou Event Hubs.

Tento článek vás provede integrací služby Kafka Connect s centrem událostí a nasazením základních FileStreamSource konektorů a FileStreamSink konektorů. I když tyto konektory nejsou určené pro produkční použití, předvádějí kompletní scénář připojení Kafka, ve kterém služba Azure Event Hubs funguje jako zprostředkovatel Kafka.

Poznámka:

Tato ukázka je k dispozici na GitHubu.

Požadavky

Abyste mohli dokončit tento návod, ujistěte se, že máte následující požadavky:

Vytvořte obor názvů služby Event Hubs

K odesílání a příjmu z jakékoli služby Event Hubs je vyžadován obor názvů Event Hubs. Pokyny pro vytvoření oboru názvů a centra událostí najdete v části Vytvoření centra událostí. Získejte připojovací řetězec služby Event Hubs a plně kvalifikovaný název domény (FQDN) pro pozdější použití. Pokyny najdete v tématu Získání připojovacího řetězce služby Event Hubs.

Naklonování ukázkového projektu

Naklonujte úložiště Azure Event Hubs a přejděte do podsložky tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Konfigurace připojení Kafka pro službu Event Hubs

Přesměrování propustnosti připojení Kafka z Kafka do služby Event Hubs vyžaduje minimální konfiguraci. Následující ukázka connect-distributed.properties znázorňuje, jak nakonfigurovat Connect pro ověřování a komunikaci s koncovým bodem Kafka ve službě Event Hubs:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs 

Důležité

Nahraďte {YOUR.EVENTHUBS.CONNECTION.STRING} připojovací řetězec pro váš obor názvů Event Hubs. Pokyny k získání připojovacího řetězce najdete v části Získání připojovacího řetězce služby Event Hubs. Tady je příklad konfigurace: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Spuštění Kafka Connect

V tomto kroku se místně spustí pracovní proces připojení Kafka v distribuovaném režimu a k zachování stavu clusteru s použije služba Event Hubs.

  1. connect-distributed.properties Uložte soubor místně. Nezapomeňte nahradit všechny hodnoty v závorkách.
  2. Na svém počítači přejděte do umístění verze Kafka.
  3. Spusťte ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. Jakmile se zobrazí 'INFO Finished starting connectors and tasks', je REST API pracovníka Connect připraveno k interakci.

Poznámka:

Kafka Connect používá rozhraní Kafka AdminClient API k automatickému vytváření témat s doporučenými konfiguracemi, včetně komprimace. Rychlá kontrola oboru názvů v portálu Azure ukazuje, že interní témata pracovníka Connect byla vytvořena automaticky.

Interní témata Kafka Connect musí používat kompaktování. Tým služby Event Hubs neodpovídá za opravu nesprávných konfigurací, pokud jsou interní témata připojení nesprávně nakonfigurovaná.

Vytvoření konektorů

Tato část vás provede spuštěním FileStreamSource a FileStreamSink konektorů.

  1. Vytvořte adresář pro vstupní a výstupní datové soubory.

    mkdir ~/connect-quickstart
    
  2. Vytvořte dva soubory: jeden soubor s počátečními daty, ze kterých FileStreamSource konektor čte, a druhý soubor, do kterého náš FileStreamSink konektor zapisuje.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Vytvoření konektoru FileStreamSource Nezapomeňte nahradit složené závorky cestou k vašemu domovskému adresáři.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Po spuštění příkazu byste měli vidět centrum connect-quickstart událostí ve vaší instanci služby Event Hubs.

  4. Zkontrolujte stav konektoru zdroje.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Volitelně můžete pomocí Service Bus Exploreru ověřit, že události dorazí do connect-quickstart tématu.

  5. Vytvořte konektor FileStreamSink. Ujistěte se, že opět nahradíte složené závorky cestou k vašemu domovskému adresáři.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Zkontrolujte stav konektoru jímky.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Ověřte, že se data replikovala mezi soubory a že jsou v obou souborech identická.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Vyčištění

Kafka Connect vytváří témata služby Event Hubs pro ukládání konfigurací, posunů a stavu, které se uchovávají i po ukončení clusteru Connect. Pokud tato trvalost není požadovaná, doporučujeme tato témata odstranit. Můžete také chtít odstranit connect-quickstart Event Hubs, které byly vytvořeny v tomto názorném postupu.

Další informace o službě Event Hubs pro Kafka najdete v následujících článcích: