Потоковая обработка в бессерверной вычислительной среде

На этой странице описано, как выбрать оптимальную конфигурацию для бессерверных задач потоковой обработки в Azure Databricks, включая непрерывные конвейеры, инкрементальную загрузку и управляемые коннекторы. Выбор подходящей конфигурации зависит от источника потока, его формы и требований к задержке.

Что подсчитывается как рабочая нагрузка потоковой передачи

Рабочая нагрузка потоковой передачи считывает несвязанные данные из источника (например, облачного хранилища объектов, шины сообщений или канала изменений) и записывает данные в приемник постепенно. Azure Databricks поддерживает два шаблона потоковых рабочих нагрузок:

  • Непрерывный: конвейер, который выполняется без остановки и обрабатывает новые данные по мере поступления. Задержка измеряется в секундах.
  • Инкрементный (также называемый триггерным): конвейер, который выполняется по расписанию или по триггеру, обрабатывает все данные, поступившие с момента предыдущего запуска, и останавливается. Задержка измеряется в минутах.

Некоторые рабочие нагрузки выглядят как поточные конвейеры, но технически ими не являются. Примеры включают службу, которая содержит веб-шлюз, открытый для прослушивания событий, приложение чата, которое поддерживает постоянное подключение для каждого пользователя или приемник веб-перехватчика, обрабатывающий входящие HTTP-запросы. Это приложения, а не конвейеры потоковой передачи. Правильный вариант бессерверного сервера для этих рабочих нагрузок см. в разделе "Рабочие нагрузки, которые не являются конвейерами потоковой передачи".

Выбор правильной конфигурации потоковой передачи

Эта таблица сопоставляет варианты использования бессерверных конфигураций, которые лучше всего подходят для них. В следующих разделах на этой странице приведены дополнительные сведения об этих рекомендациях.

Сценарий использования Рекомендуемая конфигурация Почему
Непрерывный потоковый ETL или потоковые преобразования с низкой задержкой Декларативные конвейеры Lakeflow Spark в непрерывном режиме Непрерывный режим предназначен для постоянно активных потоков. Потоковая конвейеризация выполняет микропакеты одновременно, повышая пропускную способность и снижая задержку. Управляемое состояние обеспечивает автоматическое восстановление.
Инкрементальная загрузка из облачного хранилища Используйте автозагрузчик внутри декларативных конвейеров Lakeflow Spark (для низкой задержки) или в бессерверном заданииTrigger.AvailableNow() (если низкая задержка допустима). Автозагрузчик эффективно отслеживает новые файлы. Trigger.AvailableNow() обрабатывает накопившиеся задачи, а затем завершает выполнение, что подходит для запуска по расписанию или по запросу.
Управляемый сбор данных из источников SaaS или из баз данных через CDC Стандартные соединители в Lakeflow Connect Полностью управляемые соединители с бессерверными конвейерами приема данных. Код не требуется для поддерживаемых источников.
Потоковая передача SQL по разностным таблицам Потоковые таблицы Нативная для SQL инкрементальная обработка для источников с дописыванием данных, с управляемыми конвейерами и обновлением.
Периодическая микро пакетная обработка в записной книжке или задании Бессерверная задача с Trigger.AvailableNow() Экономически эффективно, если достаточно обновления данных раз в минуту. Бессерверные вычисления быстро запускаются и завершаются по окончании пакетного задания.

Непрерывная потоковая передача

Для непрерывной потоковой передачи на бессерверных вычислениях используйте декларативные конвейеры Spark Lakeflow в непрерывном режиме. Конвейер остается запущенным, обрабатывает записи по мере их поступления и автоматически восстанавливается после сбоев.

Чтобы настроить непрерывный поток, выполните приведенные действия.

Tip

Конвейерная обработка потоков включена по умолчанию в бессерверных конвейерах Lakeflow Spark Declarative Pipelines. Микропакеты выполняются параллельно, а не последовательно, что повышает пропускную способность для потоков с интенсивной загрузкой данных.

Триггеры структурированной потоковой передачи на основе времени, такие как Trigger.ProcessingTime(interval) иTrigger.Continuous(interval), недоступны в бессерверных записных книжках или заданиях. Используйте Lakeflow Spark Declarative Pipelines в непрерывном режиме для сценария постоянной работы. См. ограничения потоковой передачи. Trigger.Once() поддерживается, но не рекомендуется— перенос существующих запросов в Trigger.AvailableNow().

Инкрементальная и запускаемая по триггеру потоковая передача данных

Для инкрементной потоковой обработки запустите Structured Streaming с помощью Trigger.AvailableNow() в бессерверном задании. Каждый запуск обрабатывает все данные, поступающие с момента последней контрольной точки, а затем завершает работу.

Чтобы настроить бессерверное задание с добавочной потоковой передачей:

