Чтение общих таблиц разностного общего доступа с использованием Apache Spark DataFrames
В этой статье приведены примеры синтаксиса использования Apache Spark для запроса данных, совместно используемых с помощью разностного общего доступа. Используйте ключевое deltasharing
слово в качестве параметра формата для операций DataFrame.
Вы также можете создавать запросы, использующие имена общих таблиц в каталогах Delta Sharing, зарегистрированных в хранилище метаданных, например в следующих примерах:
SELECT * FROM shared_table_name
spark.read.table("shared_table_name")
Дополнительные сведения о настройке разностного общего доступа в Azure Databricks и запросах данных с использованием общих имен таблиц см. в статье "Чтение данных с использованием databricks to-Databricks Delta Shared "(для получателей)".
Структурированная потоковая передача можно использовать для обработки записей в общих таблицах постепенно. Чтобы использовать структурированную потоковую передачу, необходимо включить общий доступ к журналу для таблицы. См. раздел ALTER SHARE. Для общего доступа к журналам требуется Databricks Runtime 12.2 LTS или более поздней версии.
Если в общей таблице включен канал изменений данных в исходной таблице Delta и журнале, включенном в общей папке, можно использовать канал изменений при чтении разностной общей папки с структурированной потоковой передачей или пакетными операциями. См. статью Использование веб-канала изменений данных Delta Lake в Azure Databricks.
Ключевое deltasharing
слово поддерживается для операций чтения с данными Apache Spark, как показано в следующем примере:
df = (spark.read
.format("deltasharing")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)
Для таблиц, имеющих общий доступ к журналу и включенных веб-канал изменений, можно считывать записи веб-канала изменений с помощью Кадров данных Apache Spark. Для общего доступа к журналам требуется Databricks Runtime 12.2 LTS или более поздней версии.
df = (spark.read
.format("deltasharing")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)
Для таблиц, имеющих общий журнал, можно использовать общую таблицу в качестве источника для структурированной потоковой передачи. Для общего доступа к журналам требуется Databricks Runtime 12.2 LTS или более поздней версии.
streaming_df = (spark.readStream
.format("deltasharing")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)
# If CDF is enabled on the source table
streaming_cdf_df = (spark.readStream
.format("deltasharing")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)