Compartilhar via


Criar e conectar-se a clusters do Ray no Azure Databricks

Saiba como criar, configurar e executar clusters de computação do Ray no Azure Databricks

Requisitos

Para criar um cluster do Ray, você deve ter acesso a um recurso de computação para todas as finalidades do Databricks com as seguintes configurações:

  • Databricks Runtime 12.2 LTS ML e superior.
  • O modo de acesso deve ser Usuário único ou Sem isolamento compartilhado.

Observação

Atualmente, não há suporte para clusters do Ray na computação sem servidor.

Instalar o Ray

Com o Databricks Runtime ML 15.0 em diante, o Ray vem pré-instalado em clusters do Azure Databricks.

Para runtimes lançados antes da versão 15.0, use o pip para instalar o Ray em seu cluster:

%pip install ray[default]>=2.3.0

Criar um cluster do Ray específico a um usuário em um cluster do Azure Databricks

Para criar um cluster do Ray, use a API ray.util.spark.setup_ray_cluster.

Observação

Quando você cria um cluster Ray em um notebook, ele só está disponível para o usuário do notebook atual. O cluster Ray é desligado automaticamente depois que o notebook é desanexado do cluster ou após 30 minutos de inatividade (nenhuma tarefa foi enviada ao Ray). Se você quer criar um cluster do Ray compartilhado com todos os usuários e não estiver sujeito a um notebook em execução ativa, é melhor usar a API ray.util.spark.setup_global_ray_cluster.

Cluster Ray de tamanho fixo

Em qualquer notebook do Azure Databricks anexado a um cluster do Azure Databricks, você pode executar o seguinte comando para iniciar um cluster do Ray de tamanho fixo:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Dimensionamento automático do cluster do Ray

Para saber como iniciar um cluster do Ray de dimensionamento automático, consulte Dimensionar clusters do Ray no Azure Databricks.

Como iniciar um cluster do Ray no modo global

Usando o Ray 2.9.0 e superior, você pode criar um cluster do Ray de modo global em um cluster do Azure Databricks. Um cluster do Ray de modo global permite que todos os usuários anexados ao cluster do Azure Databricks também usem o cluster do Ray. Esse modo de executar um cluster Ray não tem a funcionalidade de tempo limite ativa que um cluster de usuário único tem ao executar uma instância de cluster Ray de usuário único.

Para iniciar um cluster do Ray global, no qual vários usuários podem anexar e executar tarefas do Ray, comece criando um trabalho de notebook do Azure Databricks, anexe-o a um cluster do Azure Databricks no modo compartilhado e execute o seguinte comando:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

Essa é uma chamada de bloqueio que permanecerá ativa até que você interrompa a chamada clicando no botão “Interromper” na célula de comando do notebook, desanexe o notebook do cluster do Azure Databricks ou encerre o cluster do Azure Databricks. Caso contrário, o cluster Ray do modo global continuará a ser executado e estará disponível para envio de tarefas por usuários autorizados. Para obter mais informações sobre clusters de modo global, consulte Documentação da API do Ray.

Os clusters de modo global têm as seguintes propriedades:

  • Em um cluster do Azure Databricks, você só pode criar um cluster do Ray de modo global ativo por vez.
  • Em um cluster do Azure Databricks, o cluster do Ray do modo global ativo pode ser usado por todos os usuários em qualquer notebook do Azure Databricks anexado. Você pode executar ray.init() para se conectar ao cluster Ray do modo global ativo. Como vários usuários podem acessar esse cluster do Ray, a contenção de recursos pode ser um problema.
  • O cluster Ray do modo global está ativo até que a chamada setup_ray_cluster seja interrompida. Ele não tem um tempo limite de desligamento automático como ocorre nos clusters Ray de usuário único.

Criar um cluster de GPU do Ray

Os recursos de GPU podem ser adicionados ao cluster do Ray da seguinte maneira:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Conectar-se ao cluster do Ray remoto usando o cliente Ray

No Ray versão 2.3.0 e superior, você pode criar um cluster do Ray usando a API setup_ray_cluster e, no mesmo notebook, você pode chamar a API ray.init() para se conectar a esse cluster do Ray. Para obter a cadeia de conexão remota, use o seguinte:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Em seguida, você pode conectar o cluster remoto usando a cadeia de conexão remota acima:

import ray
ray.init(remote_conn_str)

