استخدام Ray على Azure Databricks

باستخدام Ray 2.3.0 وما فوق، يمكنك إنشاء مجموعات Ray وتشغيل تطبيقات Ray على مجموعات Apache Spark باستخدام Azure Databricks. للحصول على معلومات حول البدء في التعلم الآلي على Ray، بما في ذلك البرامج التعليمية والأمثلة ، راجع وثائق Ray. لمزيد من المعلومات حول تكامل Ray وApache Spark، راجع وثائق Ray on Spark API.

المتطلبات

  • Databricks Runtime 12.2 LTS ML وما فوق.
  • يجب أن يكون وضع الوصول إلى نظام مجموعة Databricks Runtime إما وضع "معين" أو وضع "لا يوجد عزل مشترك".

تثبيت Ray

استخدم الأمر التالي لتثبيت Ray. الملحق [default] مطلوب من قبل مكون لوحة معلومات Ray.

%pip install ray[default]>=2.3.0

إنشاء مجموعة Ray خاصة بالمستخدم في مجموعة Databricks

لإنشاء مجموعة Ray، استخدم واجهة برمجة تطبيقات ray.util.spark.setup_ray_cluster .

في أي دفتر ملاحظات Databricks مرفق بمجموعة Databricks، يمكنك تشغيل الأمر التالي:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_worker_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

ray.util.spark.setup_ray_cluster تنشئ واجهة برمجة التطبيقات مجموعة Ray على Spark. داخليا، فإنه ينشئ وظيفة Spark في الخلفية. تنشئ كل مهمة Spark في الوظيفة عقدة عامل Ray، ويتم إنشاء عقدة رأس Ray على برنامج التشغيل. تمثل الوسيطة num_worker_nodes عدد عقد عامل Ray المراد إنشاؤها. لتحديد عدد مراكز وحدة المعالجة المركزية أو وحدة معالجة الرسومات المعينة لكل عقدة عامل Ray، قم بتعيين الوسيطة num_cpus_worker_node (القيمة الافتراضية: 1) أو num_gpus_worker_node (القيمة الافتراضية: 0).

بعد إنشاء مجموعة Ray، يمكنك تشغيل أي تعليمة برمجية لتطبيق Ray مباشرة في دفتر ملاحظاتك. انقر فوق Open Ray Cluster Dashboard في علامة تبويب جديدة لعرض لوحة معلومات Ray للمجموعة.

تلميح

إذا كنت تستخدم نظام مجموعة مستخدم واحد في Azure Databricks، يمكنك تعيين num_worker_nodes لاستخدام ray.util.spark.MAX_NUM_WORKER_NODES جميع الموارد المتوفرة لمجموعة Ray الخاصة بك.

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

قم بتعيين الوسيطة collect_log_to_path لتحديد مسار الوجهة حيث تريد جمع سجلات نظام مجموعة Ray. يتم تشغيل مجموعة السجل بعد إيقاف تشغيل مجموعة Ray. توصي Databricks بتعيين مسار يبدأ ب /dbfs/ بحيث يتم الاحتفاظ بالسجلات حتى إذا قمت بإنهاء مجموعة Spark. وإلا، فإن سجلاتك غير قابلة للاسترداد نظرا لحذف التخزين المحلي على نظام المجموعة عند إيقاف تشغيل نظام المجموعة.

إشعار

"لجعل تطبيق Ray الخاص بك يستخدم تلقائيا مجموعة Ray التي تم إنشاؤها، استدع ray.util.spark.setup_ray_cluster لتعيين RAY_ADDRESS متغير البيئة إلى عنوان مجموعة Ray." يمكنك تحديد عنوان نظام مجموعة بديل باستخدام وسيطة addressواجهة برمجة تطبيقات ray.init .

تشغيل تطبيق Ray

بعد إنشاء نظام مجموعة Ray، يمكنك تشغيل أي تعليمة برمجية لتطبيق Ray في دفتر ملاحظات Azure Databricks.

هام

