Partilhar via


Eventos de troca entre consumidores e produtores que usam protocolos diferentes: AMQP, Kafka e HTTPS

Os Hubs de Eventos do Azure dão suporte a três protocolos para consumidores e produtores: AMQP, Kafka e HTTPS. Cada um desses protocolos tem sua própria maneira de representar uma mensagem, então naturalmente surge a seguinte pergunta: se um aplicativo envia eventos para um Hub de Eventos com um protocolo e os consome com um protocolo diferente, como ficam as várias partes e valores do evento quando chegam ao consumidor? Este artigo discute as práticas recomendadas para produtor e consumidor para garantir que os valores dentro de um evento sejam interpretados corretamente pelo aplicativo consumidor.

O conselho neste artigo abrange especificamente esses clientes, com as versões listadas usadas no desenvolvimento dos trechos de código:

Outros clientes AMQP podem se comportar de forma ligeiramente diferente. O AMQP tem um sistema de tipos bem definido, mas as especificidades da serialização de tipos específicos de linguagem de e para esse sistema de tipos dependem do cliente, assim como a forma como o cliente fornece acesso às partes de uma mensagem AMQP.

Corpo do Evento

Todos os clientes Microsoft AMQP representam o corpo do evento como um pacote não interpretado de bytes. Um aplicativo de produção passa uma sequência de bytes para o cliente, e um aplicativo consumidor recebe essa mesma sequência do cliente. A interpretação da sequência de bytes acontece dentro do código do aplicativo.

Ao enviar um evento via HTTPS, o corpo do evento é o conteúdo POSTed, que também é tratado como bytes não interpretados. É fácil alcançar o mesmo estado em um produtor ou consumidor Kafka usando o ByteArraySerializer e ByteArrayDeserializer fornecidos, conforme mostrado no código a seguir:

Kafka byte[] produtor

final Properties properties = new Properties();
// add other properties
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(properties);

final byte[] eventBody = new byte[] { 0x01, 0x02, 0x03, 0x04 };
ProducerRecord<byte[], byte[]> pr =
    new ProducerRecord<byte[], byte[]>(myTopic, myPartitionId, myTimeStamp, eventBody);

Kafka byte[] consumidor

final Properties properties = new Properties();
// add other properties
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(properties);

ConsumerRecord<byte[], byte[]> cr = /* receive event */
// cr.value() is a byte[] with values { 0x01, 0x02, 0x03, 0x04 }

Esse código cria um pipeline de bytes transparente entre as duas metades do aplicativo e permite que o desenvolvedor do aplicativo serialize e desserialize manualmente de qualquer maneira desejada, incluindo a tomada de decisões de desserialização em tempo de execução, por exemplo, com base em informações de tipo ou remetente em propriedades definidas pelo usuário no evento.

Os aplicativos que têm um único tipo de corpo de evento fixo podem ser capazes de usar outros serializadores Kafka e desserializadores para converter dados de forma transparente. Por exemplo, considere um aplicativo que usa JSON. A construção e interpretação da string JSON acontece no nível do aplicativo. No nível dos Hubs de Eventos, o corpo do evento é sempre uma cadeia de caracteres, uma sequência de bytes que representa caracteres na codificação UTF-8. Nesse caso, o produtor ou consumidor de Kafka pode aproveitar o StringSerializer ou StringDeserializer fornecido, conforme mostrado no código a seguir:

Kafka UTF-8 produtor de cordas

final Properties properties = new Properties();
// add other properties
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

final KafkaProducer<Long, String> producer = new KafkaProducer<Long, String>(properties);

final String exampleJson = "{\"name\":\"John\", \"number\":9001}";
ProducerRecord<Long, String> pr =
    new ProducerRecord<Long, String>(myTopic, myPartitionId, myTimeStamp, exampleJson);

Kafka UTF-8 consumidor de corda

final Properties properties = new Properties();
// add other properties
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

final KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(properties);

ConsumerRecord<Long, Bytes> cr = /* receive event */
final String receivedJson = cr.value();

Para o lado AMQP, Java e .NET fornecem maneiras internas de converter cadeias de caracteres de e para sequências de bytes UTF-8. Os clientes Microsoft AMQP representam eventos como uma classe chamada EventData. Os exemplos a seguir mostram como serializar uma cadeia de caracteres UTF-8 em um corpo de evento EventData em um produtor AMQP e como desserializar um corpo de evento EventData em uma cadeia de caracteres UTF-8 em um consumidor AMQP.

Produtor de string Java AMQP UTF-8

final String exampleJson = "{\"name\":\"John\", \"number\":9001}";
final EventData ed = EventData.create(exampleJson.getBytes(StandardCharsets.UTF_8));

