How to send data to an event hub with a time trigger

nadine.benharrath 21 Reputation points
2021-05-27T12:34:50.157+00:00

Hello everyone, I'm trying to send a CSV file to an event hub and process it in Databricks.

Here is my code:

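import asyncio
import json
import time

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

# connection_str, eventhub_name, data and getclaims() are defined elsewhere (not shown)


async def send_event_data_batch(producer, data):
    # Create a new batch, add a single event to it, and send it
    event_data_batch = await producer.create_batch()
    event_data_batch.add(EventData(data))
    await producer.send_batch(event_data_batch)


async def run():
    producer = EventHubProducerClient.from_connection_string(
        connection_str, eventhub_name=eventhub_name
    )
    async with producer:
        for claim in getclaims(data):
            # Each claim is serialized to JSON and currently goes out in its own batch
            await send_event_data_batch(producer, json.dumps(claim))
            print(claim)


start_time = time.time()
asyncio.run(run())
print("Sent messages in {} seconds.".format(time.time() - start_time))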

The problem is that I want to send a batch of data every minute. How can I do that?

Azure Event Hubs
1 answer

  1. MartinJaffer-MSFT 26,236 Reputation points
    2021-06-01T20:50:15.377+00:00

    @nadine.benharrath
    It has been a long time since I worked with multithreading or async code, but I have a few ideas. They will need some cleanup on your part, as I am out of practice.

    Refactor the loop "for claim in getclaims(data):" into a generator. A generator in Python is an iterator that produces items lazily, one item per next() call, without needing to know in advance how many items there are.
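    As a minimal sketch of that refactoring, assuming each CSV row is one claim, a generator could look like this. The name generate_claims is hypothetical, and it reads from any file-like object because the details of the original getclaims(data) aren't shown.

```python
import csv
import io
from typing import Dict, Iterator, TextIO


def generate_claims(csv_file: TextIO) -> Iterator[Dict[str, str]]:
    """Hypothetical generator: yield one claim (a CSV row as a dict) per call.

    Assumes each CSV row is one claim. Rows are read lazily, so the
    whole file never needs to be in memory at once.
    """
    for row in csv.DictReader(csv_file):
        yield row


# Usage with an in-memory CSV standing in for the real file:
claims = generate_claims(io.StringIO("id,amount\n1,100\n2,250\n"))
print(next(claims))        # {'id': '1', 'amount': '100'}
print(next(claims))        # {'id': '2', 'amount': '250'}
print(next(claims, None))  # None, because the generator is exhausted
```

    Because a generator can be used directly in a for loop, "for claim in generate_claims(f):" works the same way as the original loop, and json.dumps(claim) still serializes each row.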

    Then, inside run() (within the "async with producer:" block), replace the loops with something like:

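    event_data_batch = await producer.create_batch()  # instantiate the first batch
    send_time = time.time() + 60  # send the first batch 1 minute from now

    for record in generate_claims(data):  # generator from the step above; ends when there are no more claims
        # note: add() raises ValueError if the event would make the batch exceed its maximum size
        event_data_batch.add(EventData(json.dumps(record)))
        if time.time() >= send_time:  # if enough time has passed,
            await producer.send_batch(event_data_batch)  # send the batch!
            event_data_batch = await producer.create_batch()  # start a new, empty batch for the next loop
            send_time = time.time() + 60  # schedule the next send 1 minute later

    if len(event_data_batch) > 0:  # send any records left over after the loop
        await producer.send_batch(event_data_batch)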
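    The loop above only sends when a new record arrives after the deadline has passed. If the goal is one batch per minute regardless of when records arrive, another option is to sleep between sends with asyncio.sleep. The sketch below is a plain-Python version of that idea; send_every, its parameters, and the send callback are hypothetical names, and the default batch_size of 100 is an arbitrary assumption.

```python
import asyncio
import time
from itertools import islice
from typing import Awaitable, Callable, Iterable, List


async def send_every(
    records: Iterable[str],
    send: Callable[[List[str]], Awaitable[None]],
    interval: float = 60.0,
    batch_size: int = 100,
) -> int:
    """Send records in chunks of up to batch_size, one chunk every interval seconds.

    Hypothetical helper: send is any async callable that ships one chunk.
    Returns the number of chunks sent.
    """
    it = iter(records)
    sent = 0
    next_send = time.monotonic()  # the first chunk goes out immediately
    while chunk := list(islice(it, batch_size)):
        # Sleep until the scheduled time for this chunk (no sleep if it is already due).
        delay = next_send - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        await send(chunk)
        sent += 1
        # Schedule from the previous target, not from "now", to avoid drift.
        next_send += interval
    return sent


async def demo() -> None:
    async def print_send(chunk: List[str]) -> None:
        print(f"{time.monotonic():.2f}: sending {chunk}")

    # A short interval so the demo finishes quickly; use 60.0 for once a minute.
    count = await send_every(["a", "b", "c", "d", "e"], print_send, interval=0.2, batch_size=2)
    print(f"{count} chunks sent")


asyncio.run(demo())
```

    To use this with Event Hubs, the send callback could call await producer.create_batch(), add each record with batch.add(EventData(record)), and then await producer.send_batch(batch), all inside the "async with producer:" block. Keep batch_size small enough that each chunk fits, since add() raises ValueError when a batch is full.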
    
