How do I deserialize the Kafka header sent through Event Hub in Python?

Lord Wolfenstein 46 Reputation points
2023-05-05T14:59:27.1566667+00:00

I have read https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-exchange-events-different-protocols. I have an existing system written in Python that uses Kafka, and it worked against an actual Kafka cluster. I use the kafka-python client (installed with pip). Now I have an Azure Function in C# that sends events to an event hub, something like this:

private static async Task _Send(string license_id, string api_key, string data, string topic, ILogger log)
{
    // HUB is the Event Hubs connection string; topic is used as the event hub name
    await using EventHubProducerClient producerClient = new EventHubProducerClient(HUB, topic);
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    EventData newEventData = new EventData(data);
    newEventData.Properties.Add("license_id", "123456789");
    newEventData.Properties.Add("api_key", "AAABBBCCC");
    if (!eventBatch.TryAdd(newEventData))
    {
        throw new InvalidOperationException("The event is too large to fit in the batch.");
    }
    await producerClient.SendAsync(eventBatch);
}

And in Python, my consumer looks something like this:

import chardet
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord

def data_and_headers(event: ConsumerRecord) -> tuple[str, dict]:
    data = event.value.decode("utf-8")
    headers = {}
    for k, v in event.headers:
        value = v.replace(b"\xa1\x08", b"").replace(b'\xa1"',b"")
        headers[k] = value.decode(chardet.detect(v)["encoding"])
    return data, headers

kafka_consumer = KafkaConsumer(insert_detalis_here)
kafka_consumer.subscribe(TOPICS)
while True:
    for kafka_event in kafka_consumer:
        try:
            data, headers = data_and_headers(kafka_event)
            print("kafka_event.headers =", kafka_event.headers)
            if authentication(headers):
                print("AUTHORIZED!!!")
            else:
                print("Unauthorized event.")
        except Exception as e:
            print("Error:", e)

Without the replace() calls, the raw headers look like this, with unwanted bytes in front of each value:

[('license_id', b'\xa1\x08123456789'), ('api_key', b'\xa1"AAABBBCCC')]

The data_and_headers() function makes it work for now, but there must be a better way to solve this.

So how do I deserialize Event Hub properties to Kafka headers in a proper way using Python?

Azure Event Hubs

1 answer

  1. PRADEEPCHEEKATLA 90,651 Reputation points Moderator
    2023-05-08T09:24:29.57+00:00

    @Lord Wolfenstein - Thanks for the question and for using the MS Q&A platform.

    On the producer side, the application properties of an Event Hub event are what a Kafka consumer receives as headers. In the Python azure-eventhub library you set them through the properties dictionary of the EventData class; the C# producer in the question does the same with EventData.Properties.

    Here's an example of how you can use the azure-eventhub library to create an Event Hub event with properties in Python:

    from azure.eventhub import EventData, EventHubProducerClient
    
    # Placeholder connection details; substitute your own
    producer = EventHubProducerClient.from_connection_string(
        "<EVENT_HUBS_CONNECTION_STRING>", eventhub_name="<EVENT_HUB_NAME>"
    )
    
    # Application properties are delivered to Kafka consumers as headers
    event_data = EventData(b"Hello, world!")
    event_data.properties = {"license_id": "123456789", "api_key": "AAABBBCCC"}
    
    # Send the event to the Event Hub
    with producer:
        batch = producer.create_batch()
        batch.add(event_data)
        producer.send_batch(batch)

    In this example, we create an EventData object and set its properties attribute to a dictionary of key-value pairs. EventData has no separate attribute for Kafka headers. Event Hubs transfers the properties over AMQP, and a Kafka consumer receives each one as a header whose value is still AMQP-encoded: for a string property, that is a type-code byte and a length prefix, followed by the UTF-8 data. Those are the extra bytes in front of the values in the question.
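    To see where the extra bytes in front of each header value come from, here is a minimal sketch (standard library only) of how an AMQP 1.0 string is laid out: a type-code byte (0xa1 for str8-utf8, the compact form for data of up to 255 bytes, or 0xb1 for str32-utf8), then the length of the UTF-8 data (one byte, or four big-endian bytes), then the data itself. encode_amqp_string is a hypothetical helper that only illustrates the wire format; it is not part of azure-eventhub, and picking str8 whenever the data fits is an assumption about what an encoder would typically do.

```python
import struct


def encode_amqp_string(text: str) -> bytes:
    """Hypothetical helper: encode text as an AMQP 1.0 string value."""
    data = text.encode("utf-8")
    if len(data) <= 0xFF:
        # str8-utf8: type code 0xa1, then a 1-byte length, then the data
        return b"\xa1" + struct.pack(">B", len(data)) + data
    # str32-utf8: type code 0xb1, then a 4-byte big-endian length, then the data
    return b"\xb1" + struct.pack(">I", len(data)) + data
```

    Called with "123456789", it returns b'\xa1\t123456789' (Python prints the length byte 0x09 as \t), which has the same shape as the raw headers shown in the question.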

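    To deserialize the Kafka headers sent through Event Hub in Python, read the headers attribute of the ConsumerRecord object that the Kafka consumer returns. Each entry is a (key, value) tuple: kafka-python already gives the key as a str, but the value holds the raw AMQP encoding of the property, so decoding it directly as UTF-8 fails on the leading type-code byte. Strip the AMQP prefix first instead. Here's an example:

    from kafka import KafkaConsumer
    from kafka.consumer.fetcher import ConsumerRecord
    
    def decode_amqp_string(value: bytes) -> str:
        # AMQP 1.0 strings start with a type code and a length prefix
        if value[:1] == b"\xa1":  # str8-utf8: type code + 1-byte length
            return value[2:].decode("utf-8")
        if value[:1] == b"\xb1":  # str32-utf8: type code + 4-byte length
            return value[5:].decode("utf-8")
        # No AMQP prefix (e.g. a header written by a Kafka producer)
        return value.decode("utf-8")
    
    def data_and_headers(event: ConsumerRecord) -> tuple[str, dict]:
        data = event.value.decode("utf-8")
        headers = {}
        for k, v in event.headers:
            # Keys are already str in kafka-python; only values need decoding
            headers[k] = decode_amqp_string(v)
        return data, headers
    
    kafka_consumer = KafkaConsumer(insert_detalis_here)
    kafka_consumer.subscribe(TOPICS)
    while True:
        for kafka_event in kafka_consumer:
            try:
                data, headers = data_and_headers(kafka_event)
                print("kafka_event.headers =", headers)
                if authentication(headers):
                    print("AUTHORIZED!!!")
                else:
                    print("Unauthorized event.")
            except Exception as e:
                print("Error:", e)

    In this example, data_and_headers takes a ConsumerRecord object and returns a tuple of the data and headers. It iterates over the headers attribute of the ConsumerRecord, and decode_amqp_string skips the AMQP type code (0xa1 for str8-utf8, 0xb1 for str32-utf8) together with the one- or four-byte length that follows it, then decodes the rest as UTF-8. Since 0xa1 and 0xb1 can never be the first byte of valid UTF-8 text, plain UTF-8 header values from Kafka producers are still decoded correctly by the last branch. This covers string properties only; properties of other types use other AMQP type codes.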
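    Header values are not always strings. If the producer also sets properties of other types (for example an int or a bool in EventData.Properties), their header values carry other AMQP 1.0 type codes, which a plain UTF-8 decode cannot handle. The sketch below, standard library only, decodes the primitive encodings from the AMQP 1.0 types specification into Python values. decode_amqp_value is a hypothetical helper, not part of kafka-python or azure-eventhub; it does not handle compound types (list, map, array), decimals, or char, it accepts both the compact and full-width forms because it does not assume which one a given producer picks, and it assumes each header value holds exactly one encoded value.

```python
import struct
import uuid
from datetime import datetime, timezone
from typing import Any

# Fixed-width AMQP 1.0 primitives: type code -> big-endian struct format
_FIXED = {
    0x50: ">B",  # ubyte
    0x51: ">b",  # byte
    0x52: ">B",  # smalluint (uint in 1 byte)
    0x53: ">B",  # smallulong (ulong in 1 byte)
    0x54: ">b",  # smallint (int in 1 byte)
    0x55: ">b",  # smalllong (long in 1 byte)
    0x60: ">H",  # ushort
    0x61: ">h",  # short
    0x70: ">I",  # uint
    0x71: ">i",  # int
    0x72: ">f",  # float
    0x80: ">Q",  # ulong
    0x81: ">q",  # long
    0x82: ">d",  # double
}


def decode_amqp_value(raw: bytes) -> Any:
    """Hypothetical helper: decode one AMQP 1.0 primitive from a header value."""
    if not raw:
        raise ValueError("empty header value")
    code, body = raw[0], raw[1:]
    if code == 0x40:  # null
        return None
    if code in (0x41, 0x42):  # true / false with no payload
        return code == 0x41
    if code == 0x56:  # boolean with a 1-byte payload
        return body[0] != 0
    if code in (0x43, 0x44):  # uint0 / ulong0
        return 0
    if code in _FIXED:
        fmt = _FIXED[code]
        return struct.unpack(fmt, body[: struct.calcsize(fmt)])[0]
    if code == 0x83:  # timestamp: signed 64-bit ms since the Unix epoch
        millis = struct.unpack(">q", body[:8])[0]
        return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    if code == 0x98:  # uuid: 16 raw bytes
        return uuid.UUID(bytes=body[:16])
    if code in (0xA0, 0xA1, 0xA3):  # vbin8 / str8-utf8 / sym8: 1-byte length
        data = body[1:]
    elif code in (0xB0, 0xB1, 0xB3):  # vbin32 / str32-utf8 / sym32: 4-byte length
        data = body[4:]
    else:
        raise ValueError(f"unsupported AMQP type code 0x{code:02x}")
    # The header holds a single value, so the data runs to the end of it
    if code in (0xA0, 0xB0):
        return data  # binary stays bytes
    return data.decode("utf-8")  # strings are UTF-8, symbols are ASCII
```

    For example, the loop in data_and_headers could store headers[k] = decode_amqp_value(v). Be careful if the same topic also receives headers from ordinary Kafka producers: their raw bytes carry no type code, so a plain value such as b"AAA" would be misread as the boolean true (0x41).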

    Hope this helps. Do let us know if you have any further queries.