O cliente do Ray não dá suporte à API do conjunto de dados do Ray definida no módulo ray.data. Como solução alternativa, você pode encapsular seu código que chama a API do conjunto de dados do Ray dentro de uma tarefa Ray remota, conforme mostrado no seguinte código:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

Os valores que precisam ser configurados são a URL do workspace do Azure Databricks, que começam com https:// e, em seguida, os valores encontrados após o /driver-proxy/o/ são encontrados na URL do proxy do Painel do Ray exibida após o cluster do Ray ser iniciado.

A CLI do Ray Job é usada para enviar trabalhos de sistemas externos para um cluster do Ray, mas não é necessária para enviar trabalhos em clusters do Ray no Azure Databricks. É recomendável que o trabalho seja implantado usando fluxos de trabalho do Azure Databricks, um cluster do Ray por aplicativo seja criado e as ferramentas existentes do Azure Databricks, como pacotes de ativos do Azure Databricks ou gatilhos de fluxo de trabalho, sejam usadas para disparar o trabalho.

Definir um local de saída de log

Você pode definir o argumento collect_log_to_path para especificar o caminho de destino em que deseja coletar os logs do cluster Ray. A coleção de logs é executada depois que o cluster Ray é desligado.

O Azure Databricks recomenda definir um caminho começando com /dbfs/ ou o caminho volume do Catálogo do Unity para preservar os logs mesmo se você terminar o cluster do Apache Spark. Caso contrário, seus logs não poderão ser recuperados, pois o armazenamento local no cluster é excluído quando o cluster é desligado.

Após criar um cluster do Ray, você poderá executar qualquer código de aplicativo do Ray diretamente no seu notebook. Clique em Abrir painel do cluster do Ray em uma nova guia para exibir o painel do Ray para o cluster.

Habilitar rastreamentos de pilha e gráficos de chama na página Atores do Painel do Ray

Na página Atores do Painel do Ray, você pode exibir rastreamentos de pilha e gráficos de chama para atores do Ray ativos. Para exibir essas informações, use o seguinte comando para instalar o py-spy antes de iniciar o cluster do Ray:

%pip install py-spy

Práticas recomendadas para criar e configurar

Esta seção aborda as práticas recomendadas para criar e configurar clusters do Ray.

Cargas de trabalho que não usem GPU

O cluster do Ray é executado sobre um cluster do Spark do Azure Databricks. Um cenário típico é usar um trabalho do Spark e o UDF do Spark para realizar tarefas simples de pré-processamento de dados que não precisem de recursos de GPU. Em seguida, use o Ray para executar tarefas complicadas de aprendizado de máquina que se beneficiem de GPUs. Nesse caso, o Azure Databricks recomenda definir o parâmetro de configuração de nível de cluster do Apache Spark spark.task.resource.gpu.amount como 0 para que todas as transformações de DataFrame do Apache Spark e execuções de UDF do Apache Spark não usem recursos de GPU.

Os benefícios dessa configuração são os seguintes:

  • Isso aumenta o paralelismo de trabalho do Apache Spark, pois o tipo de instância de GPU geralmente tem muito mais núcleos de CPU do que dispositivos de GPU.
  • Se o cluster do Apache Spark for compartilhado com vários usuários, essa configuração impedirá que trabalhos do Apache Spark concorram por recursos de GPU com cargas de trabalho do Ray em execução simultânea.

Desabilite a integração treinador transformers do MLflow se estiver usando-a em tarefas do Ray

A integração do MLflow do treinador transformers é habilitada por padrão na biblioteca transformers. Se você usar o treinamento do Ray para ajustar um modelo transformers, as tarefas do Ray falharão devido a um problema de credencial. No entanto, esse problema não se aplicará se você usar diretamente o MLflow para treinamento. Para evitar esse problema, você pode definir a variável de ambiente DISABLE_MLFLOW_INTEGRATION como "TRUE" dentro da configuração do cluster do Azure Databricks ao iniciar o cluster do Apache Spark.

Erro de decapagem da função remota do Address Ray

Para executar tarefas, o Ray serializa a função da tarefa. Se houver falha na serialização, você deverá diagnosticar qual parte do código causou a falha. As causas comuns de erros de serialização são o tratamento de referências externas, fechamentos e referências a objetos com estado. Para corrigir rapidamente um dos erros mais fáceis de verificar, mova instruções de importação da declaração de função da tarefa.