توصي Databricks بتثبيت أي مكتبات ضرورية لتطبيقك مع %pip install <your-library-dependency> للتأكد من أنها متاحة لمجموعة Ray والتطبيق وفقا لذلك. يؤدي تحديد التبعيات في استدعاء دالة Ray init إلى تثبيت التبعيات في موقع لا يمكن الوصول إليه لعقد عامل Spark، ما يؤدي إلى عدم توافق الإصدار وأخطاء الاستيراد.

على سبيل المثال، يمكنك تشغيل تطبيق Ray بسيط في دفتر ملاحظات Azure Databricks كما يلي:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

إنشاء نظام مجموعة Ray في وضع التحجيم التلقائي

في Ray 2.8.0 وما فوق، بدأت مجموعات Ray على Databricks بدعم التكامل مع التحجيم التلقائي ل Databricks. راجع التحجيم التلقائي لنظام مجموعة Databricks.

باستخدام Ray 2.8.0 وما فوق، يمكنك إنشاء مجموعة Ray على مجموعة Databricks التي تدعم التحجيم لأعلى أو لأسفل وفقا لأحمال العمل. يؤدي تكامل التحجيم التلقائي هذا إلى تشغيل التحجيم التلقائي لنظام مجموعة Databricks داخليا داخل بيئة Databricks.

لتمكين التحجيم التلقائي، قم بتشغيل الأمر التالي:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

إذا تم تمكين التحجيم التلقائي، num_worker_nodes يشير إلى الحد الأقصى لعدد عقد عامل Ray. الحد الأدنى الافتراضي لعدد عقد عامل Ray هو 0. يعني هذا الإعداد الافتراضي أنه عندما تكون مجموعة Ray خاملة، فإنها تتدرج إلى صفر عقد عامل Ray. قد لا يكون هذا مثاليا للاستجابة السريعة في جميع السيناريوهات، ولكن عند تمكينه، يمكن أن يقلل التكاليف بشكل كبير.

في وضع التحجيم التلقائي، num_worker_nodes لا يمكن تعيين إلى ray.util.spark.MAX_NUM_WORKER_NODES.

تكون الوسيطات التالية سرعة التحجيم والتحجيم الأدنى:

  • autoscale_upscaling_speed يمثل عدد العقد المسموح لها أن تكون معلقة كمضاعف للعدد الحالي للعقد. كلما ارتفعت القيمة، زادت قوة التوسع. على سبيل المثال، إذا تم تعيين هذا إلى 1.0، يمكن أن يزيد حجم نظام المجموعة بنسبة 100٪ على الأكثر في أي وقت.
  • autoscale_idle_timeout_minutes يمثل عدد الدقائق التي تحتاج إلى المرور قبل إزالة عقدة عامل خامدة بواسطة التحجيم التلقائي. كلما كانت القيمة أصغر، زاد الحدة في التحجيم.

باستخدام Ray 2.9.0 وما فوق، يمكنك أيضا تعيين autoscale_min_worker_nodes لمنع مجموعة Ray من تقليص حجمها إلى صفر عمال عندما تكون مجموعة Ray الخامة.

الاتصال إلى مجموعة Ray البعيدة باستخدام عميل Ray

في Ray 2.9.3، أنشئ مجموعة Ray عن طريق استدعاء setup_ray_cluster واجهة برمجة التطبيقات. في نفس دفتر الملاحظات، اتصل بواجهة ray.init() برمجة التطبيقات للاتصال بمجموعة Ray هذه.

بالنسبة لمجموعة Ray غير الموجودة في الوضع العمومي، احصل على سلسلة الاتصال البعيد باستخدام التعليمات البرمجية التالية:

للحصول على سلسلة الاتصال البعيد باستخدام ما يلي:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

الاتصال إلى نظام المجموعة البعيد باستخدام هذا سلسلة الاتصال البعيد:

import ray
ray.init(remote_conn_str)

لا يدعم عميل Ray واجهة برمجة تطبيقات مجموعة بيانات Ray المحددة في الوحدة النمطية ray.data . كحل بديل، يمكنك تضمين التعليمات البرمجية التي تستدعي واجهة برمجة تطبيقات مجموعة بيانات Ray داخل مهمة Ray عن بعد، كما هو موضح في التعليمات البرمجية التالية:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

تحميل البيانات من Spark DataFrame

لتحميل Spark DataFrame كمجموعة بيانات Ray، أولا، يجب حفظ Spark DataFrame إلى وحدات تخزين UC أو Databricks Filesystem (مهمل) بتنسيق Parquet. للتحكم في الوصول إلى Databricks Filesystem بشكل آمن، توصي Databricks بتحميل تخزين كائن السحابة إلى DBFS. بعد ذلك، يمكنك إنشاء مثيل ray.data.Dataset من مسار Spark DataFrame المحفوظ باستخدام أسلوب المساعد التالي:

import ray
import os
from urllib.parse import urlparse

def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

تحميل البيانات من جدول كتالوج Unity من خلال مستودع Databricks SQL

بالنسبة إلى Ray 2.8.0 والإصدارات الأحدث، يمكنك استدعاء ray.data.read_databricks_tables واجهة برمجة التطبيقات لتحميل البيانات من جدول كتالوج Databricks Unity.

أولا، تحتاج إلى تعيين DATABRICKS_TOKEN متغير البيئة إلى رمز الوصول إلى مستودع Databricks. إذا كنت لا تقوم بتشغيل البرنامج على Databricks Runtime، فقم بتعيين DATABRICKS_HOST متغير البيئة إلى عنوان URL لمساحة عمل Databricks، كما هو موضح في ما يلي:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

ثم استدع ray.data.read_databricks_tables() للقراءة من مستودع Databricks SQL.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

تكوين الموارد المستخدمة بواسطة عقدة رأس Ray

بشكل افتراضي، لتكوين Ray على Spark، يقيد Databricks الموارد المخصصة لعقدة رأس Ray من أجل:

  • 0 ذاكرة أساسية لوحدة المعالجة المركزية
  • 0 وحدات معالجة الرسومات
  • ذاكرة كومة الذاكرة المؤقتة 128 ميغابايت
  • ذاكرة تخزين كائن 128 ميغابايت

وذلك لأن عقدة رأس Ray تستخدم عادة للتنسيق العالمي، وليس لتنفيذ مهام Ray. تتم مشاركة موارد عقدة برنامج تشغيل Spark مع عدة مستخدمين، لذلك يحفظ الإعداد الافتراضي الموارد على جانب برنامج تشغيل Spark.

باستخدام Ray 2.8.0 وما فوق، يمكنك تكوين الموارد المستخدمة من قبل عقدة رأس Ray. استخدم الوسيطات التالية في setup_ray_cluster واجهة برمجة التطبيقات:

  • num_cpus_head_node: تعيين الذاكرات الأساسية لوحدة المعالجة المركزية المستخدمة بواسطة عقدة رأس Ray
  • num_gpus_head_node: تعيين وحدة معالجة الرسومات المستخدمة بواسطة عقدة رأس Ray
  • object_store_memory_head_node: تعيين حجم ذاكرة تخزين الكائنات حسب عقدة رأس Ray

دعم المجموعات غير المتجانسة

للحصول على تشغيل تدريب أكثر كفاءة وفعالية من حيث التكلفة، يمكنك إنشاء Ray على مجموعة Spark وتعيين تكوينات مختلفة بين عقدة رأس Ray وعقد عامل Ray. ومع ذلك، يجب أن يكون لجميع عقد عامل Ray نفس التكوين. لا تدعم مجموعات Databricks المجموعات غير المتجانسة بشكل كامل، ولكن يمكنك إنشاء نظام مجموعة Databricks مع أنواع مختلفة من مثيلات برامج التشغيل والعاملين عن طريق تعيين نهج نظام المجموعة.

على سبيل المثال:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

ضبط تكوين نظام مجموعة Ray

التكوين الموصى به لكل عقدة عامل Ray هو:

  • 4 ذاكرات أساسية لوحدة المعالجة المركزية كحد أدنى لكل عقدة عامل Ray.
  • ذاكرة كومة الذاكرة المؤقتة 10 غيغابايت كحد أدنى لكل عقدة عامل Ray.

