Hi everyone,
I have a Python program that takes loads of events from an EventHub, does some complex processing and sends results to another Eventhub. We are using the async option of EventHubConsumerClient and EventHubProducerClient to create the client connection.
We have been trying two options to create the connections and we are having some issues with each of them:
First Option:
We create a Producer connection every time we receive an EventBatch from the consumer Eventhub. This option uses all of the events (at least that is out believe) but CPU shoots up and we have issues with memory leakage.
I would be something like
async def on_event_batch(partition_context, event_batch):
producer = EventHubProducerClient.from_connection_string(...)
async with producer:
event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
...
Second Option:
We create a singleton Producer and use it every time to send EventBatches to the target Eventhub.
This option reduces CPU and memory leaks but we lose a considerable amount of Event. A schematic piece of the code.
I would be something like
producer = EventHubProducerClient.from_connection_string(...)
async def on_event_batch(partition_context, event_batch):
event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
...
Second option seems more logical for us but we cannot solve the events lost issue.
Any help is really appreciated.
Best regards
A more complete piece of the code of First Option.
import ...
with open(configFilePath) as json_data_file:
config = json.load(json_data_file)
producer = EventHubProducerClient.from_connection_string(conn_str = config.get('EventHub').get('Producer').get('connection_str')
, eventhub_name = config.get('EventHub').get('Producer').get('eventhub_name'))
async def on_event_batch(partition_context, event_batch):
print(partition_context.partition_id)
# Create a batch
event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
for x in event_batch:
can_event, devicetype = canonizer.main(x.body_as_str())
ED_can_event = EventData(str(can_event))
try:
event_data_batch.add(ED_can_event) # Add event data to the batch.
except Exception as e:
print(e)
try:
await producer.send_batch(event_data_batch)
event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
event_data_batch.add(ED_can_event)
except Exception as e:
print(e)
event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
try:
await producer.send_batch(event_data_batch) # Send batch of events to the event hub.
except Exception as e:
print(e)
pass
await partition_context.update_checkpoint()
async def main(max_batch_size = 250):
checkpoint_store = BlobCheckpointStore.from_connection_string(config.get('connection_str'),config.get('container_name'))
consumer = EventHubConsumerClient.from_connection_string(conn_str= config.get('EventHub').get('Consumer').get('connection_str')
, consumer_group = config.get('EventHub').get('Consumer').get('consumer_group')
, eventhub_name= config.get('EventHub').get('Consumer').get('eventhub_name')
, checkpoint_store=checkpoint_store)
async with consumer,producer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
max_batch_size=max_batch_size,
)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
A more complete piece of the code of Second Option.
import ...
with open(configFilePath) as json_data_file:
config = json.load(json_data_file)
async def on_event_batch(partition_context, event_batch):
producer = EventHubProducerClient.from_connection_string(conn_str = config.get('connection_str')
, eventhub_name = config.get('eventhub_name'))
async with producer:
# Create a batch
event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
for x in event_batch:
can_event, devicetype = canonizer.main(x.body_as_str()) #Primer elemento de la lista es la trama y el segundo el tipo de dispositivo emisor
ED_can_event = EventData(str(can_event))
ED_can_event.properties = x.properties
ED_can_event.properties.update({"DeviceType": devicetype, "Canonized_At": int(time.time())})
try:
event_data_batch.add(ED_can_event)
except Exception as e:
print(e)
try:
await producer.send_batch(event_data_batch)
event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
event_data_batch.add(ED_can_event)
except Exception as e:
print(e)
event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
try:
await producer.send_batch(event_data_batch) # Send batch of events to the event hub.
except Exception as e:
print(e)
pass
await partition_context.update_checkpoint()
async def main(max_batch_size = 250):
checkpoint_store = BlobCheckpointStore.from_connection_string(config.get('Storage').get('connection_str'),config.get('Storage').get('container_name'))
consumer = EventHubConsumerClient.from_connection_string(conn_str= config.get('EventHub').get('Consumer').get('connection_str')
, consumer_group = config.get('EventHub').get('Consumer').get('consumer_group')
, eventhub_name= config.get('EventHub').get('Consumer').get('eventhub_name')
, checkpoint_store=checkpoint_store)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
max_batch_size=max_batch_size)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())