Поделиться через


Запрос потоковых данных

С помощью Azure Databricks можно запрашивать источники потоковой передачи, используя структурированную потоковую передачу. Azure Databricks обеспечивает обширную поддержку потоковых рабочих нагрузок в Python и Scala и поддерживает большинство функций структурированной потоковой передачи с помощью SQL.

В следующих примерах показано использование приемника памяти для ручной проверки потоковых данных во время интерактивной разработки в записных книжках. Из-за ограничений на вывод строк в пользовательском интерфейсе записной книжки вы можете не увидеть все данные, считываемые потоковыми запросами. В рабочих нагрузках следует активировать только потоковые запросы, записывая их в целевую таблицу или внешнюю систему.

Примечание.

Поддержка SQL для интерактивных запросов на потоковую передачу данных ограничена ноутбуками, работающими на всех универсальных вычислительных ресурсах. Вы также можете использовать SQL при объявлении потоковых таблиц в Databricks SQL или в декларативных конвейерах Lakehouse Spark. См. таблицы потоковой передачи и Lakeflow Spark Declarative Pipelines.

Запрос данных из систем потоковой передачи

Azure Databricks предоставляет средства чтения потоковых данных для следующих систем потоковой передачи:

  • Кафка
  • Кинезис
  • PubSub
  • Пульсар

При инициализации запросов к этим системам необходимо указать сведения о конфигурации, которые зависят от настроенной среды и системы, из которой вы выбираете чтение. См. статью "Стандартные соединители" в Lakeflow Connect.

Распространенные рабочие нагрузки, связанные с потоковыми системами, включают загрузку данных в lakehouse и потоковую обработку для передачи данных в внешние системы. Дополнительные сведения о рабочих нагрузках потоковой передачи см. в концепциях структурированной потоковой передачи.

В следующих примерах демонстрируется интерактивное потоковое чтение из Kafka:

Питон

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Выполните запрос таблицы в режиме потокового чтения

Azure Databricks создает все таблицы с помощью Delta Lake по умолчанию. При выполнении потокового запроса к таблице Delta запрос автоматически выбирает новые записи при подтверждении версии таблицы. По умолчанию потоковые запросы ожидают, что исходные таблицы содержат только добавленные записи. Если вам нужно работать с потоковыми данными, содержащими обновления и удаления, Databricks рекомендует использовать декларативные конвейеры Lakeflow Spark и AUTO CDC ... INTO. См . API AUTO CDC: упрощение отслеживания изменений с помощью конвейеров.

В следующих примерах показано выполнение интерактивной потоковой передачи из таблицы:

Питон

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Запрос данных в облачном хранилище объектов с помощью автозагрузчика

Вы можете передавать данные из облачного хранилища объектов с помощью автозагрузчика, соединителя облачных данных Azure Databricks. Соединитель можно использовать с файлами, хранящимися в томах каталога Unity или других расположениях облачного хранилища объектов. Databricks рекомендует использовать тома для управления доступом к данным в облачном хранилище объектов. См. статью "Подключение к источникам данных" и внешним службам.

Azure Databricks оптимизирует этот соединитель для приема потоковых данных в облачном хранилище объектов, которое хранится в популярных структурированных, полуструктурированных и неструктурированных форматах. Databricks рекомендует хранить принятые данные в близком к исходному формату для максимизации пропускной способности и минимизации потенциальной потери данных из-за поврежденных записей или изменений схемы данных.

Дополнительные рекомендации по приему данных из облачного хранилища объектов см. в разделе "Стандартные соединители" в Lakeflow Connect.

В следующих примерах показано интерактивное потоковое чтение из каталога JSON-файлов в томе:

Питон

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')