Integrar o suporte do Apache Kafka Connect nos Hubs de Eventos do Azure

Apache Kafka Connect é uma estrutura para conectar e importar / exportar dados de / para qualquer sistema externo, como MySQL, HDFS e sistema de arquivos através de um cluster Kafka. Este tutorial orienta você pelo uso da estrutura Kafka Connect com Hubs de Eventos.

Este tutorial orienta você na integração do Kafka Connect com um hub de eventos e na implantação de conectores básicos FileStreamSource e FileStreamSink. Embora esses conectores não se destinem ao uso em produção, eles demonstram um cenário Kafka Connect de ponta a ponta em que os Hubs de Eventos do Azure atuam como um broker Kafka.

Nota

Este exemplo está disponível no GitHub.

Neste tutorial, siga os seguintes passos:

  • Criar um espaço de nomes dos Hubs de Eventos
  • Clonar o projeto de exemplo
  • Configurar o Kafka Connect para os Hubs de Eventos
  • Executar o Kafka Connect
  • Criar conectores

Pré-requisitos

Para concluir esta orientação, confirme que tem os seguintes pré-requisitos:

Criar um espaço de nomes dos Hubs de Eventos

É necessário um espaço de nomes dos Hubs de Eventos para enviar e receber a partir de qualquer serviços dos Hubs de Eventos. Consulte Criando um hub de eventos para obter instruções sobre como criar um namespace e um hub de eventos. Obtenha a cadeia de ligação dos Hubs de Eventos e o nome de domínio completamente qualificado (FQDN), para utilizar mais tarde. Para obter instruções, veja Get an Event Hubs connection string (Obter uma cadeia de ligação dos Hubs de Eventos).

Clonar o projeto de exemplo

Clone o repositório dos Hubs de Eventos do Azure e navegue para a subpasta tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Configurar o Kafka Connect para os Hubs de Eventos

Ao redirecionar o débito do Kafka Connect do Kafka para os Hubs de Eventos, é necessário fazer uma reconfiguração mínima. O exemplo seguinte, connect-distributed.properties, ilustra como configurar o Connect para se autenticar e comunicar com o ponto final do Kafka nos Hubs de Eventos:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Importante

Substitua {YOUR.EVENTHUBS.CONNECTION.STRING} pela cadeia de conexão do namespace Hubs de Eventos. Para obter instruções sobre como obter a cadeia de conexão, consulte Obter uma cadeia de conexão de Hubs de Eventos. Aqui está um exemplo de configuração: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Executar o Kafka Connect

Neste passo, é iniciada uma função de trabalho do Kafka Connect localmente no modo distribuído e são utilizados os Hubs de Eventos para manter o estado do cluster.

  1. Guarde o ficheiro anterior, connect-distributed.properties, localmente. Confirme que substitui todos os valores dentro das chavetas.
  2. Navegue para a localização da versão do Kafka no computador.
  3. Execute o ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. Quando vir 'INFO Finished starting connectors and tasks', significa que a API REST da função de trabalho do Connect está pronta para interação.

Nota

O Kafka Connect usa a API Kafka AdminClient para criar automaticamente tópicos com configurações recomendadas, incluindo compactação. Uma rápida observação ao espaço de nomes no portal do Azure mostra que os tópicos internos da função de trabalho do Connect foram criados de forma automática.

Os tópicos internos do Kafka Connect devem usar compactação. A equipe de Hubs de Eventos não é responsável por corrigir configurações inadequadas se os tópicos internos do Connect estiverem configurados incorretamente.

Criar conectores

Esta secção mostra-lhe como criar rapidamente os conectores FileStreamSource e FileStreamSink.

  1. Crie um diretório para os ficheiros de dados de entrada e de saída.

    mkdir ~/connect-quickstart
    
  2. Crie dois ficheiros. Um com dados de entrada a partir do qual o conector FileStreamSource lê e outro no qual o conector FileStreamSink escreve.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Crie um conector FileStreamSource. Confirme que substitui as chavetas pelo caminho do seu diretório de raiz.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Você deve ver o hub connect-quickstart de eventos em sua instância de Hubs de Eventos depois de executar o comando acima.

  4. Verifique o estado do conector de origem.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Opcionalmente, pode utilizar o Service Bus Explorer para verificar se os eventos chegaram ao tópico connect-quickstart.

  5. Crie um conector FileStreamSink. Mais uma vez, confirme que substitui as chavetas pelo caminho do seu diretório de raiz.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Verifique o estado do conector sink.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Confirme que os dados foram replicados entre os ficheiros e que são idênticos em ambos os ficheiros.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Limpeza

O Kafka Connect cria tópicos de Hubs de Eventos para armazenar configurações, deslocamentos e status que persistem mesmo após o cluster Connect ter sido removido. A menos que essa persistência seja desejada, é recomendável que esses tópicos sejam excluídos. Você também pode excluir os connect-quickstart Hubs de Eventos que foram criados durante este passo a passo.

Próximos passos

Para saber mais sobre os Hubs de Eventos para Kafka, consulte os seguintes artigos: