Creare e connettersi ai cluster Ray in Azure Databricks
Informazioni su come creare, configurare ed eseguire cluster di calcolo Ray in Azure Databricks
Requisiti
Per creare un cluster Ray, è necessario avere accesso a una risorsa di calcolo all-purpose di Databricks con le impostazioni seguenti:
- Databricks Runtime 12.2 LTS ML e versioni successive.
- La modalità di accesso deve essere Un singolo utente o Nessun isolamento condiviso.
Nota
I cluster Ray non sono attualmente supportati nell'ambiente di calcolo serverless.
Installare Ray
Con Databricks Runtime ML 15.0 e versioni successive, Ray è preinstallato nei cluster di Azure Databricks.
Per i runtime rilasciati prima della versione 15.0, usare pip per installare Ray nel cluster:
%pip install ray[default]>=2.3.0
Creare un cluster Ray specifico dell'utente in un cluster Azure Databricks
Per creare un cluster Ray, usare l'API ray.util.spark.setup_ray_cluster .
Nota
Quando si crea un cluster Ray in un notebook, è disponibile solo per l'utente del notebook corrente. Il cluster Ray viene arrestato automaticamente dopo che il notebook viene scollegato dal cluster o dopo 30 minuti di inattività (nessuna attività è stata inviata a Ray). Se si vuole creare un cluster Ray condiviso con tutti gli utenti e non è soggetto a un notebook in esecuzione attivamente, usare invece l'API ray.util.spark.setup_global_ray_cluster
.
Cluster Ray di dimensioni fisse
In qualsiasi notebook di Azure Databricks collegato a un cluster Azure Databricks, è possibile eseguire il comando seguente per avviare un cluster Ray di dimensioni fisse:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
num_worker_nodes=2,
num_cpus_per_node=4,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
Ridimensionamento automatico del cluster Ray
Per informazioni su come avviare un cluster Ray con scalabilità automatica, vedere Ridimensionare i cluster Ray in Azure Databricks.
Avvio di un cluster Ray in modalità globale
Usando Ray 2.9.0 e versioni successive, è possibile creare un cluster Ray in modalità globale in un cluster Azure Databricks. Un cluster Ray in modalità globale consente a tutti gli utenti collegati al cluster Azure Databricks di usare anche il cluster Ray. Questa modalità di esecuzione di un cluster Ray non dispone della funzionalità di timeout attiva che un cluster a utente singolo ha quando esegue un'istanza del cluster Ray a singolo utente.
Per avviare un cluster Ray globale in cui più utenti possono connettersi ed eseguire attività Ray, iniziare creando un processo notebook di Azure Databricks e collegandolo a un cluster Azure Databricks in modalità condivisa, quindi eseguire il comando seguente:
from ray.util.spark import setup_global_ray_cluster
setup_global_ray_cluster(
max_worker_nodes=2,
...
# other arguments are the same as with the `setup_global_ray` API.
)
Si tratta di una chiamata di blocco che rimarrà attiva fino a quando non si interrompe la chiamata facendo clic sul pulsante "Interrupt" nella cella di comando del notebook, scollegando il notebook dal cluster Azure Databricks o terminando il cluster Azure Databricks. In caso contrario, il cluster Ray in modalità globale continuerà a essere eseguito e sarà disponibile per l'invio di attività da parte degli utenti autorizzati. Per altre informazioni sui cluster in modalità globale, vedere La documentazione dell'API Ray.
I cluster in modalità globale hanno le proprietà seguenti:
- In un cluster Azure Databricks è possibile creare un solo cluster Ray in modalità globale attiva alla volta.
- In un cluster Azure Databricks, il cluster Ray in modalità globale attiva può essere usato da tutti gli utenti in qualsiasi notebook di Azure Databricks collegato. È possibile eseguire
ray.init()
per connettersi al cluster Ray in modalità globale attiva. Poiché più utenti possono accedere a questo cluster Ray, la contesa delle risorse potrebbe essere un problema. - Il cluster Ray in modalità globale è attivo fino a quando la
setup_ray_cluster
chiamata non viene interrotta. Non ha un timeout di arresto automatico perché i cluster Ray a utente singolo eseguono.
Creare un cluster RAY GPU
Per i cluster GPU, queste risorse possono essere aggiunte al cluster Ray nel modo seguente:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
min_worker_nodes=2,
max_worker_nodes=4,
num_cpus_per_node=8,
num_gpus_per_node=1,
num_cpus_head_node=8,
num_gpus_head_node=1,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
Connettersi al cluster Ray remoto usando il client Ray
In Ray versione 2.3.0 e successive è possibile creare un cluster Ray usando l'API setup_ray_cluster e nello stesso notebook è possibile chiamare ray.init() API per connettersi a questo cluster Ray. Per ottenere il stringa di connessione remoto, usare quanto segue:
from ray.util.spark import setup_ray_cluster
_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)
È quindi possibile connettere il cluster remoto usando il stringa di connessione remoto precedente:
import ray
ray.init(remote_conn_str)
Il client Ray non supporta l'API del set di dati Ray definita nel modulo ray.data. Come soluzione alternativa, è possibile eseguire il wrapping del codice che chiama l'API del set di dati Ray all'interno di un'attività Ray remota, come illustrato nel codice seguente:
import ray
import pandas as pd
# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")
@ray.remote
def ray_data_task():
p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
ds = ray.data.from_pandas(p1)
return ds.repartition(4).to_pandas()
ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI
For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:
``` shell
ray job submit --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py
I valori che devono essere configurati sono l'URL dell'area di lavoro di Azure Databricks, a partire da https://
e quindi i valori trovati dopo che sono stati trovati nell'URL proxy ray dashboard visualizzato dopo /driver-proxy/o/
l'avvio del cluster Ray.
L'interfaccia della riga di comando di Ray Job viene usata per l'invio di processi a un cluster Ray da sistemi esterni, ma non è necessaria per l'invio di processi nei cluster Ray in Azure Databricks. È consigliabile distribuire il processo usando processi di Azure Databricks, un cluster Ray per applicazione da creare e gli strumenti esistenti di Azure Databricks, ad esempio i bundle di asset di Azure Databricks o i trigger del flusso di lavoro, per attivare il processo.
Impostare un percorso di output del log
È possibile impostare l'argomento collect_log_to_path
per specificare il percorso di destinazione in cui si desidera raccogliere i log del cluster Ray. La raccolta di log viene eseguita dopo l'arresto del cluster Ray.
Azure Databricks consiglia di impostare un percorso che inizia con /dbfs/
o il percorso del volume del catalogo Unity per mantenere i log anche se si termina il cluster Apache Spark. In caso contrario, i log non sono recuperabili perché l'archiviazione locale nel cluster viene eliminata quando il cluster viene arrestato.
Dopo aver creato un cluster Ray, è possibile eseguire qualsiasi codice dell'applicazione Ray direttamente nel notebook. Fare clic su Apri dashboard cluster Ray in una nuova scheda per visualizzare il dashboard Ray per il cluster.
Abilitare le tracce dello stack e i grafici di fiamma nella pagina Ray Dashboard Actors
Nella pagina Ray Dashboard Actors è possibile visualizzare le tracce dello stack e i grafici di fiamma per gli attori Ray attivi. Per visualizzare queste informazioni, usare il comando seguente per installare py-spy prima di avviare il cluster Ray:
%pip install py-spy
Creare e configurare le procedure consigliate
Questa sezione illustra le procedure consigliate per la creazione e la configurazione di cluster Ray.
Carichi di lavoro non GPU
Il cluster Ray viene eseguito su un cluster Spark di Azure Databricks. Uno scenario tipico consiste nell'usare un processo Spark e una funzione definita dall'utente Spark per eseguire semplici attività di pre-elaborazione dei dati che non necessitano di risorse GPU. Usare quindi Ray per eseguire attività complesse di Machine Learning che traggono vantaggio dalle GPU. In questo caso, Azure Databricks consiglia di impostare il parametro di configurazione a livello di cluster Apache Spark spark.task.resource.gpu.amount su 0 in modo che tutte le trasformazioni dei dataframe Apache Spark e le esecuzioni UDF di Apache Spark non usino risorse GPU.
I vantaggi di questa configurazione sono i seguenti:
- Aumenta il parallelismo del processo Apache Spark perché il tipo di istanza GPU ha in genere più core CPU rispetto ai dispositivi GPU.
- Se il cluster Apache Spark viene condiviso con più utenti, questa configurazione impedisce ai processi Apache Spark di competere per le risorse GPU con carichi di lavoro Ray contemporaneamente in esecuzione.
Disabilitare l'integrazione MLflow del formatore transformers
se viene usata nelle attività Ray
L'integrazione transformers
MLflow del formatore è abilitata per impostazione predefinita dall'interno della transformers
libreria. Se si usa ray train per ottimizzare un transformers
modello, le attività Ray avranno esito negativo a causa di un problema di credenziali. Tuttavia, questo problema non si applica se si usa direttamente MLflow per il training.
Per evitare questo problema, è possibile impostare la DISABLE_MLFLOW_INTEGRATION
variabile di ambiente su "TRUE" dalla configurazione del cluster Azure Databricks all'avvio del cluster Apache Spark.
Errore di selezione della funzione remota Ray
Per eseguire attività Ray, Ray seleziona la funzione attività. Se si rileva un errore di selezione, è necessario diagnosticare quale parte del codice causa l'errore. Le cause comuni degli errori di selezione sono la gestione di riferimenti, chiusura e riferimenti esterni a oggetti con stato. Uno degli errori più semplici da verificare e correggere rapidamente può essere risolto spostando le istruzioni import all'interno della dichiarazione della funzione di attività.
Ad esempio, datasets.load_dataset
è una funzione ampiamente usata con patch sul lato driver di Azure Databricks Runtime, che esegue il rendering del riferimento senza selezione. Per risolverlo, è sufficiente scrivere la funzione task come indicato di seguito:
def ray_task_func():
from datasets import load_dataset # import the function inside task function
...
Disabilitare il monitoraggio della memoria Ray se l'attività Ray viene terminata in modo imprevisto con un errore di memoria insufficiente
In Ray 2.9.3, il monitoraggio della memoria Ray presenta diversi problemi noti che possono causare l'arresto accidentale delle attività Ray senza causa.
Per risolvere il problema, è possibile disabilitare il monitoraggio della memoria Ray impostando la variabile RAY_memory_monitor_refresh_ms
di ambiente su 0
all'interno della configurazione del cluster Azure Databricks all'avvio del cluster Apache Spark.
Applicazione di funzioni di trasformazione a batch di dati
Quando si elaborano dati in batch, è consigliabile usare l'API Ray Data con la map_batches
funzione . Questo approccio può essere più efficiente e scalabile, soprattutto per set di dati di grandi dimensioni o calcoli complessi che traggono vantaggio dall'elaborazione batch. Qualsiasi dataframe Spark può essere convertito in un set di dati Ray usando l'API ray.data.from_spark
. L'output elaborato dalla chiamata a questa API di trasformazione può essere scritto nelle tabelle UC di Azure Databricks usando l'API ray.data.write_databricks_table
.
Uso di MLflow nelle attività Ray
Per usare MLflow nelle attività Ray, è necessario :
- Definire le credenziali MLflow di Azure Databricks all'interno delle attività Ray.
- Creare esecuzioni MLflow all'interno del driver Apache Spark e passare il creato
run_id
alle attività Ray.
Nell'esempio di codice seguente viene illustrato come eseguire questa operazione:
import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(experiment_name)
# We need to use the run created in <AS> driver side,
# and set `nested=True` to make it a nested run inside the
# parent run.
with mlflow.start_run(run_id=run_id, nested=True):
mlflow.log_metric(f"task_{x}_metric", x)
return x
with mlflow.start_run() as run: # create MLflow run in <AS> driver side.
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Usare librerie Python con ambito notebook o librerie Python con ambito notebook nelle attività Ray
Attualmente, Ray presenta un problema noto in cui le attività Ray non possono usare librerie Python con ambito notebook o librerie Python cluster. Per usare dipendenze aggiuntive all'interno dei processi Ray, è necessario installare manualmente le librerie usando il %pip
comando magic prima di avviare un cluster Ray-on-Spark che userà queste dipendenze all'interno delle attività. Ad esempio, per aggiornare la versione di Ray che verrà usata per avviare il cluster Ray, è possibile eseguire il comando seguente nel notebook:
%pip install ray==<The Ray version you want to use> --force-reinstall
Eseguire quindi il comando seguente nel notebook per riavviare il kernel Python:
dbutils.library.restartPython()