Чтение общих данных с помощью открытого общего доступа Delta Sharing (для получателей)

В этой статье описывается, как считывать данные, которыми вы предоставили общий доступ с помощью протокола открытого общего доступа Delta Sharing. При открытом совместном доступе вы используете файл учетных данных, который был предоставлен участнику команды поставщиком данных, чтобы получить безопасный доступ на чтение к общим данным. Доступ сохраняется до тех пор, пока учетные данные действительны, и поставщик продолжает предоставлять общий доступ к данным. Поставщики управляют истечением срока действия учетных данных и сменой. Обновления данных появляются почти в режиме реального времени. Вы можете считывать общие данные и создавать их копии, но не можете вносить в них изменения.

Примечание.

Если вы предоставили доступ к данным с помощью Databricks-to-Databricks Delta Shared, вам не нужен файл учетных данных для доступа к данным, и эта статья не применяется к вам. Инструкции см. в разделе "Чтение данных" с помощью Databricks to Databricks Delta Sharing (для получателей).

В следующих разделах описано, как использовать Azure Databricks, Apache Spark, pandas и Power BI для доступа к общим данным и считывания общих данных с помощью файла учетных данных. Полный список соединителей Delta Share и сведения об их использовании см. в документации по delta Sharing открытый код. При возникновении проблем с доступом к общим данным обратитесь к поставщику данных.

Примечание.

Интеграцию с партнерскими продуктами, если не указано иное, предоставляют третьи лица. Для работы с продуктами и службами определенного поставщика требуется соответствующая учетная запись. Прилагаются все усилия, чтобы поддерживать содержимое Databricks в актуальном состоянии. Но мы не делаем никаких заявлений в отношении интеграции или точности содержимого на партнерских страницах со сведениями об интеграции. По вопросам интеграции обращайтесь к соответствующим поставщикам.

Подготовка к работе

Член вашей команды должен скачать файл учетных данных, к которым предоставлен доступ поставщику данных. См. статью "Получить доступ" в открытой модели общего доступа.

Они должны использовать безопасный канал для совместного использования этого файла или расположения файлов с вами.

Azure Databricks: чтение общих данных с помощью открытых соединителей общего доступа

В этом разделе описывается, как использовать открытый соединитель общего доступа для доступа к общим данным с помощью записной книжки в рабочей области Azure Databricks. Вы или другой член вашей команды храните файл учетных данных в DBFS, а затем используете его для проверки подлинности в учетной записи azure Databricks поставщика данных и считываете данные, которые поставщик данных предоставил вам.

Примечание.

Если поставщик данных использует совместное использование Databricks to Databricks и не предоставляет общий доступ к файлу учетных данных, необходимо получить доступ к данным с помощью каталога Unity. Инструкции см. в разделе "Чтение данных" с помощью Databricks to Databricks Delta Sharing (для получателей).

В этом примере создается записная книжка с несколькими ячейками, которые можно запускать независимо. Вместо этого можно добавить команды записной книжки в ту же ячейку и запустить их в последовательности.

Шаг 1. Хранение файла учетных данных в DBFS (инструкции Python)

На этом шаге вы используете записную книжку Python в Azure Databricks для хранения файла учетных данных, чтобы пользователи в вашей команде могли получить доступ к общим данным.

Перейдите к следующему шагу, если вы или кто-то из вашей команды уже сохранили файл учетных данных в DBFS.

  1. В текстовом редакторе откройте файл учетных данных.

  2. В рабочей области Azure Databricks нажмите кнопку "Создать > записную книжку".

    • Введите имя.
    • Задайте язык по умолчанию для записной книжки в Python.
    • Выберите кластер для подключения к записной книжке.
    • Нажмите кнопку Создать.

    Записная книжка откроется в редакторе записных книжек.

  3. Чтобы с помощью Python или pandas получить доступ к общим данным, установите соединитель Python для разностного общего доступа. В редакторе записной книжки вставьте следующую команду:

    %sh pip install delta-sharing
    
  4. Запустите ячейку.

    Библиотека delta-sharing Python устанавливается в кластере, если она еще не установлена.

  5. В новой ячейке вставьте следующую команду, которая передает содержимое файла учетных данных в папку в DBFS. Замените переменные следующим образом:

    • <dbfs-path>: указывает путь к папке, в которую следует сохранить файл учетных данных.

    • <credential-file-contents>: содержимое файла учетных данных. Это не путь к файлу, а скопированное содержимое файла.

      Файл учетных данных содержит JSON, определяющий три поля: shareCredentialsVersion, endpointи bearerToken.

      %scala
      dbutils.fs.put("<dbfs-path>/config.share","""
      <credential-file-contents>
      """)
      
  6. Запустите ячейку.

    После отправки файла учетных данных эту ячейку можно удалить. Все пользователи рабочей области могут считывать файл учетных данных из DBFS, а файл учетных данных доступен в DBFS во всех кластерах и хранилищах SQL в рабочей области. Чтобы удалить ячейку, щелкните x в меню Действия ячейки действий ячейки справа.

