Поделиться через


Создание и подключение к кластерам Ray в Azure Databricks

Узнайте, как создавать, настраивать и запускать вычислительные кластеры Ray в Azure Databricks

Требования

Чтобы создать кластер Ray, необходимо иметь доступ к вычислительному ресурсу Databricks со следующими параметрами:

  • Databricks Runtime 12.2 LTS ML и более поздних версий.
  • Режим доступа должен быть одним пользователем или без общей изоляции.

Примечание.

Кластеры ray в настоящее время не поддерживаются на бессерверных вычислениях.

Установка Ray

При использовании Databricks Runtime ML 15.0 в кластерах Azure Databricks предварительно установлен Рэй.

Для сред выполнения, выпущенных до 15.0, используйте pip для установки Ray в кластере:

%pip install ray[default]>=2.3.0

Создание кластера Ray для конкретного пользователя в кластере Azure Databricks

Чтобы создать кластер Ray, используйте API ray.util.spark.setup_ray_cluster .

Примечание.

При создании кластера Ray в записной книжке он доступен только текущему пользователю записной книжки. Кластер Ray автоматически завершает работу после отключения записной книжки от кластера или через 30 минут бездействия (задачи не были отправлены в Ray). Если вы хотите создать кластер Ray, общий доступ ко всем пользователям и не подлежит активному выполнению записной книжки, используйте ray.util.spark.setup_global_ray_cluster ВМЕСТО него API.

Кластер с фиксированным размером ray

В любой записной книжке Azure Databricks, подключенной к кластеру Azure Databricks, можно выполнить следующую команду, чтобы запустить кластер Ray с фиксированным размером:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Автоматическое масштабирование кластера Ray

Сведения о запуске кластера автомасштабирования Ray см. в статье "Масштабирование кластеров Ray" в Azure Databricks.

Запуск глобального кластера ray

С помощью Ray 2.9.0 и более поздних версий можно создать глобальный кластер ray в кластере Azure Databricks. Глобальный кластер Ray позволяет всем пользователям, подключенным к кластеру Azure Databricks, также использовать кластер Ray. В этом режиме запуска кластера Ray нет активной функции времени ожидания, которая имеется в кластере с одним пользователем при запуске экземпляра кластера Ray с одним пользователем.

Чтобы запустить глобальный кластер лучей, в который может подключиться несколько пользователей и запустить задачи Ray, сначала создайте задание записной книжки Azure Databricks и подключите его к общему режиму кластера Azure Databricks, а затем выполните следующую команду:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

Это блокирующий вызов, который будет оставаться активным, пока не прервать вызов, нажав кнопку "Прерывание" в ячейке команды записной книжки, отсоединив записную книжку от кластера Azure Databricks или завершив кластер Azure Databricks. В противном случае глобальный кластер Ray будет продолжать работать и быть доступен для отправки задач авторизованными пользователями. Дополнительные сведения о кластерах глобального режима см . в документации по API Ray.

Кластеры глобального режима имеют следующие свойства:

  • В кластере Azure Databricks можно создать только один активный глобальный кластер Ray одновременно.
  • В кластере Azure Databricks активный глобальный кластер Ray можно использовать всеми пользователями в любой подключенной записной книжке Azure Databricks. Вы можете выполнить подключение ray.init() к активному кластеру рэй глобального режима. Так как несколько пользователей могут получить доступ к этому кластеру Ray, проблема может возникнуть.
  • Глобальный setup_ray_cluster кластер Ray находится до тех пор, пока вызов не будет прерван. В нем нет времени ожидания автоматического завершения работы, так как кластеры Ray с одним пользователем выполняются.

Создание кластера GPU Ray

Для кластеров GPU эти ресурсы можно добавить в кластер Ray следующим образом:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Подключение к удаленному кластеру Ray с помощью клиента Ray

В ray версии 2.3.0 и выше можно создать кластер Ray с помощью API setup_ray_cluster и в той же записной книжке можно вызвать API ray.init() для подключения к этому кластеру Ray. Чтобы получить удаленный строка подключения, используйте следующее:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Затем можно подключить удаленный кластер с помощью приведенного выше удаленного строка подключения:

import ray
ray.init(remote_conn_str)

Клиент Ray не поддерживает API набора данных Ray, определенный в модуле ray.data. В качестве обходного решения можно упаковать код, вызывающий API набора данных Ray в удаленной задаче Ray, как показано в следующем коде:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

Значения, которые необходимо настроить, — ЭТО URL-адрес рабочей области Azure Databricks, начиная с https://, а затем значения, найденные после обнаружения в URL-адресе прокси-сервера панели мониторинга Ray, отображаемом после /driver-proxy/o/ запуска кластера Ray.

Интерфейс командной строки заданий Ray используется для отправки заданий в кластер Ray из внешних систем, но не требуется для отправки заданий в кластерах Ray в Azure Databricks. Рекомендуется развернуть задание с помощью заданий Azure Databricks, кластера Ray для каждого приложения и существующих средств Azure Databricks, таких как пакеты ресурсов Azure Databricks или триггеры рабочих процессов, которые используются для активации задания.

Установка расположения выходных данных журнала

Аргумент можно задать collect_log_to_path , чтобы указать путь назначения, в котором требуется собрать журналы кластера Ray. Коллекция журналов выполняется после завершения работы кластера Ray.

Azure Databricks рекомендует задать путь, начинающийся с /dbfs/ или Unity Catalog Volume, чтобы сохранить журналы, даже если вы завершаете кластер Apache Spark. В противном случае журналы не восстанавливаются, так как локальное хранилище в кластере удаляется при завершении работы кластера.

После создания кластера Ray можно запустить любой код приложения Ray непосредственно в записной книжке. Щелкните "Открыть панель мониторинга кластера Ray" на новой вкладке , чтобы просмотреть панель мониторинга Ray для кластера.

Включение трассировок стека и графов пламени на странице "Субъекты панели мониторинга Луча"

На странице "Субъекты панели мониторинга Луча" можно просматривать трассировки стека и графики пламени для активных субъектов Лучей. Чтобы просмотреть эти сведения, используйте следующую команду, чтобы установить py-spy перед запуском кластера Ray:

%pip install py-spy

Создание и настройка рекомендаций

В этом разделе рассматриваются рекомендации по созданию и настройке кластеров Ray.

Рабочие нагрузки, отличные от GPU

Кластер Ray выполняется поверх кластера Azure Databricks Spark. Типичным сценарием является использование задания Spark и UDF Spark для выполнения простых задач предварительной обработки данных, которые не требуют ресурсов GPU. Затем используйте Ray для выполнения сложных задач машинного обучения, которые используют графические процессоры. В этом случае Azure Databricks рекомендует задать параметр конфигурации уровня кластера Apache Spark spark.task.resource.gpu.0, чтобы все преобразования DataFrame Apache Spark и выполнение UDF Apache Spark не использовали ресурсы GPU.

Преимущества этой конфигурации приведены ниже.

  • Он увеличивает параллелизм заданий Apache Spark, так как тип экземпляра GPU обычно имеет гораздо больше ядер ЦП, чем устройства GPU.
  • Если кластер Apache Spark совместно используется несколькими пользователями, эта конфигурация не позволяет заданиям Apache Spark конкурировать с ресурсами GPU одновременно с рабочими нагрузками Ray.

Отключение transformers интеграции MLflow тренера при использовании в задачах Ray

Интеграция transformers обучения MLflow включена по умолчанию из библиотеки transformers . Если вы используете обучение Ray для точной transformers настройки модели, задачи Ray завершаются ошибкой из-за проблемы с учетными данными. Однако эта проблема не применяется, если вы напрямую используете MLflow для обучения. Чтобы избежать этой проблемы, можно задать DISABLE_MLFLOW_INTEGRATION для переменной среды значение TRUE из конфигурации кластера Azure Databricks при запуске кластера Apache Spark.

Ошибка выбора удаленной функции Address Ray

Чтобы выполнить задачи Ray, Рэй выбирает функцию задачи. Если вы обнаружите, что сбой при выборе, необходимо диагностировать, какая часть кода вызывает сбой. Распространенными причинами ошибок выбора являются обработка внешних ссылок, закрытия и ссылок на объекты с отслеживанием состояния. Одна из самых простых ошибок для проверки и быстрого исправления может быть устранена путем перемещения инструкций импорта в объявлении функции задачи.

Например, datasets.load_dataset широко используется функция, которая исправлена на стороне драйвера среды выполнения Azure Databricks, отрисовка ссылки на неисключаемую. Чтобы устранить эту проблему, можно просто написать функцию задачи следующим образом:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Отключение монитора памяти Ray, если задача Ray неожиданно убита с ошибкой "Вне памяти" (OOM)

В Ray 2.9.3 монитор памяти Ray имеет несколько известных проблем, которые могут привести к тому, что задачи Ray непреднамеренно остановлены без причин. Чтобы устранить проблему, можно отключить монитор памяти Ray, задав переменную RAY_memory_monitor_refresh_ms 0 среды в конфигурации кластера Azure Databricks при запуске кластера Apache Spark.

Применение функций преобразования к пакетам данных

При обработке данных в пакетах рекомендуется использовать API данных Ray с map_batches функцией. Этот подход может быть более эффективным и масштабируемым, особенно для больших наборов данных или сложных вычислений, которые пользуются пакетной обработкой. Любой кадр данных Spark можно преобразовать в набор данных Ray с помощью ray.data.from_spark API. Обработанные выходные данные из вызова ЭТОГО API преобразования можно записать в таблицы UC Azure Databricks с помощью API ray.data.write_databricks_table.

Использование MLflow в задачах Ray

Чтобы использовать MLflow в задачах Ray, вам потребуется:

  • Определите учетные данные MLflow Azure Databricks в задачах Ray.
  • Создание MLflow выполняется в драйвере Apache Spark и передает созданные run_id задачи Ray.

В следующем примере кода показано, как это сделать:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Использование библиотек Python в области записных книжек или кластерных библиотек Python в задачах Ray

В настоящее время Ray имеет известная проблема, из-за которой задачи Ray не могут использовать библиотеки Python с областью действия записной книжки или библиотеки python кластера. Чтобы использовать дополнительные зависимости в заданиях Ray, необходимо вручную установить библиотеки с помощью %pip волшебной команды перед запуском кластера Ray-on-Spark, который будет использовать эти зависимости в задачах. Например, чтобы обновить версию Ray, которая будет использоваться для запуска кластера Ray, можно выполнить следующую команду в записной книжке:

%pip install ray==<The Ray version you want to use> --force-reinstall

Затем выполните следующую команду в записной книжке, чтобы перезапустить ядро Python:

dbutils.library.restartPython()

Следующие шаги