عند استدعاء ray.util.spark.setup_ray_cluster، توصي Databricks بتعيين num_cpus_worker_node إلى قيمة >= 4.

راجع تخصيص الذاكرة لعقد عامل Ray للحصول على تفاصيل حول ضبط ذاكرة كومة الذاكرة المؤقتة لكل عقدة عامل Ray.

تخصيص الذاكرة لعقد عامل Ray

تستخدم كل عقدة عاملة Ray نوعين من الذاكرة: ذاكرة كومة الذاكرة المؤقتة وذاكرة تخزين الكائن. يتم تحديد حجم الذاكرة المخصصة لكل نوع كما هو موضح أدناه.

إجمالي الذاكرة المخصصة لكل عقدة عامل Ray هي:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES هو الحد الأقصى لعدد عقد عامل Ray التي يمكن تشغيلها على عقدة عامل Spark. يتم تحديد ذلك بواسطة الوسيطة num_cpus_worker_node أو num_gpus_worker_node.

إذا لم تقم بتعيين الوسيطة object_store_memory_per_node، فإن حجم ذاكرة كومة الذاكرة المؤقتة وحجم ذاكرة تخزين الكائن المخصصة لكل عقدة عامل Ray هي:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

إذا قمت بتعيين الوسيطة object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

بالإضافة إلى ذلك، يتم تقييد حجم ذاكرة تخزين الكائن لكل عقدة عامل Ray بواسطة الذاكرة المشتركة لنظام التشغيل. القيمة القصوى هي:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY/dev/shm هو حجم القرص الذي تم تكوينه لعقدة عامل Spark.

أفضل الممارسات

كيفية تعيين رقم وحدة المعالجة المركزية / GPU لكل عقدة عامل Ray؟

توصي Databricks بتعيين num_cpus_worker_node عدد مراكز وحدة المعالجة المركزية لكل عقدة عامل Spark وإعداد num_gpus_worker_node عدد وحدات معالجة الرسومات لكل عقدة عامل Spark. في هذا التكوين، تقوم كل عقدة عامل Spark بتشغيل عقدة عامل Ray واحدة تستخدم موارد عقدة عامل Spark بشكل كامل.

تكوين مجموعة GPU

تعمل مجموعة Ray أعلى مجموعة Databricks Spark. السيناريو الشائع هو استخدام وظيفة Spark وSpark UDF للقيام بمهام معالجة البيانات المسبقة البسيطة التي لا تحتاج إلى موارد وحدة معالجة الرسومات، ثم استخدام Ray لتنفيذ مهام التعلم الآلي المعقدة التي تستفيد من وحدات معالجة الرسومات. في هذه الحالة، توصي Databricks بتعيين معلمة spark.task.resource.gpu.amount تكوين مستوى نظام مجموعة Spark إلى 0، بحيث لا تستخدم جميع تحويلات Spark DataFrame وعمليات تنفيذ Spark UDF موارد GPU.

فوائد هذا التكوين هي التالية:

  • فهو يزيد من توازي وظيفة Spark، لأن نوع مثيل GPU عادة ما يحتوي على العديد من نوى وحدة المعالجة المركزية أكثر من أجهزة GPU.
  • إذا تمت مشاركة نظام مجموعة Spark مع عدة مستخدمين، فإن هذا التكوين يمنع وظائف Spark من التنافس على موارد GPU مع تشغيل أحمال عمل Ray بشكل متزامن.

تعطيل transformers تكامل mlflow للمدرب إذا كان استخدامه في مهام Ray

transformers يتم تشغيل تكامل MLflow للمدرب بشكل افتراضي. إذا كنت تستخدم تدريب Ray لتدريبه، تفشل مهمة Ray لأن بيانات اعتماد خدمة Databricks MLflow لم يتم تكوينها لمهام Ray.

لتجنب هذه المشكلة، قم بتعيين DISABLE_MLFLOW_INTEGRATION متغير البيئة إلى "TRUE" في تكوين مجموعة databricks. للحصول على معلومات حول تسجيل الدخول إلى MLflow في مهام مدرب Ray، راجع قسم "استخدام MLflow في مهام Ray" للحصول على التفاصيل.