Consumidor de string Java AMQP UTF-8

EventData ed = /* receive event */
String receivedJson = new String(ed.getBytes(), StandardCharsets.UTF_8);

Produtor de string C# .NET UTF-8

string exampleJson = "{\"name\":\"John\", \"number\":9001}";
EventData working = new EventData(Encoding.UTF8.GetBytes(exampleJson));

C# .NET UTF-8 consumidor de cadeia de caracteres

EventData ed = /* receive event */

// getting the event body bytes depends on which .NET client is used
byte[] bodyBytes = ed.Body.Array;  // Microsoft Azure Event Hubs Client for .NET
// byte[] bodyBytes = ed.GetBytes(); // Microsoft Azure Service Bus

string receivedJson = Encoding.UTF8.GetString(bodyBytes);

Como o Kafka é de código aberto, o desenvolvedor de aplicativos pode inspecionar a implementação de qualquer serializador ou desserializador e implementar código, que produz ou consome uma sequência compatível de bytes no lado AMQP.

Propriedades do usuário do evento

As propriedades do conjunto de usuários podem ser definidas e recuperadas de clientes AMQP (nos clientes Microsoft AMQP são chamadas de propriedades) e Kafka (onde são chamados de cabeçalhos). Os remetentes HTTPS podem definir as propriedades do usuário em um evento fornecendo-as como cabeçalhos HTTP na operação POST. No entanto, Kafka trata os corpos de eventos e os valores de cabeçalho de eventos como sequências de bytes. Enquanto nos clientes AMQP, os valores de propriedade têm tipos, que são comunicados codificando os valores de propriedade de acordo com o sistema de tipos AMQP.

HTTPS é um caso especial. No ponto de envio, todos os valores de propriedade são texto UTF-8. O serviço Hubs de Eventos faz uma quantidade limitada de interpretação para converter valores de propriedade apropriados em inteiros assinados de 32 bits e 64 bits codificados por AMQP, números de ponto flutuante de 64 bits e booleanos. Qualquer valor de propriedade, que não se encaixe em um desses tipos é tratado como uma cadeia de caracteres.

Misturar essas abordagens para a digitação de propriedades significa que um consumidor Kafka vê a sequência de bytes codificada em AMQP bruto, incluindo as informações de tipo AMQP. Enquanto um consumidor AMQP vê a sequência de bytes não tipados enviada pelo produtor de Kafka, que o aplicativo deve interpretar.

Para consumidores Kafka que recebem propriedades de produtores AMQP ou HTTPS, use a classe AmqpDeserializer, que é modelada de acordo com os outros desserializadores no ecossistema Kafka. Ele interpreta as informações de tipo nas sequências de bytes codificadas em AMQP para desserializar os bytes de dados em um tipo Java.

Como prática recomendada, recomendamos que você inclua uma propriedade em mensagens enviadas via AMQP ou HTTPS. O consumidor Kafka pode usá-lo para determinar se os valores de cabeçalho precisam de desserialização AMQP. O valor do imóvel não é importante. Ele só precisa de um nome bem conhecido que o consumidor Kafka pode encontrar na lista de cabeçalhos e ajustar seu comportamento de acordo.

Nota

O serviço Hubs de Eventos converte nativamente algumas das propriedades AmqpMessage específicas do EventHubs em cabeçalhos de registro do Kafka como cadeias de caracteres. Kafka message header é uma lista de chaves, pares de valores> onde chave é string e value é sempre uma matriz de <bytes. Para essas propriedades suportadas, a matriz de bytes terá uma cadeia de caracteres codificada UTF8.

Aqui está a lista de propriedades imutáveis que os Hubs de Eventos suportam nesta conversão hoje. Se você definir valores para propriedades de usuário com os nomes nesta lista, não precisará desserializar no lado do consumidor Kafka.

  • ID da mensagem
  • ID de utilizador
  • para
  • resposta-a
  • tipo de conteúdo
  • codificação de conteúdo
  • tempo de criação

AMQP para Kafka parte 1: criar e enviar um evento em C# (.NET) com propriedades

// Create an event with properties "MyStringProperty" and "MyIntegerProperty"
EventData working = new EventData(Encoding.UTF8.GetBytes("an event body"));
working.Properties.Add("MyStringProperty", "hello");
working.Properties.Add("MyIntegerProperty", 1234);

// BEST PRACTICE: include a property which indicates that properties will need AMQP deserialization
working.Properties.Add("AMQPheaders", 0);

AMQP para Kafka parte 2: use AmqpDeserializer para desserializar essas propriedades em um consumidor Kafka

