Integrar o suporte ao Apache Kafka Connect nos Hubs de Eventos do Azure

O Apache Kafka Connect é uma estrutura para conectar e importar/exportar dados de ou para qualquer sistema externo, como o MySQL, o HDFS e o sistema de arquivos, por meio de um cluster do Kafka. Este tutorial explica como usar a estrutura do Kafka Connect com os Hubs de Eventos.

Este tutorial percorre a integração do Kafka Connect com um hub de eventos e a implantação de conectores FileStreamSource e FileStreamSink básicos. Embora esses conectores não sirvam para uso em produção, eles demonstram um cenário Kafka Connect de ponta a ponta em que os Hubs de Eventos do Azure atuam como um agente do Kafka.

Observação

Este exemplo está disponível em GitHub.

Neste tutorial, você deve executar as seguintes etapas:

  • Criar um namespace dos Hubs de Eventos
  • Clonar o projeto de exemplo
  • Configurar o Kafka Connect para os Hubs de Eventos
  • Executar o Kafka Connect
  • Criar conectores

Pré-requisitos

Para concluir este passo a passo, é necessário atender aos seguintes pré-requisitos:

Criar um namespace dos Hubs de Eventos

É necessário um namespace do Hubs de Eventos para enviar e receber de qualquer serviço de Hub de Eventos. Para obter instruções sobre como criar um namespace e um hub de eventos, confira Criar um hub de eventos. Obtenha a cadeia de conexão dos Hubs de Eventos e o FQDN (nome de domínio totalmente qualificado) para uso posterior. Para obter instruções, confira Obter uma cadeia de conexão dos Hubs de Eventos.

Clonar o projeto de exemplo

Clone o repositório dos Hubs de Eventos do Azure e navegue até a subpasta tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Configurar o Kafka Connect para os Hubs de Eventos

É necessário um mínimo de reconfiguração para redirecionar a taxa de transferência do Kafka Connect para os Hubs de Eventos. O exemplo connect-distributed.properties abaixo ilustra como configurar o Connect para fazer a autenticação e se comunicar com o ponto de extremidade do Kafka nos Hubs de Eventos:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Importante

Substitua {YOUR.EVENTHUBS.CONNECTION.STRING} pela cadeia de conexão do seu namespace dos Hubs de Eventos. Para ver as instruções sobre como obter uma cadeia de conexão, confira Obter cadeia de conexão para Hubs de Eventos. Aqui está um exemplo de configuração: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Executar o Kafka Connect

Nesta etapa, um trabalho do Kafka Connect é iniciado localmente em modo distribuído, usando os Hubs de Eventos para manter o estado do cluster.

  1. Salve o arquivo acima connect-distributed.properties localmente. Substitua todos os valores entre chaves.
  2. Navegue até o local da versão do Kafka em seu computador.
  3. Execute ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. A API REST do trabalho do Connect estará pronta para interação quando você vir 'INFO Finished starting connectors and tasks'.

Observação

O Kafka Connect usa a API AdminClient do Kafka para criar automaticamente tópicos com configurações recomendadas, incluindo a compactação. Uma verificação rápida do namespace no portal do Azure revela que os tópicos internos do trabalho do Connect foram criados automaticamente.

Os tópicos internos do Kafka Connect precisam usar a compactação. A equipe dos Hubs de Eventos não será responsável por corrigir configurações inadequadas se os tópicos internos do Connect estiverem configurados incorretamente.

Criar conectores

Esta seção percorre com você a configuração de conectores FileStreamSource e FileStreamSink.

  1. Crie um diretório para os arquivos de dados de entrada e saída.

    mkdir ~/connect-quickstart
    
  2. Crie dois arquivos: um arquivo com dados semente que é lido pelo conector FileStreamSource e outro no qual o conector FileStreamSink grava.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Crie um conector FileStreamSource. Substitua as chaves pelo caminho do diretório base.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Você deve ver o hub de eventos connect-quickstart em sua instância dos Hubs de Eventos depois de executar o comando acima.

  4. Verifique o status do conector de origem.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Opcionalmente, você pode usar o Gerenciador do Barramento de Serviço para verificar se os eventos chegaram no tópico connect-quickstart.

  5. Crie um conector FileStreamSink. Mais uma vez, substitua as chaves pelo caminho do diretório base.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Verifique o status do conector do coletor.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Verifique se dados foram replicados entre os arquivos e se os dados são idênticos nos dois arquivos.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Limpeza

O Kafka Connect cria tópicos de Hubs de Eventos para armazenar status, deslocamentos e configurações que persistem mesmo depois que o cluster do Connect é desativado. A menos que essa persistência seja desejada, é recomendável que esses tópicos sejam excluídos. Talvez seja ideal também excluir os Hubs de Eventos connect-quickstart que foram criados no decorrer deste passo a passo.

Próximas etapas

Para saber mais sobre os Hubs de Eventos para o Kafka, confira os artigos a seguir: