次の方法で共有


Azure Databricks で Ray クラスターをスケーリングする

自動スケーリング、ヘッド ノード構成、異種クラスター、リソース割り当てなど、最適なパフォーマンスを得るために Ray クラスターのサイズを調整する方法について説明します。

自動スケール モードで Ray クラスターを作成する

Ray 2.8.0 以降では、Azure Databricks 上で開始された Ray クラスターで、Azure Databricks 自動スケーリングとの統合がサポートされます。 この自動スケーリングとの統合により、Azure Databricks 環境内で内部的に Azure Databricks クラスターの自動スケーリングがトリガーされます。

自動スケールを有効にするには、次のコマンドを実行します。

2.10 より以前のバージョンの Ray の場合:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
)

2.10 以降のバージョンの Ray の場合:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

ray.util.spark.setup_ray_cluster API は、Apache Spark 上に Ray クラスターを作成します。 内部的には、バックグラウンド Apache Spark ジョブが作成されます。 ジョブ内の各 Apache Spark タスクによって Ray ワーカー ノードが作成され、Ray ヘッド ノードがドライバー上に作成されます。 引数 min_worker_nodesmax_worker_nodes は、Ray ワークロード用に作成して使用する Ray ワーカー ノードの範囲を表します。 引数 min_worker_nodes が未定義のままの場合は、max_worker_nodes 個のワーカーを使用できる固定サイズの Ray クラスターが開始されます。 各 Ray ワーカー ノードに割り当てられた CPU コアまたは GPU コアの数を指定するには、引数 num_cpus_worker_node (既定値: 1) または num_gpus_worker_node (既定値: 0) を設定します。

2.10 より以前のバージョンの Ray では、自動スケーリングが有効な場合、num_worker_nodes は Ray ワーカー ノードの最大数を示します。 Ray ワーカー ノードの既定の最小数は 0 です。 この既定の設定は、Ray クラスターがアイドル状態の場合、Ray ワーカー ノードが 0 にスケールダウンすることを意味します。 これは、すべてのシナリオで高速な応答性を実現するには理想的ではない可能性がありますが、有効にするとコストを大幅に削減できます。

自動スケーリング モードでは、num_worker_nodes を ray.util.spark.MAX_NUM_WORKER_NODES に設定することはできません。

次の引数は、アップスケーリングとダウンスケーリングの速度を構成します。

  • autoscale_upscaling_speed は、現在のノード数の倍数として保留を許可するノード数を表します。 値が大きいほど、より積極的なアップスケーリングになります。 たとえば、これを 1.0 に設定すると、クラスターはいつでも最大 100% までサイズを大きくすることができます。
  • autoscale_idle_timeout_minutes は、アイドル状態のワーカー ノードがオートスケーラーによって削除されるまでに必要な経過時間 (分) を表します。 値が小さいほど、より積極的なダウンスケーリングになります。

Ray 2.9.0 以降では、autoscale_min_worker_nodes を設定して、Ray クラスターがアイドル状態のときにワーカーが 0 にスケールダウンされるのを防ぐこともできます。ワーカーが 0 になると、クラスターが終了します。

Ray ヘッド ノードで使用されるリソースを構成する

既定では、Ray on Spark 構成に対して、Azure Databricks は Ray ヘッド ノードに割り当てられたリソースを次のように制限します。

  • 0 CPU コア
  • 0 GPU
  • 128 MB ヒープ メモリ
  • 128 MB オブジェクト ストア メモリ

これは、Ray ヘッド ノードが通常、Ray タスクの実行ではなく、グローバルな調整にのみ使用されるためです。 Apache Spark ドライバー ノードのリソースは複数のユーザーで共有されるため、既定の設定によって、Apache Spark ドライバー側のリソースを節約できます。 Ray 2.8.0 以上では、Ray ヘッド ノードで使用されるリソースを構成できます。 setup_ray_cluster API で次の引数を使用します。

  • num_cpus_head_node: Ray ヘッド ノードで使用される CPU コアの設定
  • num_gpus_head_node: Ray ヘッド ノードで使用される GPU の設定
  • object_store_memory_head_node: Ray ヘッド ノードによるオブジェクト ストアのメモリ サイズの設定

異種クラスターのサポート

より効率的で費用効果の高いトレーニングを実行するために Ray on Spark クラスターを作成し、Ray ヘッド ノードと Ray ワーカー ノードの間で異なる構成を設定できます。 ただし、Ray ワーカー ノードはすべて同じ構成である必要があります。 Azure Databricks クラスターは異種クラスターを完全にサポートしているわけではありませんが、クラスター ポリシーを設定すると、ドライバーとワーカーのインスタンスの種類が異なる Azure Databricks クラスターを作成できます。 次に例を示します。

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Ray クラスター構成を調整する