В следующем примере считываются новые файлы из облачного хранилища (source_path) с автозагрузчиком, обрабатываются все данные, доступные во время выполнения, и записывается в таблицу Delta:

(spark.readStream
   .format("cloudFiles")
   .option("cloudFiles.format", "json")
   .option("cloudFiles.maxFilesPerTrigger", 1000)
   .load(source_path)
   .writeStream
   .trigger(availableNow=True)
   .option("checkpointLocation", checkpoint_path)
   .toTable("catalog.schema.target_table"))

Задание по расписанию Trigger.AvailableNow() — это наиболее экономичная схема потоковой обработки в бессерверной среде, когда допустима задержка порядка минуты. Вычислительные ресурсы запускаются за считанные секунды, выполняют пакетное задание и отключаются.

Управляемое прием

Если источником является приложение SaaS или операционная база данных, используйте Lakeflow Connect вместо написания структурированного кода потоковой передачи. Lakeflow Connect запускает бессерверные конвейеры приема данных для таких коннекторов, как Salesforce, Workday, SQL Server CDC и PostgreSQL CDC. См. раздел "Управляемые соединители" в Lakeflow Connect.

Этот путь является правильным ответом, когда:

  • Для вашего источника доступен коннектор.
  • Требуется управляемый конвейер, а не пользовательский код.
  • Вам нужны поддержка эволюции схемы, отслеживание происхождения данных и мониторинг из коробки.

Добавочная обработка данных, управляемых SQL

Для команд, ориентированных на SQL, используйте потоковые таблицы для задач потоковой обработки, изначально ориентированных на SQL. Таблицы потоковой передачи можно определить в декларативных конвейерах Spark Lakeflow или как автономные таблицы потоковой передачи.

Для автономных таблиц потоковой передачи, созданных с помощью CREATE OR REFRESH STREAMING TABLE инструкции SQL, начальное обновление данных и заполнение начинаются немедленно. Выделенный бессерверный конвейер автоматически создается и управляется системой для каждой потоковой таблицы.

Если вам нужны результаты запроса пакетной семантики с управляемым обновлением, используйте материализованные представления. См. материализованные представления.

Рабочие нагрузки, не являющиеся потоковыми конвейерами

Рабочая нагрузка, которая должна поддерживать долговременное соединение, прослушивать порт или отвечать на входящие HTTP-запросы, не является потоковым конвейером; это приложение. Не выполняйте эти рабочие нагрузки на бессерверном задании. Верные варианты Databricks:

  • Длительные службы, которым требуется постоянное подключение или конечная точка HTTP: создание службы с помощью приложений Databricks. Databricks Apps — это бессерверная платформа для размещения пользовательских приложений на Azure Databricks, включая FastAPI, Flask, Streamlit, Dash, Gradio, Node.jsи приложения Shiny. См. статью "Приложения Databricks".
  • Входящие вебхуки или обработчики событий: опубликуйте конечную точку HTTP в Databricks Apps или принимайте вебхуки во внешнем сервисе и записывайте события в облачное хранилище или шину сообщений, а затем считывайте их с помощью бессерверного потокового конвейера.
  • Пользовательский токен или обмен учетными данными: используйте субъекты-службы с OAuth или вызывайте REST API Databricks из приложения. Потоковые конвейеры не хранят сеансы отдельных пользователей или произвольное состояние токена.

Если вы оцениваете, подходит ли ваша рабочая нагрузка для потокового конвейера, спросите себя:

  • Считывает ли рабочая нагрузка данные из неограниченного источника данных и записывает их в приёмник? Если да, это конвейер потоковой передачи.
  • Должна ли рабочая нагрузка поддерживать соединение с клиентом открытым? Если да, это приложение; используйте Приложения Databricks.

Ограничения

Бессерверные вычисления накладывают следующие ограничения потоковой передачи. Ни один из них не предотвращает указанные выше рабочие нагрузки при паре с правильным продуктом.

  • Триггеры структурированной потоковой передачи на основе времени (Trigger.ProcessingTime(interval) и Trigger.Continuous(interval)) не поддерживаются в бессерверных записных книжках или заданиях. Используйте декларативные конвейеры Lakeflow Spark в непрерывном режиме для постоянно работающих потоков или Trigger.AvailableNow() для запусков по триггеру. См. ограничения потоковой передачи.
  • Потоковые запросы без явного триггера завершаются с ошибкой INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED. Apache Spark по умолчанию использует Trigger.ProcessingTime("0 seconds"), что не поддерживается в бессерверной среде вычислений. Всегда устанавливайте Trigger.AvailableNow() для каждого потокового запроса или используйте декларативные конвейеры Spark Lakeflow в непрерывном режиме.
  • Все ограничения потоковой передачи в стандартном режиме доступа также применяются к бессерверным вычислениям. См. ограничения потоковой передачи.

Дальнейшие действия