Шаг 2. Использование записной книжки для перечисления и чтения общих таблиц

На этом шаге вы перечисляете таблицы в общей папке или набор общих таблиц и секций, а также запрашиваете таблицу.

  1. С помощью Python получите список таблиц в общей папке.

    В новой ячейке вставьте следующую команду. Замените <dbfs-path> путь, созданный на шаге 1. Сохраните файл учетных данных в DBFS (инструкции Python).

    При выполнении кода Python считывает файл учетных данных из DBFS в кластере. Доступ к данным, хранящимся в DBFS в пути /dbfs/.

    import delta_sharing
    
    client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share")
    
    client.list_all_tables()
    
  2. Запустите ячейку.

    Результатом является массив таблиц, а также метаданные для каждой таблицы. В следующих выходных данных показаны две таблицы:

    Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
    

    Если выходные данные пусты или не содержат нужных таблиц, обратитесь к поставщику данных.

  3. Запрос общей таблицы.

    • Использование Scala:

      В новой ячейке вставьте следующую команду. При выполнении кода файл учетных данных считывается из DBFS через виртуальную машину Java.

      Замените переменные следующим образом:

      • <profile-path>: путь к файлу учетных данных в DBFS. Например, /<dbfs-path>/config.share.
      • <share-name>: значение share= для таблицы.
      • <schema-name>: значение schema= для таблицы.
      • <table-name>: значение name= для таблицы.
      %scala
          spark.read.format("deltaSharing")
          .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
      

      Запустите ячейку. Каждый раз при загрузке общей таблицы отображаются новые данные из источника.

    • Использование SQL:

      Чтобы запросить данные с помощью SQL, создайте локальную таблицу в рабочей области из общей таблицы, а затем запросите локальную таблицу. Общие данные не хранятся и не кэшируются в локальной таблице. Каждый раз при запросе локальной таблицы отображается текущее состояние общих данных.

      В новой ячейке вставьте следующую команду.

      Замените переменные следующим образом:

      • <local-table-name>: имя локальной таблицы.
      • <profile-path>: расположение файла учетных данных.
      • <share-name>: значение share= для таблицы.
      • <schema-name>: значение schema= для таблицы.
      • <table-name>: значение name= для таблицы.
      %sql
      DROP TABLE IF EXISTS table_name;
      
      CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>";
      
      SELECT * FROM <local-table-name> LIMIT 10;
      

      При выполнении команды запрос к общим данным отправляются напрямую. В качестве теста выполняется запрос к таблице и возвращаются первые 10 результатов.

    Если выходные данные пусты или не содержат нужных данных, обратитесь к поставщику.

Apache Spark: чтение общих данных

Выполните следующие действия, чтобы получить доступ к общим данным с помощью Spark 3.x или более поздней версии.

В этих инструкциях предполагается, что у вас есть доступ к файлу учетных данных, который был предоставлен поставщиком данных. См. статью "Получить доступ" в открытой модели общего доступа.

Установка соединителей Python и Spark для разностного общего доступа

Чтобы получить доступ к метаданным, связанным с общими данными, например списком таблиц, которыми вы поделились, выполните указанные ниже действия. В этом примере используется Python.

  1. Установите соединитель Python для разностного доступа:

    pip install delta-sharing
    
  2. Установите соединитель Apache Spark.

Вывод списка общих таблиц с помощью Spark

Получите список таблиц в общей папке. В следующем примере замените <profile-path> на расположение файла учетных данных.

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

Результатом является массив таблиц, а также метаданные для каждой таблицы. В следующих выходных данных показаны две таблицы:

Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

Если выходные данные пусты или не содержат нужных таблиц, обратитесь к поставщику данных.

Доступ к общим данным с помощью Spark

Выполните следующие действия, заменив эти переменные:

  • <profile-path>: расположение файла учетных данных.
  • <share-name>: значение share= для таблицы.
  • <schema-name>: значение schema= для таблицы.
  • <table-name>: значение name= для таблицы.
  • <version-as-of>: необязательно. Версия таблицы для загрузки данных. Работает только в том случае, если поставщик данных предоставляет общий доступ к журналу таблицы. Требуется delta-sharing-spark 0.5.0 или более поздней версии.
  • <timestamp-as-of>: необязательно. Загрузите данные в версию до или в заданной метке времени. Работает только в том случае, если поставщик данных предоставляет общий доступ к журналу таблицы. Требуется delta-sharing-spark 0.6.0 или более поздней версии.

Python

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

Scala

