Read Delta Sharing shared tables using Apache Spark DataFrames
This article provides syntax examples of using Apache Spark to query data shared using Delta Sharing. Use the deltasharing keyword as a format option for DataFrame operations.
You can also query shared tables by name when a Delta Sharing catalog is registered in the metastore, as in the following examples:
SELECT * FROM shared_table_name
spark.read.table("shared_table_name")
For more on configuring Delta Sharing in Azure Databricks and querying data using shared table names, see Read data shared using Databricks-to-Databricks Delta Sharing (for recipients).
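For example, if a provider's share has been mounted as a catalog in the metastore, you can address a shared table by its full three-level name. This is a minimal sketch; the catalog, schema, and table names are hypothetical:
# Hypothetical three-level name: <catalog created from the share>.<schema>.<table>
df = spark.read.table("my_shared_catalog.my_schema.my_table")
df.show()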
You can use Structured Streaming to process records in shared tables incrementally. To use Structured Streaming, the table must be shared with history enabled. See ALTER SHARE. History sharing requires Databricks Runtime 12.2 LTS or above.
If the shared table has change data feed enabled on the source Delta table and history enabled on the share, you can use change data feed while reading a Delta share with Structured Streaming or batch operations. See Use Delta Lake change data feed on Azure Databricks.
The deltasharing keyword is supported for Apache Spark DataFrame read operations, as shown in the following example:
df = (spark.read
    .format("deltasharing")
    .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)
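In this example, <profile-path> is the location of the Delta Sharing credential (profile) file obtained from the provider, and the string after # identifies the table as <share-name>.<schema-name>.<table-name>.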
For tables that have history shared and change data feed enabled, you can read change data feed records using Apache Spark DataFrames. History sharing requires Databricks Runtime 12.2 LTS or above.
df = (spark.read
    .format("deltasharing")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", "2021-04-21 05:45:46")
    .option("endingTimestamp", "2021-05-21 12:00:00")
    .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)
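To bound the read by table versions rather than timestamps, you can use the startingVersion and endingVersion options instead. The following is a minimal sketch; the version numbers are placeholders:
df = (spark.read
    .format("deltasharing")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)  # placeholder starting table version
    .option("endingVersion", 5)    # placeholder ending table version
    .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)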
For tables that have history shared, you can use the shared table as a source for Structured Streaming. History sharing requires Databricks Runtime 12.2 LTS or above.
streaming_df = (spark.readStream
    .format("deltasharing")
    .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

# If CDF is enabled on the source table
streaming_cdf_df = (spark.readStream
    .format("deltasharing")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", "2021-04-21 05:45:46")
    .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)
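To process the stream, attach a sink to the streaming DataFrame. The following is a minimal sketch that writes the incrementally read records to a Delta table and then stops; the checkpoint location and target table name are hypothetical:
(streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/shared_table")  # hypothetical checkpoint path
    .trigger(availableNow=True)  # process all available records, then stop
    .toTable("main.default.shared_table_copy")  # hypothetical target table
)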