خطأ في انتقاء الدالة عن بعد ل Address Ray

لتنفيذ مهام Ray، يستخدم Ray المخلل لتسلسل دالة المهمة. إذا فشل الانتقاء، فحدد السطر (الأسطر) في التعليمات البرمجية الخاصة بك حيث يحدث الفشل. غالبا ما يعالج نقل import الأوامر إلى دالة المهمة أخطاء الانتقاء الشائعة. على سبيل المثال، datasets.load_dataset هي دالة مستخدمة على نطاق واسع يحدث أن يتم تصحيحها داخل Databricks Runtime، مما قد يعرض استيرادا خارجيا غير قادر على التحكم. لتصحيح هذه المشكلة، يمكنك تحديث التعليمات البرمجية الخاصة بك كما يلي:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

تعطيل جهاز عرض ذاكرة Ray إذا تم إيقاف مهمة Ray بشكل غير متوقع مع خطأ OOM

في Ray 2.9.3، لدى جهاز عرض ذاكرة Ray مشكلات معروفة تتسبب في قتل مهام Ray بشكل خاطئ.

لمعالجة المشكلة، قم بتعطيل جهاز عرض ذاكرة Ray عن طريق تعيين متغير RAY_memory_monitor_refresh_ms البيئة إلى 0 في تكوين مجموعة Databricks.

تكوين موارد الذاكرة لأحمال عمل Spark و Ray المختلطة

إذا قمت بتشغيل أحمال عمل Spark و Ray المختلطة في مجموعة Databricks، توصي Databricks بتقليل ذاكرة منفذ Spark إلى قيمة صغيرة، مثل الإعداد spark.executor.memory 4g في تكوين مجموعة Databricks. ويرجع ذلك إلى تشغيل منفذ Spark داخل عملية Java التي تؤدي إلى تجميع البيانات المهملة (GC) ببطء. ضغط الذاكرة للتخزين المؤقت لمجموعة بيانات Spark مرتفع إلى حد ما، ما يسبب انخفاضا في الذاكرة المتوفرة التي يمكن أن يستخدمها Ray. لتجنب أخطاء OOM المحتملة، توصي Databricks بتقليل قيمة "spark.executor.memory" المكونة إلى قيمة أصغر من القيمة الافتراضية.

تكوين مورد الحساب لأحمال عمل Spark و Ray المختلطة

إذا قمت بتشغيل أحمال عمل Spark و Ray المختلطة في مجموعة Databricks، فقم بتعيين إما عقد نظام مجموعة Spark إلى قابلة للتطوير التلقائي، أو عقد عامل Ray إلى قابلة للتطوير التلقائي، أو كليهما مع تمكين التحجيم التلقائي.

على سبيل المثال، إذا كان لديك عدد ثابت من العقد العاملة في مجموعة Databricks، ففكر في تمكين التحجيم التلقائي ل Ray-on-Spark، بحيث عندما لا يكون هناك حمل عمل Ray قيد التشغيل، يتم تقليص مجموعة Ray. ونتيجة لذلك، يتم إصدار موارد نظام المجموعة الخاملة بحيث يمكن لمهمة Spark استخدامها.

عند اكتمال وظيفة Spark وبدء مهمة Ray، فإنها تؤدي إلى تشغيل مجموعة Ray-on-Spark لتوسيع نطاقها لتلبية متطلبات المعالجة.

يمكنك أيضا جعل كل من مجموعة Databricks والمجموعة Ray-on-spark قابلة للتطوير التلقائي. على وجه التحديد، يمكنك تكوين العقد القابلة للتطوير التلقائي لنظام مجموعة Databricks إلى 10 عقد كحد أقصى وعقد عامل Ray-on-Spark إلى 4 عقد كحد أقصى (مع عقدة عامل Ray واحدة لكل عامل spark)، مما يترك Spark مجانيا لتخصيص ما يصل إلى 6 عقد لمهام Spark. وهذا يعني أن أحمال عمل Ray يمكن أن تستخدم على الأكثر 4 موارد عقد في نفس الوقت، بينما يمكن تخصيص وظيفة Spark على الأكثر 6 عقد بقيمة الموارد.

