Questions about a ThreadPoolExecutor script in PySpark

CzarR 316 Reputation points
2022-06-16T15:05:23.03+00:00

Hi, my question is about the code below; I am trying to fully understand it.
1.) What is "row for row in" doing (line 17)?
2.) Why do we write "future_to_url[future]" (line 19)?
3.) In line 21, what are %r and %s?

Please help me understand. Thanks in advance.

    import concurrent.futures  # hubloc, hubsource, targetdrop, buildSearchData, buildJsonFileDir: defined elsewhere (not shown)
    eventhub = dbutils.fs.ls('dbfs:/mnt/' + hubloc + '/')  # dbutils is provided by the Databricks notebook
    for eh in eventhub:
        if hubsource.lower() in eh.path.lower():
            pth = eh.path.replace('dbfs:', '/dbfs') + "*/*/*/*/*/*/*.avro"
            #print(pth)
            dfHubFiles = buildSearchData(pth)

            dest = eh.path.replace('/' + hubloc + '/' + eh.name, '/DataSources/IQA/' + targetdrop + '/Working').replace("dbfs:", "")
            print(dest)

            # Process the files: restructuring, copying, archiving and removal
            if (1==1):  # always true, so this block always runs
                # We can use a with statement to ensure threads are cleaned up promptly
                with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
                    # Start the load operations and map each future to the row it processes
                    future_to_url = {executor.submit(buildJsonFileDir, row, dest): row for row in dfHubFiles.rdd.collect()}
                    for future in concurrent.futures.as_completed(future_to_url):
                        url = future_to_url[future]  # the row this future was submitted with
                        if future.exception() is not None:
                            print('%r generated an exception: %s' % (url, future.exception()))
                        else:
                            i = 1  # placeholder: nothing is done for rows that succeeded

Tags: Azure Synapse Analytics, Azure Databricks

Accepted answer
  MartinJaffer-MSFT 26,211 Reputation points
    2022-06-17T19:31:08.713+00:00

    Hello @CzarR and welcome to Microsoft Q&A.

    I see you want to know how and why this code works. This code is extremely similar to the ThreadPoolExecutor example in the Python documentation for the concurrent.futures module.
    It has been a long time since I dealt with this particular nuance, but this is the only reading of it that makes sense to me:

    future_to_url = {executor.submit(buildJsonFileDir, row, dest): row for row in dfHubFiles.rdd.collect()}
    

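    executor.submit takes a function (here buildJsonFileDir) plus the arguments to call it with, schedules that call on one of the pool's threads, and immediately returns a Future object representing the pending result. dest has been defined earlier. row is bound by the for row in dfHubFiles.rdd.collect() clause, which walks one at a time through the rows that collect() has returned to the driver as a list of Row objects.
    The whole expression is a dictionary comprehension, {key: value for item in iterable}. There is no code block after for; instead, the key: value part before for is evaluated once per row. Here the key is the Future returned by executor.submit(buildJsonFileDir, row, dest) and the value is the row itself, so future_to_url maps every Future back to the row it is processing.
    That mapping also answers your second question: concurrent.futures.as_completed yields each Future as it finishes, in completion order rather than submission order, so url = future_to_url[future] looks up which row the finished Future belongs to.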
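
A minimal sketch, runnable outside Spark, showing that the comprehension is shorthand for an ordinary loop. Assumptions: process_row is a hypothetical stand-in for buildJsonFileDir, rows is a hypothetical list standing in for dfHubFiles.rdd.collect(), and the dest path is made up.

```python
import concurrent.futures

def process_row(row, dest):
    # Hypothetical stand-in for buildJsonFileDir: just reports what it would do.
    return f"{row} -> {dest}"

rows = ["a.avro", "b.avro", "c.avro"]  # hypothetical stand-in for dfHubFiles.rdd.collect()
dest = "/DataSources/example/Working"  # made-up destination path

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Comprehension form, as in the question.
    future_to_url = {executor.submit(process_row, row, dest): row for row in rows}

    # The same dictionary built with an explicit loop
    # (this submits every row a second time, purely for comparison).
    future_to_url_loop = {}
    for row in rows:
        future = executor.submit(process_row, row, dest)  # returns a Future right away
        future_to_url_loop[future] = row                   # key: the Future, value: the row

# Both dictionaries map one Future per row back to that row.
print(sorted(future_to_url.values()))       # ['a.avro', 'b.avro', 'c.avro']
print(sorted(future_to_url_loop.values()))  # ['a.avro', 'b.avro', 'c.avro']
```

The comprehension and the loop build the same kind of mapping; the comprehension just does it in one expression.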

    Try running in the interpreter:

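    stuff = [1, 2, 3, 4]

    # print(f) runs once per element, printing 1, 2, 3 and 4;
    # print returns None, so every key is None and the result is {None: 4}
    {print(f): f for f in stuff}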
    

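    This should help illuminate some of what is happening: the expression before for runs once for every element. The construct is a comprehension, a close relative of generator expressions; because it builds key: value pairs, it is specifically a dictionary comprehension. A set comprehension has the same shape without the key: part, for example a = {x for x in 'abracadabra' if x not in 'abc'}. For a fuller explanation, searching StackOverflow or the Python tutorial for "dictionary comprehension" is a good next step.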
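
The second question, why the loop reads future_to_url[future], comes down to as_completed: it hands back each Future as soon as it finishes, not in the order the rows were submitted, so the dictionary lookup is how the loop learns which row a finished Future belongs to. The sketch below reproduces the pattern without Spark. process_row is a hypothetical stand-in for buildJsonFileDir that deliberately fails on one made-up row so the exception branch runs; the row names and dest path are invented.

```python
import concurrent.futures
import time

def process_row(row, dest):
    # Hypothetical stand-in for buildJsonFileDir.
    if row == "bad.avro":
        raise ValueError("cannot parse " + row)
    time.sleep(0.01 * len(row))  # vary the duration so completion order can differ
    return dest + "/" + row

rows = ["first.avro", "bad.avro", "x.avro"]  # invented rows
dest = "/Working"                            # invented destination

failures = {}
successes = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_url = {executor.submit(process_row, row, dest): row for row in rows}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]  # which row did this Future process?
        exc = future.exception()     # None if the call returned normally
        if exc is not None:
            # prints: 'bad.avro' generated an exception: cannot parse bad.avro
            print('%r generated an exception: %s' % (url, exc))
            failures[url] = exc
        else:
            successes[url] = future.result()

print(sorted(successes))  # ['first.avro', 'x.avro']
```

Calling future.exception() once and keeping it in exc avoids the second call in the original line 21; for a finished Future both calls return the same exception object, so the behavior is the same.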

    In line 21:

    print('%r generated an exception: %s' % (url, future.exception()))
    

    %r and %s are placeholders that are filled, in order, with the values in the tuple after the % operator: url and future.exception().

    This is printf-style string formatting. %r converts its value with repr() and %s converts its value with str().
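
A short sketch of the difference, using a made-up file name and exception in place of the real row and error. The f-string is shown only as an equivalent alternative; it is not what the original code uses.

```python
url = "file_01.avro"              # made-up value standing in for the row
exc = ValueError("bad header")    # made-up exception

# %r uses repr() (strings get quotes); %s uses str() (just the message).
message = '%r generated an exception: %s' % (url, exc)
print(message)  # 'file_01.avro' generated an exception: bad header

# The same line as an f-string; !r asks for repr().
assert message == f'{url!r} generated an exception: {exc}'

# The difference is easiest to see on the exception itself.
print('%s' % (exc,))  # bad header
print('%r' % (exc,))  # ValueError('bad header')
```

%r is handy in error messages because the quotes make it obvious which value failed, even when it contains spaces or is empty.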
