Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Informazioni su come creare, configurare ed eseguire cluster di calcolo Ray in Azure Databricks
Requisiti
Per creare un cluster Ray, è necessario avere accesso a una risorsa di calcolo all-purpose di Databricks con le impostazioni seguenti:
- Databricks Runtime 12.2 LTS ML e versioni successive.
- Dedicato (precedentemente utente singolo) o modalità di accesso condiviso senza isolamento:
Nota
I cluster Ray non sono attualmente supportati nell'ambiente di calcolo serverless.
Installare Ray
Con Databricks Runtime ML 15.0 e versioni successive, Ray è preinstallato nei cluster di Azure Databricks.
Per i runtime rilasciati prima della versione 15.0, usare pip per installare Ray nel cluster:
%pip install ray[default]>=2.3.0
Creare un cluster Ray specifico dell'utente in un cluster Azure Databricks
Per creare un cluster Ray, usare l'API ray.util.spark.setup_ray_cluster .
Nota
Quando si crea un cluster Ray in un notebook, è disponibile solo per l'utente del notebook corrente. Il cluster Ray viene arrestato automaticamente dopo che il notebook viene scollegato dal cluster o dopo 30 minuti di inattività (nessuna attività è stata inviata a Ray). Se si vuole creare un cluster Ray condiviso con tutti gli utenti e non è soggetto a un notebook in esecuzione attivamente, usare invece l'API ray.util.spark.setup_global_ray_cluster .
Cluster Ray di dimensioni fisse
In qualsiasi notebook di Azure Databricks collegato a un cluster Azure Databricks, è possibile eseguire il comando seguente per avviare un cluster Ray di dimensioni fisse:
import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
max_worker_nodes=1,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
Ridimensionamento automatico del cluster Ray
Per informazioni su come avviare un cluster Ray con scalabilità automatica, vedere Ridimensionare i cluster Ray in Azure Databricks.
Avvio di un cluster Ray in modalità globale
Usando Ray 2.9.0 e versioni successive, è possibile creare un cluster Ray in modalità globale in un cluster Azure Databricks. Un cluster Ray in modalità globale consente a tutti gli utenti collegati alla risorsa di calcolo di Azure Databricks di usare anche il cluster Ray. Questa modalità di esecuzione di un cluster Ray non ha la funzionalità di timeout attiva che una risorsa di calcolo dedicata ha quando esegue una singola istanza del cluster Ray utente.
Per avviare un cluster Ray globale in cui più utenti possono connettersi ed eseguire attività Ray, iniziare creando un processo notebook di Azure Databricks e collegandolo a un cluster Azure Databricks in modalità condivisa, quindi eseguire il comando seguente:
from ray.util.spark import setup_global_ray_cluster
setup_global_ray_cluster(
max_worker_nodes=2,
...
# other arguments are the same as with the `setup_global_ray` API.
)
Si tratta di una chiamata di blocco che rimarrà attiva fino a quando non si interrompe la chiamata facendo clic sul pulsante "Interrupt" nella cella di comando del notebook, scollegando il notebook dal cluster Azure Databricks o terminando il cluster Azure Databricks. In caso contrario, il cluster Ray in modalità globale continuerà a essere eseguito e sarà disponibile per l'invio di attività da parte degli utenti autorizzati. Per altre informazioni sui cluster in modalità globale, vedere La documentazione dell'API Ray.
I cluster in modalità globale hanno le proprietà seguenti:
- In un cluster Azure Databricks è possibile creare un solo cluster Ray in modalità globale attiva alla volta.
- In un cluster Azure Databricks, il cluster Ray in modalità globale attiva può essere usato da tutti gli utenti in qualsiasi notebook di Azure Databricks collegato. È possibile eseguire
ray.init()per connettersi al cluster Ray in modalità globale attiva. Poiché più utenti possono accedere a questo cluster Ray, la contesa delle risorse potrebbe essere un problema. - Il cluster Ray in modalità globale rimane attivo fino all'interruzione della chiamata
setup_ray_cluster. Non ha un timeout di arresto automatico, a differenza dei cluster Ray per utente singolo.
Creare un cluster Ray GPU
Per i cluster GPU, queste risorse possono essere aggiunte al cluster Ray nel modo seguente:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
min_worker_nodes=2,
max_worker_nodes=4,
num_cpus_per_node=8,
num_gpus_per_node=1,
num_cpus_head_node=8,
num_gpus_head_node=1,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
Connettersi al cluster Ray remoto usando il client Ray
In Ray versione 2.3.0 e successive è possibile creare un cluster Ray usando l'API setup_ray_cluster e nello stesso notebook è possibile chiamare ray.init() API per connettersi a questo cluster Ray. Per ottenere la stringa di connessione remota, usare quanto segue:
from ray.util.spark import setup_ray_cluster
_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)
È quindi possibile connettere il cluster remoto usando la stringa di connessione remota precedentemente menzionata.
import ray
ray.init(remote_conn_str)
Il client Ray non supporta l'API del set di dati Ray definita nel modulo ray.data. Come soluzione alternativa, è possibile eseguire il wrapping del codice che chiama l'API del set di dati Ray all'interno di un'attività Ray remota, come illustrato nel codice seguente:
import ray
import pandas as pd
# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")
@ray.remote
def ray_data_task():
p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
ds = ray.data.from_pandas(p1)
return ds.repartition(4).to_pandas()
ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI
For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:
``` shell
ray job submit --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py
I valori che devono essere configurati sono l'URL dell'area di lavoro di Azure Databricks, a partire da https://e quindi i valori trovati dopo l'/driver-proxy/o/ sono disponibili nell'URL proxy di Ray Dashboard visualizzato dopo l'avvio del cluster Ray.
L'interfaccia della riga di comando di Ray Job viene usata per l'invio di processi a un cluster Ray da sistemi esterni, ma non è necessaria per l'invio di processi nei cluster Ray in Azure Databricks. È consigliabile distribuire il processo usando i processi Lakeflow, un cluster Ray per applicazione da creare e gli strumenti esistenti di Azure Databricks, ad esempio i bundle di asset di Azure Databricks o i trigger del flusso di lavoro, per attivare il processo.
Impostare un percorso di output del log
È possibile impostare l'argomento collect_log_to_path per specificare il percorso di destinazione in cui raccogliere i log del cluster Ray. La raccolta di log viene eseguita dopo l'arresto del cluster Ray.
Azure Databricks consiglia di impostare un percorso che inizi con /dbfs/ o con il percorso del volume del catalogo Unity per preservare i log anche se si termina il cluster Apache Spark. In caso contrario, i log non sono recuperabili perché l'archiviazione locale nel cluster viene eliminata quando il cluster viene arrestato.
Dopo aver creato un cluster Ray, è possibile eseguire qualsiasi codice dell'applicazione Ray direttamente nel notebook. Fare clic su Apri dashboard cluster Ray in una nuova scheda per visualizzare il dashboard Ray per il cluster.
Abilitare le tracce dello stack e i grafici di fiamma nella pagina Ray Dashboard Actors
Nella pagina degli Attori della Dashboard di Ray, è possibile visualizzare le stack trace e i grafici a fiamma per gli attori attivi di Ray. Per visualizzare queste informazioni, usare il comando seguente per installare py-spy prima di avviare il cluster Ray:
%pip install py-spy
Creare e configurare le procedure consigliate
Questa sezione illustra le procedure consigliate per la creazione e la configurazione di cluster Ray.
Carichi di lavoro non GPU
Il cluster Ray viene eseguito su un cluster Spark di Azure Databricks. Uno scenario tipico consiste nell'usare un job Spark e una UDF Spark per eseguire semplici attività di pre-elaborazione dei dati che non necessitano di risorse GPU. Usare quindi Ray per eseguire attività complesse di Machine Learning che traggono vantaggio dalle GPU. In questo caso, Azure Databricks consiglia di impostare il parametro di configurazione a livello di cluster Apache Spark spark.task.resource.gpu.amount su 0 in modo che tutte le trasformazioni dei dataframe Apache Spark e le esecuzioni UDF di Apache Spark non usino risorse GPU.
I vantaggi di questa configurazione sono i seguenti:
- Aumenta il parallelismo del processo Apache Spark perché il tipo di istanza GPU ha in genere più core CPU rispetto ai dispositivi GPU.
- Se il cluster Apache Spark viene condiviso con più utenti, questa configurazione impedisce ai processi Apache Spark di competere per le risorse GPU con carichi di lavoro Ray contemporaneamente in esecuzione.
Disabilitare l'integrazione MLflow del trainer transformers se la si usa nelle attività Ray
L'integrazione con MLflow dell'addestratore è abilitata di default all'interno della libreria transformers. Se si usa ray train per ottimizzare un transformers modello, le attività Ray avranno esito negativo a causa di un problema di credenziali. Tuttavia, questo problema non si applica se si usa direttamente MLflow per il training.
Per evitare questo problema, è possibile impostare la DISABLE_MLFLOW_INTEGRATION variabile di ambiente su "TRUE" dalla configurazione del cluster Azure Databricks all'avvio del cluster Apache Spark.
Errore di selezione della funzione remota Ray
Per eseguire attività Ray, Ray seleziona la funzione attività. Se si riscontra un errore di pickling, è necessario individuare quale parte del codice causa l'errore. Le cause comuni degli errori di selezione sono la gestione di riferimenti, chiusura e riferimenti esterni a oggetti con stato. Uno degli errori più semplici da verificare e correggere rapidamente può essere risolto spostando le istruzioni import all'interno della dichiarazione della funzione di attività.
Ad esempio, datasets.load_dataset è una funzione ampiamente usata modificata sul lato driver di Azure Databricks Runtime, rendendo il riferimento non deserializzabile. Per risolverlo, è sufficiente scrivere la funzione task come indicato di seguito:
def ray_task_func():
from datasets import load_dataset # import the function inside task function
...
Disabilitare il monitor della memoria di Ray se il task di Ray viene inaspettatamente terminato a causa di un errore di esaurimento della memoria.
In Ray 2.9.3, il monitoraggio della memoria Ray presenta diversi problemi noti che possono causare l'arresto accidentale delle attività Ray senza causa.
Per risolvere il problema, è possibile disabilitare il monitoraggio della memoria Ray impostando la variabile RAY_memory_monitor_refresh_ms di ambiente su 0 all'interno della configurazione del cluster Azure Databricks all'avvio del cluster Apache Spark.
Applicazione di funzioni di trasformazione a batch di dati
Quando si elaborano dati in batch, è consigliabile usare l'API Ray Data con la map_batches funzione . Questo approccio può essere più efficiente e scalabile, soprattutto per set di dati di grandi dimensioni o calcoli complessi che traggono vantaggio dall'elaborazione batch. Qualsiasi dataframe Spark può essere convertito in un set di dati Ray usando l'API ray.data.from_spark . L'output elaborato dalla chiamata a questa API di trasformazione può essere scritto nelle tabelle UC di Azure Databricks usando l'API ray.data.write_databricks_table.
Uso di MLflow in Ray tuner, Ray train o attività personalizzate Ray
L'integrazione di Databricks MLflow e Ray richiede Ray 2.41 e versioni successive.
Per utilizzare MLflow con Ray Tune, Ray Train o compiti Ray personalizzati, impostare le seguenti variabili di ambiente: DATABRICKS_HOST e DATABRICKS_TOKEN, oppure impostare le variabili di ambiente DATABRICKS_HOST, DATABRICKS_CLIENT_IDe DATABRICKS_CLIENT_SECRET prima di chiamare ray.util.spark.setup_ray_cluster. Il codice seguente illustra come impostare queste variabili.
import os
from ray.util.spark import setup_ray_cluster
os.environ["DATABRICKS_HOST"] = "https://....databricks.com"
os.environ["DATABRICKS_TOKEN"] = "<your PAT token"
setup_ray_cluster(num_cpus_worker_node=2, num_gpus_worker_node=0, max_worker_nodes=1, min_worker_nodes=1)
Usare librerie Python con ambito notebook o librerie Python di cluster nelle attività Ray
L'uso di librerie Python a livello di notebook o di cluster nelle attività Ray remote richiede Ray 2.12 e versioni successive.
Le versioni ray 2.11 e successive presentano un problema noto per cui le attività Ray non possono usare librerie Python con ambito notebook o librerie Python cluster. Per le versioni di Ray 2.11 e inferiori, bisogna preinstallare le dipendenze aggiuntive all'interno della sessione attiva usando il comando magico %pip prima di avviare il cluster Ray.