تطبيق دالة التحويل على دفعات من البيانات

عند معالجة البيانات على دفعات، توصي Databricks باستخدام واجهة برمجة تطبيقات بيانات Ray مع map_batches الوظيفة. يمكن أن يكون هذا النهج أكثر كفاءة وقابلية للتطوير، خاصة بالنسبة لمجموعات البيانات الكبيرة أو عند إجراء حسابات معقدة تستفيد من معالجة الدفعات. يمكن تحويل أي Spark DataFrame إلى بيانات Ray باستخدام ray.data.from_spark واجهة برمجة التطبيقات، ويمكن كتابتها إلى جدول databricks UC باستخدام واجهة برمجة التطبيقات ray.data.write_databricks_table.

استخدام MLflow في مهام Ray

لاستخدام MLflow في مهام Ray، قم بتكوين ما يلي:

  • بيانات اعتماد Databricks MLflow في مهام Ray
  • يعمل MLflow على جانب برنامج تشغيل Spark الذي يمرر القيم التي تم إنشاؤها run_id إلى مهام Ray.

التعليمات البرمجية التالية مثال على ذلك:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

استخدام مكتبات python في نطاق دفتر الملاحظات أو مكتبات python للمجموعة في مهام Ray

حاليا، لدى Ray مشكلة معروفة بأن مهام Ray لا يمكنها استخدام مكتبات Python ذات نطاق دفتر الملاحظات أو مكتبات Python لنظام المجموعة. لمعالجة هذا القيد، قم بتشغيل الأمر التالي في دفتر الملاحظات قبل تشغيل نظام مجموعة Ray-on-Spark:

%pip install ray==<The Ray version you want to use> --force-reinstall

ثم قم بتشغيل الأمر التالي في دفتر الملاحظات لإعادة تشغيل نواة python:

dbutils.library.restartPython()

تمكين آثار المكدس والرسوم البيانية لهب على صفحة الجهات الفاعلة في لوحة معلومات Ray

في صفحة Ray Dashboard Actors ، يمكنك عرض آثار المكدس والرسوم البيانية لهب لممثلي Ray النشطين.

لعرض هذه المعلومات، قم بتثبيت py-spy قبل بدء تشغيل نظام مجموعة Ray:

%pip install py-spy

إيقاف تشغيل نظام مجموعة Ray

لإيقاف تشغيل مجموعة Ray التي تعمل على Azure Databricks، اتصل بواجهة برمجة تطبيقات ray.utils.spark.shutdown_ray_cluster .

إشعار

يتم أيضا إيقاف تشغيل مجموعات الأشعة عندما:

  • يمكنك فصل دفتر الملاحظات التفاعلي من مجموعة Azure Databricks.
  • اكتملت مهمة Azure Databricks.
  • تتم إعادة تشغيل نظام مجموعة Azure Databricks أو إنهاؤها.
  • لا يوجد نشاط لوقت الخمول المحدد.

مثال لدفتر الملاحظات

يوضح دفتر الملاحظات التالي كيفية إنشاء مجموعة Ray وتشغيل تطبيق Ray على Databricks.

Ray على دفتر ملاحظات بدء Spark

الحصول على دفتر الملاحظات

القيود

  • مجموعات Azure Databricks المشتركة متعددة المستخدمين (تمكين وضع العزل) غير مدعومة.
  • عند استخدام ٪pip لتثبيت الحزم، سيتم إيقاف تشغيل مجموعة Ray. تأكد من بدء تشغيل Ray بعد الانتهاء من تثبيت جميع مكتباتك باستخدام ٪pip.
  • يمكن أن يؤدي استخدام عمليات التكامل التي تتجاوز التكوين من ray.util.spark.setup_ray_cluster إلى أن تصبح مجموعة Ray غير مستقرة ويمكن أن يتعطل سياق Ray. على سبيل المثال، يمكن أن يؤدي استخدام الحزمة xgboost_ray والإعداد RayParams مع ممثل أو cpus_per_actor تكوين يتجاوز تكوين مجموعة Ray إلى تعطل مجموعة Ray بصمت.