Por exemplo, datasets.load_dataset é uma função amplamente usada que é corrigida no lado do driver do Azure Databricks Runtime, renderizando a referência de maneira impossível de serializar. Para resolver isso, basta escrever a função de tarefa da seguinte maneira:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Desabilite o monitor de memória do Ray se a tarefa do Ray for interrompida inesperadamente com erro OOM (memória insuficiente)

No Ray 2.9.3, o monitor de memória do Ray tem vários problemas conhecidos que podem fazer com que as tarefas do Ray sejam interrompidas inadvertidamente sem motivo. Para resolver o problema, você pode desabilitar o monitor de memória do Ray definindo a variável de ambiente RAY_memory_monitor_refresh_ms como 0 na configuração do cluster do Azure Databricks ao iniciar o cluster do Apache Spark.

Ler dados do Spark do Ray

Um caso de uso comum é ler dados de um DataFrame do Spark no Ray para processamento adicional. No Databricks Runtime 15.0 para ML e posteriores, uma função está disponível para simplificar o carregamento dos dados contidos em um DataFrame do Spark diretamente no Ray.

Para usar efetivamente esse recurso, verifique se a configuração do cluster Spark spark.databricks.pyspark.dataFrameChunk.enabled está definida como true antes de executar ray.init() para iniciar o cluster Ray.

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

Ray buscará o conteúdo do DataFrame do Spark diretamente sem a necessidade de uma gravação temporária dos dados, tornando-os diretamente disponíveis para processamento.

Ler dados do Ray do Spark

Da mesma forma que ler dados do Spark do Ray, há suporte para a capacidade de ler os resultados de uma tarefa do Ray como um DataFrame do Spark usando o Catálogo do Unity. Para usar esse recurso, você deve estar executando o Databricks Runtime 15.0 para ML e superior de dentro de um workspace habilitado para o Catálogo do Unity.

Para usar esse recurso, verifique se a variável de ambiente "_RAY_UC_VOLUMES_FUST_TEMP_DIR" está definida como um caminho de volume de catálogo do Unity válido e acessível, como "/Volumes/MyCatalog/MySchema/MyVolume/MyRayData"

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

# Write to the specified (via environment variable) UC Volume from Ray
ray_dataset.write_databricks_table()

Em versões do Databricks Runtime anteriores à 15.0 para ML, você pode gravar diretamente em um local do repositório de objetos usando o gravador parquet do Ray, ray_dataset.write_parquet() do módulo ray.data. O Spark pode ler esses dados parquet com leitores nativos.

Aplicando funções de transformação a lotes de dados

Ao processar dados em lotes, é recomendável usar a API de Dados do Ray com a função map_batches. Essa abordagem pode ser mais eficiente e escalonável, especialmente para grandes conjuntos de dados ou cálculos complexos que se beneficiam do processamento em lote. Qualquer DataFrame do Spark pode ser convertido em um conjunto de dados do Ray usando a API ray.data.from_spark. A saída processada da chamada a essa API de transformação pode ser gravada em tabelas UC do Azure Databricks usando a API ray.data.write_databricks_table.

Com usar o MLflow em tarefas do Ray. Para usar o MLflow em tarefas do Ray, você deverá:

  • Definir as credenciais do MLflow do Azure Databricks nas tarefas do Ray.
  • Criar execuções do MLflow no Driver do Apache Spark e passar as run_id criadas para as tarefas do Ray.

O seguinte exemplo de código demonstra como fazer isso:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars( <Databricks>")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Usar bibliotecas do Python com escopo de notebook ou bibliotecas do Python de cluster em tarefas do Ray

Atualmente, o Ray tem um problema conhecido em que tarefas do Ray não podem usar bibliotecas do Python com escopo de notebook nem bibliotecas do Python de cluster. Para utilizar dependências adicionais em seus trabalhos do Ray, você deve instalar manualmente as bibliotecas usando o comando magic %pip antes de iniciar um cluster do Ray-on-Spark que usará essas dependências nas tarefas. Por exemplo, para atualizar a versão do Ray que será usada para iniciar o cluster do Ray, você pode executar o seguinte comando em seu notebook:

%pip install ray==<The Ray version you want to use> --force-reinstall

Em seguida, execute o seguinte comando em seu notebook para reiniciar o kernel do Python:

dbutils.library.restartPython()

Próximas etapas