Why can't I use Databricks Auto Loader to load data directly into a dataframe?

Árni 0 Reputation points
2023-08-15T13:28:19.2433333+00:00

Hi,

I'm using Auto Loader in Azure Databricks to load data from JSON files in Azure Data Lake Storage Gen2. Based on the Azure Databricks documentation and tutorials from Databricks, I should be able to use something like the following code to load data into a dataframe (where I can then transform the data):

df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .option("cloudFiles.schemaLocation", target_schema_path) \
    .option("cloudFiles.format", "json") \
    .load(source_glob_path)

but when I do this I get the following error message:

Queries with streaming sources must be executed with writeStream.start(); cloudFiles

Is it no longer possible to use Auto Loader to load data from data lake files directly into a dataframe? Am I doing something wrong? I'd really appreciate help with this.


1 answer

  1. KranthiPakala-MSFT 46,632 Reputation points Microsoft Employee
    2023-08-16T19:22:15.1033333+00:00


    A readStream behaves like a transformation and will need an action to run. The error simply means you need to start the stream i.e call the start() method on the stream. Since there is no start() method on a readStream object, one way to start the stream is to call writeStream.start(), an alternate for testing purpose is simply call df.display() here the stream is started, and the output written to the console (under the hood writeStream.format("console").start() is called). In other words, the start() method creates the streamingQueryObject which is a handle to the running query without which it's just a query or definition.

    In the code snippet you provided, you're using the readStream API to define how to read data from cloudFiles, but it looks like you're missing the actual execution step using writeStream.start().

    Here's the correct structure for reading from a streaming source and writing the results using writeStream.start():

    from pyspark.sql import SparkSession

    # Create a Spark session
    spark = SparkSession.builder.appName("StreamingExample").getOrCreate()

    # Define the streaming source and options
    source_glob_path = "path_to_source_data"  # Replace with your source path
    target_schema_path = "path_to_schema_file"  # Replace with your schema path

    df = spark.readStream.format("cloudFiles") \
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
        .option("cloudFiles.schemaLocation", target_schema_path) \
        .option("cloudFiles.format", "json") \
        .load(source_glob_path)

    # Perform transformations, aggregations, etc. on the DataFrame `df`

    # Define the writeStream and start the query
    query = df.writeStream \
        .outputMode("append") \
        .format("console") \
        .start()

    # Wait for the query to terminate
    query.awaitTermination()

    # Stop the Spark session
    spark.stop()
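
    If you'd rather have Auto Loader land the data in a Delta table than print it to the console, a rough sketch along these lines would replace the console writeStream above (the checkpoint path and the raw_events table name are placeholders, not from your question, and trigger(availableNow=True) requires a reasonably recent Databricks Runtime):

    # A checkpoint location is required so the stream (and Auto Loader's file
    # tracking) can resume where it left off after a restart.
    checkpoint_path = "path_to_checkpoint"  # Replace with your checkpoint path

    query = df.writeStream \
        .format("delta") \
        .option("checkpointLocation", checkpoint_path) \
        .trigger(availableNow=True) \
        .toTable("raw_events")  # Hypothetical target table name

    # availableNow processes all currently available files and then stops,
    # so awaitTermination() returns once the backlog has been ingested.
    query.awaitTermination()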
    

    Hope this info helps.


    Please don’t forget to Accept Answer and select Yes for "was this answer helpful" wherever the information provided helps you, as this can be beneficial to other community members.