final AmqpDeserializer amqpDeser = new AmqpDeserializer();

ConsumerRecord<Long, Bytes> cr = /* receive event */
final Header[] headers = cr.headers().toArray();

final Header headerNamedMyStringProperty = /* find header with key "MyStringProperty" */
final Header headerNamedMyIntegerProperty = /* find header with key "MyIntegerProperty" */
final Header headerNamedAMQPheaders = /* find header with key "AMQPheaders", or null if not found */

// BEST PRACTICE: detect whether AMQP deserialization is needed
if (headerNamedAMQPheaders != null) {
    // The deserialize() method requires no prior knowledge of a property's type.
    // It returns Object and the application can check the type and perform a cast later.
    Object propertyOfUnknownType = amqpDeser.deserialize("topicname", headerNamedMyStringProperty.value());
    if (propertyOfUnknownType instanceof String) {
        final String propertyString = (String)propertyOfUnknownType;
        // do work here
    }
    propertyOfUnknownType = amqpDeser.deserialize("topicname", headerNamedMyIntegerProperty.value());
    if (propertyOfUnknownType instanceof Integer) {
        final Integer propertyInt = (Integer)propertyOfUnknownType;
        // do work here
    }
} else {
    /* event sent via Kafka, interpret header values the Kafka way */
}

Se o aplicativo souber o tipo esperado para uma propriedade, há métodos de desserialização que não exigem uma conversão posterior, mas geram um erro se a propriedade não for do tipo esperado.

AMQP para Kafka parte 3: uma maneira diferente de usar AmqpDeserializer em um consumidor Kafka

// BEST PRACTICE: detect whether AMQP deserialization is needed
if (headerNamedAMQPheaders != null) {
    // Property "MyStringProperty" is expected to be of type string.
    try {
        final String propertyString = amqpDeser.deserializeString(headerNamedMyStringProperty.value());
        // do work here
    }
    catch (IllegalArgumentException e) {
        // property was not a string
    }

    // Property "MyIntegerProperty" is expected to be a signed integer type.
    // The method returns long because long can represent the value range of all AMQP signed integer types.
    try {
        final long propertyLong = amqpDeser.deserializeSignedInteger(headerNamedMyIntegerProperty.value());
        // do work here
    }
    catch (IllegalArgumentException e) {
        // property was not a signed integer
    }
} else {
    /* event sent via Kafka, interpret header values the Kafka way */
}

Ir na outra direção é mais envolvido, porque os cabeçalhos definidos por um produtor Kafka são sempre vistos por um consumidor AMQP como bytes brutos (digite org.apache.qpid.proton.amqp.Binary para o Cliente de Hubs de Eventos do Microsoft Azure para Java ou System.Byte[] para clientes .NET AMQP da Microsoft). O caminho mais fácil é usar um dos serializadores fornecidos pelo Kafka para gerar os bytes para os valores de cabeçalho no lado do produtor Kafka e, em seguida, escrever um código de desserialização compatível no lado do consumidor AMQP.

Tal como acontece com AMQP-to-Kafka, a melhor prática que recomendamos é incluir uma propriedade em mensagens enviadas via Kafka. O consumidor AMQP pode usar a propriedade para determinar se os valores de cabeçalho precisam de desserialização. O valor do imóvel não é importante. Ele só precisa de um nome bem conhecido que o consumidor AMQP pode encontrar na lista de cabeçalhos e ajustar seu comportamento de acordo. Se o produtor Kafka não puder ser alterado, também é possível para o aplicativo consumidor verificar se o valor da propriedade é de um tipo binário ou byte e tentar a desserialização com base no tipo.

Kafka para AMQP parte 1: criar e enviar um evento de Kafka com propriedades

final String topicName = /* topic name */
final ProducerRecord<Long, String> pr = new ProducerRecord<Long, String>(topicName, /* other arguments */);
final Headers h = pr.headers();

// Set headers using Kafka serializers
IntegerSerializer intSer = new IntegerSerializer();
h.add("MyIntegerProperty", intSer.serialize(topicName, 1234));

LongSerializer longSer = new LongSerializer();
h.add("MyLongProperty", longSer.serialize(topicName, 5555555555L));

ShortSerializer shortSer = new ShortSerializer();
h.add("MyShortProperty", shortSer.serialize(topicName, (short)22222));

FloatSerializer floatSer = new FloatSerializer();
h.add("MyFloatProperty", floatSer.serialize(topicName, 1.125F));

DoubleSerializer doubleSer = new DoubleSerializer();
h.add("MyDoubleProperty", doubleSer.serialize(topicName, Double.MAX_VALUE));