Выполните следующие действия, заменив эти переменные:

  • <profile-path>: расположение файла учетных данных.
  • <share-name>: значение share= для таблицы.
  • <schema-name>: значение schema= для таблицы.
  • <table-name>: значение name= для таблицы.
  • <version-as-of>: необязательно. Версия таблицы для загрузки данных. Работает только в том случае, если поставщик данных предоставляет общий доступ к журналу таблицы. Требуется delta-sharing-spark 0.5.0 или более поздней версии.
  • <timestamp-as-of>: необязательно. Загрузите данные в версию до или в заданной метке времени. Работает только в том случае, если поставщик данных предоставляет общий доступ к журналу таблицы. Требуется delta-sharing-spark 0.6.0 или более поздней версии.
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

Доступ к общему каналу данных об изменениях с помощью Spark

Если журнал таблиц предоставлен вам, а веб-канал изменений (CDF) включен в исходной таблице, вы можете получить доступ к веб-каналу измененных данных, выполнив указанные ниже действия, заменив эти переменные. Требуется delta-sharing-spark 0.5.0 или более поздней версии.

Необходимо указать только один и только один начальный параметр.

  • <profile-path>: расположение файла учетных данных.
  • <share-name>: значение share= для таблицы.
  • <schema-name>: значение schema= для таблицы.
  • <table-name>: значение name= для таблицы.
  • <starting-version>: необязательно. Начальная версия запроса включительно. Укажите значение long.
  • <ending-version>: необязательно. Конечная версия запроса включительно. Если конечная версия не указана, API использует последнюю версию таблицы.
  • <starting-timestamp>: необязательно. Начальная метка времени запроса преобразуется в версию, созданную больше или равной этой метке времени. Укажите строку в формате yyyy-mm-dd hh:mm:ss[.fffffffff].
  • <ending-timestamp>: необязательно. Конечная метка времени запроса преобразуется в версию, созданную ранее или равной этой метке времени. Укажите строку в формате yyyy-mm-dd hh:mm:ss[.fffffffff]

Python

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Scala

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Если выходные данные пусты или не содержат нужных данных, обратитесь к поставщику.

Доступ к общей таблице с помощью структурированной потоковой передачи Spark

Если журнал таблиц предоставлен вам общий доступ, вы можете выполнять потоковую передачу общих данных. Требуется delta-sharing-spark 0.6.0 или более поздней версии.

Поддерживаемые параметры:

  • ignoreDeletes: игнорировать транзакции, которые удаляют данные.
  • ignoreChanges: повторно обработать обновления, если файлы были перезаписаны в исходной таблице из-за операции изменения данных, например UPDATE, ( MERGE INTODELETE в разделах) или OVERWRITE. Неизменяемые строки по-прежнему можно выдавать. Таким образом, подчиненные потребители должны иметь возможность обрабатывать дубликаты. Удаления не распространяются по нисходящей. ignoreChanges включает ignoreDeletes. Поэтому, если используется ignoreChanges, поток не будет нарушаться удалением или обновлением исходной таблицы.
  • startingVersion: версия общей таблицы для начала. Все изменения таблицы, начиная с этой версии (включительно), будут считываться источником потоковой передачи.
  • startingTimestamp: Метка времени для запуска. Все изменения в таблице, зафиксированные в момент или после метки времени (включительно), считываются источником потоковой передачи. Пример: "2023-01-01 00:00:00.0".
  • maxFilesPerTrigger: количество новых файлов, которые следует учитывать в каждом микропакете.
  • maxBytesPerTrigger: объем данных, обрабатываемых в каждом микропакете. Этот параметр задает значение "мягкого максимума", то есть пакет обрабатывает приблизительно такой объем данных и может обработать больше, чтобы потоковый запрос продолжился в случаях, когда наименьший входной блок превышает это ограничение.
  • readChangeFeed: поток считывает веб-канал измененных данных общей таблицы.

Неподдерживаемые параметры:

  • Trigger.availableNow

Примеры структурированных запросов потоковой передачи

Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Python
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

См. также потоковую передачу в Azure Databricks.

Чтение таблиц с включенными векторами удаления или сопоставлением столбцов

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Векторы удаления — это функция оптимизации хранилища, которую поставщик может включить в общих таблицах Delta. См. раздел " Что такое векторы удаления?".

Azure Databricks также поддерживает сопоставление столбцов для таблиц Delta. См. раздел "Переименование и удаление столбцов" с сопоставлением столбцов Delta Lake.

Если поставщик предоставил доступ к таблице с включенными векторами удаления или сопоставлением столбцов, вы можете прочитать таблицу с помощью вычислений, работающих под delta-sharing-spark управлением 3.1 или более поздней версии. Если вы используете кластеры Databricks, вы можете выполнять пакетные операции чтения с помощью кластера под управлением Databricks Runtime 14.1 или более поздней версии. Для запросов CDF и потоковой передачи требуется Среда выполнения Databricks 14.2 или более поздней версии.

