Поделиться через


Запуск кластера Ray в Azure Databricks

Azure Databricks упрощает процесс запуска кластера Ray, обрабатывая кластер и конфигурацию заданий так же, как и с любым заданием Apache Spark. Это связано с тем, что кластер Ray фактически запущен на основе управляемого кластера Apache Spark.

Запуск Ray на локальном компьютере

import ray

ray.init()

Запуск Ray в Azure Databricks

from ray.util.spark import setup_ray_cluster
import ray

# If the cluster has four workers with 8 CPUs each as an example
setup_ray_cluster(num_worker_nodes=4, num_cpus_per_worker=8)

# Pass any custom configuration to ray.init
ray.init(ignore_reinit_error=True)

Этот подход работает в любом масштабе кластера от нескольких до сотен узлов. Кластеры Ray в Azure Databricks также поддерживают автомасштабирование.

После создания кластера Ray можно запустить любой код приложения Ray в записной книжке Azure Databricks.

Внимание

Databricks рекомендует установить все необходимые библиотеки для приложения, %pip install <your-library-dependency> чтобы обеспечить их доступность в кластере и приложении Ray соответствующим образом. Указание зависимостей в вызове функции Ray init устанавливает зависимости в недоступном для рабочих узлов Apache Spark, что приводит к несовместимости версий и ошибкам импорта.

Например, можно запустить простое приложение Ray в записной книжке Azure Databricks следующим образом:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Завершение работы кластера Ray

Кластеры Ray автоматически завершаются при следующих обстоятельствах:

  • Вы отсоедините интерактивную записную книжку из кластера Azure Databricks.
  • Задание Azure Databricks завершено.
  • Кластер Azure Databricks перезапускается или завершается.
  • В течение указанного времени простоя не выполняется никаких действий.

Чтобы завершить работу кластера Ray, работающего в Azure Databricks, можно вызвать ray.utils.spark.shutdown_ray_cluster API.

from ray.utils.spark import shutdown_ray_cluster
import ray

shutdown_ray_cluster()
ray.shutdown()

Следующие шаги