StringSerializer stringSer = new StringSerializer();
h.add("MyStringProperty", stringSer.serialize(topicName, "hello world"));

// BEST PRACTICE: include a property which indicates that properties will need deserialization
h.add("RawHeaders", intSer.serialize(0));

Kafka para AMQP parte 2: desserializar manualmente essas propriedades em C# (.NET)

EventData ed = /* receive event */

// BEST PRACTICE: detect whether manual deserialization is needed
if (ed.Properties.ContainsKey("RawHeaders"))
{
    // Kafka serializers send bytes in big-endian order, whereas .NET on x86/x64 is little-endian.
    // Therefore it is frequently necessary to reverse the bytes before further deserialization.

    byte[] rawbytes = ed.Properties["MyIntegerProperty"] as System.Byte[];
    if (BitConverter.IsLittleEndian)
    {
            Array.Reverse(rawbytes);
    }
    int myIntegerProperty = BitConverter.ToInt32(rawbytes, 0);

    rawbytes = ed.Properties["MyLongProperty"] as System.Byte[];
    if (BitConverter.IsLittleEndian)
    {
            Array.Reverse(rawbytes);
    }
    long myLongProperty = BitConverter.ToInt64(rawbytes, 0);

    rawbytes = ed.Properties["MyShortProperty"] as System.Byte[];
    if (BitConverter.IsLittleEndian)
    {
            Array.Reverse(rawbytes);
    }
    short myShortProperty = BitConverter.ToInt16(rawbytes, 0);

    rawbytes = ed.Properties["MyFloatProperty"] as System.Byte[];
    if (BitConverter.IsLittleEndian)
    {
            Array.Reverse(rawbytes);
    }
    float myFloatProperty = BitConverter.ToSingle(rawbytes, 0);

    rawbytes = ed.Properties["MyDoubleProperty"] as System.Byte[];
    if (BitConverter.IsLittleEndian)
    {
            Array.Reverse(rawbytes);
    }
    double myDoubleProperty = BitConverter.ToDouble(rawbytes, 0);

    rawbytes = ed.Properties["MyStringProperty"] as System.Byte[];
string myStringProperty = Encoding.UTF8.GetString(rawbytes);
}

Kafka para AMQP parte 3: desserializar manualmente essas propriedades em Java

final EventData ed = /* receive event */

// BEST PRACTICE: detect whether manual deserialization is needed
if (ed.getProperties().containsKey("RawHeaders")) {
    byte[] rawbytes =
        ((org.apache.qpid.proton.amqp.Binary)ed.getProperties().get("MyIntegerProperty")).getArray();
    int myIntegerProperty = 0;
    for (byte b : rawbytes) {
        myIntegerProperty <<= 8;
        myIntegerProperty |= ((int)b & 0x00FF);
    }

    rawbytes = ((org.apache.qpid.proton.amqp.Binary)ed.getProperties().get("MyLongProperty")).getArray();
    long myLongProperty = 0;
    for (byte b : rawbytes) {
        myLongProperty <<= 8;
        myLongProperty |= ((long)b & 0x00FF);
    }

    rawbytes = ((org.apache.qpid.proton.amqp.Binary)ed.getProperties().get("MyShortProperty")).getArray();
    short myShortProperty = (short)rawbytes[0];
    myShortProperty <<= 8;
    myShortProperty |= ((short)rawbytes[1] & 0x00FF);

    rawbytes = ((org.apache.qpid.proton.amqp.Binary)ed.getProperties().get("MyFloatProperty")).getArray();
    int intbits = 0;
    for (byte b : rawbytes) {
        intbits <<= 8;
        intbits |= ((int)b & 0x00FF);
    }
    float myFloatProperty = Float.intBitsToFloat(intbits);

    rawbytes = ((org.apache.qpid.proton.amqp.Binary)ed.getProperties().get("MyDoubleProperty")).getArray();
    long longbits = 0;
    for (byte b : rawbytes) {
        longbits <<= 8;
        longbits |= ((long)b & 0x00FF);
    }
    double myDoubleProperty = Double.longBitsToDouble(longbits);

    rawbytes = ((org.apache.qpid.proton.amqp.Binary)ed.getProperties().get("MyStringProperty")).getArray();
String myStringProperty = new String(rawbytes, StandardCharsets.UTF_8);
}

Próximos passos

Neste artigo, você aprendeu como transmitir para Hubs de Eventos sem alterar seus clientes de protocolo ou executar seus próprios clusters. Para saber mais sobre Hubs de Eventos e Hubs de Eventos para Kafka, consulte os seguintes artigos: