@Árni Welcome to the Microsoft Q&A forum, and thanks for reaching out.
A readStream behaves like a transformation: it only defines the query, and it needs writeStream.start() (the streaming equivalent of an action) to actually run. The error simply means the stream was never started. There is no start() method on the readStream side, so the usual way to start the stream is to call writeStream.start(). For testing in a Databricks notebook, an alternative is to call df.display() (or display(df)), which starts a streaming query for you behind the scenes and renders its output in the notebook. In other words, start() creates a StreamingQuery object, which is a handle to the running query. Without it, you only have a query definition.
In the code snippet you provided, you're using the readStream API to define how to read data from cloudFiles, but it looks like you're missing the actual execution step, writeStream.start().
Here's the correct structure for reading from a streaming source and writing the results using writeStream.start():
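```python
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session; on Databricks, `spark` already exists
spark = SparkSession.builder.appName("StreamingExample").getOrCreate()

# Define the streaming source and options
source_glob_path = "path_to_source_data"   # Replace with your source path
target_schema_path = "path_to_schema_dir"  # Replace with a directory where Auto Loader can store the inferred schema

# cloudFiles (Auto Loader) is Databricks-specific; this only defines the stream, nothing runs yet
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .option("cloudFiles.schemaLocation", target_schema_path) \
    .option("cloudFiles.format", "json") \
    .load(source_glob_path)

# Perform transformations, aggregations, etc. on the DataFrame `df`

# Define the writeStream and start the query; start() is what actually runs the stream.
# The console sink prints each micro-batch to the driver's stdout (the driver logs on Databricks).
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Block until the query terminates (it runs until it is stopped or fails)
query.awaitTermination()

# Stop the Spark session (for a standalone script; not needed in a Databricks notebook)
spark.stop()
```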
Hope this info helps.
Please don’t forget to Accept Answer and Yes for "was this answer helpful" wherever the information provided helps you; this can be beneficial to other community members.