Пакетные запросы можно выполнять как есть, так как они могут автоматически разрешаться responseFormat на основе функций таблицы общей таблицы.

Чтобы считывать веб-канал изменений (CDF) или выполнять потоковые запросы к общим таблицам с включенными векторами удаления или сопоставлением столбцов, необходимо задать дополнительный параметр responseFormat=delta.

В следующих примерах показаны пакетные запросы, CDF и потоковой передачи:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName("...")
        .master("...")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("responseFormat", "delta")
  .option("startingVersion", 1)
  .load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: чтение общих данных

Чтобы получить доступ к общим данным в pandas 0.25.3 или более поздней версии, выполните указанные ниже действия.

В этих инструкциях предполагается, что у вас есть доступ к файлу учетных данных, который был предоставлен поставщиком данных. См. статью "Получить доступ" в открытой модели общего доступа.

Установка соединителя Python для разностного общего доступа

Чтобы получить доступ к метаданным, связанным с общими данными, например списком таблиц, которыми вы поделились, необходимо установить соединитель Python для разностного доступа.

pip install delta-sharing

Вывод списка общих таблиц с помощью pandas

Чтобы получить список таблиц в общей папке, выполните следующую команду, заменив <profile-path>/config.share расположение файла учетных данных.

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

Если выходные данные пусты или не содержат нужных таблиц, обратитесь к поставщику данных.

Доступ к общим данным с помощью pandas

Чтобы получить доступ к общим данным в pandas с помощью Python, выполните следующие действия, заменив переменные следующим образом:

  • <profile-path>: расположение файла учетных данных.
  • <share-name>: значение share= для таблицы.
  • <schema-name>: значение schema= для таблицы.
  • <table-name>: значение name= для таблицы.
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

Доступ к общему каналу данных об изменениях с помощью pandas

Чтобы получить доступ к каналу измененных данных для общей таблицы в pandas с помощью Python, выполните следующие действия, заменив переменные следующим образом. Веб-канал изменений может быть недоступен в зависимости от того, предоставил ли поставщик данных общий доступ к веб-каналу изменений для таблицы.

  • <starting-version>: необязательно. Начальная версия запроса включительно.
  • <ending-version>: необязательно. Конечная версия запроса включительно.
  • <starting-timestamp>: необязательно. Начальная метка времени запроса. Это преобразуется в версию, созданную больше или равной этой метке времени.
  • <ending-timestamp>: необязательно. Конечная метка времени запроса. Это преобразуется в версию, созданную ранее или равной этой метке времени.
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

Если выходные данные пусты или не содержат нужных данных, обратитесь к поставщику.

Power BI: чтение общих данных

Соединитель Delta Sharing Power BI позволяет обнаруживать, анализировать и визуализировать наборы данных, которыми вы поделились с помощью открытого протокола Delta Sharing.

Требования

  • Power BI Desktop 2.99.621.0 или более поздняя версия.
  • Доступ к файлу учетных данных, который был предоставлен поставщиком данных. См. статью "Получить доступ" в открытой модели общего доступа.

Подключение к Databricks

Чтобы подключиться к Azure Databricks с помощью соединителя Delta Sharing, сделайте следующее:

  1. Откройте общий файл учетных данных с текстовым редактором, чтобы получить URL-адрес конечной точки и маркер.
  2. Запустите Power BI Desktop.
  3. В меню Получение данных найдите выполните поиск по запросу Разностный общий доступ.
  4. Выберите соединитель и щелкните Подключиться.
  5. Введите URL-адрес конечной точки, скопированный из файла учетных данных, в поле URL-адрес сервера разностного общего доступа.
  6. При необходимости на вкладке Дополнительные параметры задайте максимальное число строк которое можно скачать. По умолчанию задано значение в 1 000 000 строк.
  7. Щелкните OK.
  8. В разделе Проверка подлинности скопируйте токен, полученный из файла учетных данных, в поле Маркер носителя.
  9. Щелкните Подключить.

Ограничения соединителя разностного доступа Power BI

Подключение совместного использования Power BI Delta Имеет следующие ограничения:

  • Данные, которые загружает соединитель, должны соответствовать памяти компьютера. Чтобы убедиться в этом, соединитель ограничивает количество импортированных строк ограничением строк, заданным на вкладке "Дополнительные параметры" в Power BI Desktop.

Запрос новых учетных данных

Если URL-адрес активации учетных данных или скачанные учетные данные потеряны, повреждены или скомпрометированы, или срок действия учетных данных истекает без отправки нового поставщика, обратитесь к поставщику, чтобы запросить новые учетные данные.