各 Ray ワーカー ノードに推奨される構成は、Ray ワーカー ノードあたり最小 4 CPU コアです。 Ray ワーカー ノードあたり 10 GB 以上のヒープ メモリ。

そのため、ray.util.spark.setup_ray_cluster を呼び出す場合、Azure Databricks では、num_cpus_per_node を 4 以上の値に設定することをお勧めします。

各 Ray ワーカー ノードのヒープ メモリの調整に関する詳細については、次のセクションを参照してください。

Ray ワーカー ノードのメモリ割り当て

各 Ray ワーカー ノードは、ヒープ メモリとオブジェクト ストア メモリの 2 種類のメモリを使用します。

各種類に割り当てられるメモリ サイズは、以下に示すように決定されます。

各 Ray ワーカー ノードに割り当てられる合計メモリは RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8) です。

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES は、Apache Spark ワーカー ノード上で起動できる Ray ワーカー ノードの最大数です。 これは引数 num_cpus_per_node または num_gpus_per_node によって決定されます。

引数 object_store_memory_per_node を設定しない場合、各 Ray ワーカー ノードに割り当てられるヒープ メモリ サイズとオブジェクト ストア メモリ サイズは、RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3 です。

引数 object_store_memory_per_node を設定する場合は、RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node です。

さらに、Ray ワーカー ノードあたりのオブジェクト ストア メモリ サイズは、オペレーティング システムの共有メモリによっても制限されます。 最大値は OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8) です。

SPARK_WORKER_NODE_OS_SHARED_MEMORY は、Apache Spark ワーカー ノード用に構成された /dev/shm ディスクのサイズです。

スケーリングのベスト プラクティス

Ray ワーカー ノードごとに CPU と GPU の数を設定する

引数 num_cpus_worker_node を Apache Spark ワーカー ノードあたりの CPU コア数に設定することをお勧めします。 同様に、num_gpus_worker_node を Apache Spark ワーカー ノードあたりの GPU 数に設定するのが最適です。 この構成では、各 Apache Spark ワーカー ノードによって、各 Apache Spark ワーカー ノードのリソースを完全に利用する 1 つの Ray ワーカー ノードが起動されます。

Apache Spark クラスターを起動するときに、Azure Databricks クラスター構成内で RAY_memory_monitor_refresh_ms 環境変数を 0 に設定します。

Apache Spark と Ray のハイブリッド ワークロード用のメモリ リソース構成

Azure Databricks クラスターで Spark と Ray のハイブリッド ワークロードを実行する場合、Azure Databricks では、Spark Executor メモリを小さい値に削減することをお勧めします。 たとえば、Azure Databricks クラスター構成で spark.executor.memory 4g を設定します。

Apache Spark Executor は GC を遅延トリガーする Java プロセスであり、Apache Spark データセット キャッシュによって大量の Apache Spark Executor メモリが使用されます。 これにより、Ray で使用できる使用可能なメモリが減少します。 メモリ不足エラーの可能性を回避するには、spark.executor.memory の構成を削減します。

Apache Spark と Ray のハイブリッド ワークロード用の計算リソース構成

Azure Databricks クラスターで Spark と Ray のハイブリッド ワークロードを実行する場合、クラスター ノードまたは Ray ワーカー ノードを自動スケーリング可能にすることをお勧めします。 次に例を示します。

Azure Databricks クラスターを起動するために使用できるワーカー ノード数が固定されている場合、Ray-on-Spark の自動スケーリングを有効にすることをお勧めします。 Ray ワークロードが実行されていない場合、Ray クラスターはスケールダウンされます。これにより、Apache Spark タスクで使用できるようにリソースを解放することができます。 Apache Spark タスクが完了し、Ray が再び使用されると、Ray-on-Spark クラスターは需要に合わせて再びスケールアップされます。

さらに、Azure Databricks クラスターと Ray-on-Spark クラスターを自動スケーリング可能にすることもできます。 たとえば、Azure Databricks クラスターの自動スケーリング可能ノードを最大 10 ノードに構成し、Ray-on-Spark ワーカー ノードを最大 4 ノードに構成し、Apache Spark ワーカーごとに 1 つの Ray ワーカー ノードが動作するように設定した場合、Ray ワークロードは、このようなクラスター構成で最大 4 つのノードのリソースを使用できます。 これに対し、Apache Spark ジョブは、最大 6 ノード相当のリソースを割り当てることができます。