Поделиться через


Объединение Ray и Spark в одной среде в Azure Databricks

С помощью Azure Databricks можно запускать операции Ray и Spark в одной среде выполнения, чтобы использовать преимущества обоих распределенных вычислительных модулей.

Интеграция Ray и Spark поддерживается каталогом Delta Lake и Unity, обеспечивающим надежное управление данными, безопасный доступ и отслеживание происхождения.

В этой статье показано, как подключить операции Ray и Spark в соответствии со следующими вариантами использования:

  • Передача данных Spark в Ray: Эффективный перенос данных в оперативную память Ray.
  • Запись данных Ray в Spark: выходные данные из Ray обратно в Delta Lake или другие решения хранилища для обеспечения совместимости и доступа.
  • Подключение внешних приложений Ray к каталогу Unity: подключение приложений Ray за пределами Databricks для загрузки данных из таблицы каталога Databricks Unity.

Дополнительные сведения об использовании Ray, Spark или обоих см. в разделе "Когда следует использовать Spark и Ray".

Создание распределенного набора данных Ray из DataFrame Spark

Чтобы создать распределенный набор данных Ray из Spark DataFrame, можно использовать функцию ray.data.from_spark() для непосредственного чтения Spark DataFrame из Ray без необходимости записи данных в какое-либо место.

Передачи данных из памяти Spark в Ray доступны в Databricks Runtime ML версии 15.0 и выше.

Чтобы включить эту функцию, необходимо выполнить следующие действия.

  • Задайте для конфигурации кластера Spark spark.databricks.pyspark.dataFrameChunk.enabled значение true перед запуском кластера.
import ray.data

source_table = "my_db.my_table"

# Read a Spark DataFrame from a Delta table in Unity Catalog
df = spark.read.table(source_table)
ray_ds = ray.data.from_spark(df)

Предупреждение

Автоматическое масштабирование кластеров Spark (включая тех, которые используют точечные экземпляры) должно задать параметр use_spark_chunk_api для использования функции False с from_spark(). В противном случае вызов API приведет к пропущению кэша, так как кэш исполнителя Spark теряется при завершении работы исполнителя.

ray_ds = ray.data.from_spark(df, use_spark_chunk_api=False)

Записать данные Ray в формат Spark

Чтобы записать данные Ray в Spark, необходимо записать набор данных в расположение, к которому может получить доступ Spark.

В Databricks Runtime ML версии до 15.0 включительно вы можете сохранить данные прямо в расположение хранилища объектов с помощью инструмента Ray Parquet, находящегося в модуле ray_dataset.write_parquet()ray.data. Spark может считывать эти данные Parquet с помощью встроенных средств чтения.

Для рабочих областей с поддержкой каталога Unity используйте функцию ray.data.Dataset.write_databricks_table для записи в таблицу каталога Unity.

Эта функция временно сохраняет набор данных Ray в томах каталога Unity, считывает тома каталога Unity с помощью Spark и, наконец, записывает в таблицу каталога Unity. Перед вызовом функции ray.data.Dataset.write_databricks_table убедитесь, что переменная среды "_RAY_UC_VOLUMES_FUSE_TEMP_DIR" задана для допустимого и доступного пути тома каталога Unity, например "/Volumes/MyCatalog/MySchema/MyVolume/MyRayData".

ds = ray.data
ds.write_databricks_table()

Для рабочих областей, которые не включены в каталог Unity, можно вручную хранить набор данных Ray в качестве временного файла, например файл Parquet в DBFS, а затем читать файл данных с помощью Spark.

ds.write_parquet(tmp_path)
df = spark.read.parquet(tmp_path)
df.write.format("delta").saveAsTable(table_name)

Запись данных из основных приложений Ray в Spark

Azure Databricks также может интегрировать Ray Core с Spark, позволяя запускать Ray Core (нижнеуровневые API Ray) и рабочие нагрузки Spark в одной среде, обеспечивая обмен данными между ними. Эта интеграция предлагает несколько шаблонов в соответствии с различными рабочими нагрузками и потребностями управления данными, обеспечивая упрощенное взаимодействие с обеими платформами.

Существует три основных шаблона записи данных из Ray в Spark.

  • Сохранять выходные данные во временном расположении: временно хранить результаты задач Ray в томах DBFS или Unity Catalog перед консолидацией их в Spark DataFrame.
  • Подключение с Spark Connect: напрямую подключайте задачи Ray к кластеру Spark, что позволяет Ray взаимодействовать с DataFrame и таблицами Spark.
  • Использовать сторонние библиотеки: используйте внешние библиотеки, такие как deltalake или deltaray, для записи данных из задач Ray Core в таблицы Delta Lake или Spark.

Шаблон 1. Сохранение выходных данных во временном расположении

Наиболее распространенным шаблоном записи данных из Ray в Spark является хранение выходных данных во временном расположении, например томов каталога Unity или DBFS. После хранения данных поток драйвера Ray считывает каждую часть файлов на рабочих узлах и объединяет их в окончательный кадр данных для дальнейшей обработки. Как правило, временные файлы находятся в стандартном формате, например CSV. Этот подход лучше всего подходит, если выходные данные отображаются в табличной форме, например, Pandas DataFrame, созданный с помощью задачи Ray Core.

Используйте этот метод, если выходные данные из задач Ray слишком большие, чтобы поместиться в память узла драйвера или общего хранилища объектов. Если необходимо обрабатывать большие наборы данных без сохранения данных в хранилище, рекомендуется увеличить объем памяти, выделенной для узла драйвера в кластере Databricks, чтобы повысить производительность.

import os
import uuid
import numpy as np
import pandas as pd

@ray.remote
def write_example(task_id, path_prefix):

  num_rows = 100

  df = pd.DataFrame({
      'foo': np.random.rand(num_rows),
      'bar': np.random.rand(num_rows)
  })

  # Write the DataFrame to a CSV file
  df.to_csv(os.path.join(path_prefix, f"result_part_{task_id}.csv"))

n_tasks = 10

# Put a unique DBFS prefix for the temporary file path
dbfs_prefix = f"/dbfs/<USERNAME>"

# Create a unique path for the temporary files
path_prefix = os.path.join(dbfs_prefix, f"/ray_tmp/write_task_{uuid.uuid4()}")

tasks = ray.get([write_example.remote(i, path_prefix) for i in range(n_tasks)])

# Read all CSV files in the directory into a single DataFrame
df = spark.read.csv(path_prefix.replace("/dbfs", "dbfs:"), header=True, inferSchema=True)

Шаблон 2. Подключение с помощью Spark Connect

Другим способом взаимодействия задач Ray Core с Spark в рамках удаленной задачи является использование Spark Connect. Это позволяет настроить контекст Spark на рабочем узле Ray, чтобы указать на кластер Spark, запущенный с драйверного узла.

Чтобы настроить эту настройку, необходимо настроить ресурсы кластера Ray, чтобы выделить пространство для Spark. Например, если рабочий узел имеет 8 ЦП, установите num_cpus_worker_node значение 7, оставив 1 ЦП для Spark. Для более крупных задач Spark рекомендуется выделить большую долю ресурсов.

from databricks.connect import DatabricksSession
import ray

@ray.remote
class SparkHandler(object):

   def __init__(self, access_token=None, cluster_id=None, host_url=None):
       self.spark = (DatabricksSession
                     .builder
                     .remote(host=host_url,
                             token=access_token,
                             cluster_id=cluster_id)
                     .getOrCreate()
                     )
   def test(self):
       df = self.spark.sql("select * from samples.nyctaxi.trips")

       df.write.format("delta").mode(
"overwrite").saveAsTable("catalog.schema.taxi_trips")
       return df.count()

access_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
cluster_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().clusterId().get()
host_url = f"https://{dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get('browserHostName').get()}"

sh = SparkHandler.remote(access_token=access_token,
                        cluster_id=cluster_id,
                        host_url=host_url)
print(ray.get(sh.test.remote()))

В этом примере используется маркер, созданный записной книжкой. Тем не менее, Databricks рекомендует для производственных кейсов использовать токен доступа, хранящийся в секретах Databricks.

Так как этот процесс вызывает один драйвер Spark, он создает блокировку потоков, которая приводит к тому, что все задачи будут ожидать завершения предыдущих задач Spark. Поэтому рекомендуется использовать это, если не существует большого количества параллельных задач, так как все они будут иметь последовательное поведение по мере выполнения задач Spark. Для этих ситуаций лучше сохранить выходные данные, а затем объединить их в один кадр данных Spark в конце, а затем записать в выходную таблицу.

Шаблон 3. Сторонние библиотеки

Другой вариант — использование сторонних библиотек, взаимодействующих с Delta Lake и Spark. Azure Databricks официально не поддерживает эти сторонние библиотеки. Примером этого является deltalake библиотека из delta-rs проекта. Этот подход в настоящее время работает только с таблицами хранилища метаданных Hive, а не с таблицами каталога Unity.

from deltalake import DeltaTable, write_deltalake
import pandas as pd
import numpy as np
import ray

@ray.remote
def write_test(table_name):
   random_df_id_vals = [int(np.random.randint(1000)), int(np.random.randint(1000))]
   pdf = pd.DataFrame({"id": random_df_id_vals, "value": ["foo", "bar"]})
   write_deltalake(table_name, pdf, mode="append")

def main():
   table_name = "database.mytable"
   ray.get([write_test.remote(table_name) for _ in range(100)])

Другая сторонняя библиотека доступна в библиотеке deltaray, доступной через проект Delta Incubator https://github.com/delta-incubator/deltaray)

# Standard Libraries
import pathlib

# External Libraries
import deltaray
import deltalake as dl
import pandas as pd

# Creating a Delta Table
cwd = pathlib.Path().resolve()
table_uri = f'{cwd}/tmp/delta-table'
df = pd.DataFrame({'id': [0, 1, 2, 3, 4, ], })
dl.write_deltalake(table_uri, df)

# Reading our Delta Table
ds = deltaray.read_delta(table_uri)
ds.show()

Подключение внешних приложений Ray к Databricks

Создание набора данных Ray из запроса хранилища Databricks

Для Ray 2.8.0 и более поздних версий для подключения приложений Ray за пределами Azure Databricks к таблицам в Azure Databricks можно вызвать API ray.data.read_databricks_tables для загрузки данных из таблицы каталога Unity.

Сначала задайте переменную среды DATABRICKS_TOKEN с вашим токеном доступа к SQL-складу. Если программа не запущена в Databricks Runtime, также установите значение переменной среды DATABRICKS_HOST на URL-адрес рабочей области Databricks, как показано ниже:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Затем вызовите ray.data.read_databricks_tables(), чтобы прочитать из хранилища SQL.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity Catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Предупреждение

Хранилища Databricks могут кэшировать только результаты запросов в течение примерно 2 часов. Для долгосрочных рабочих нагрузок используйте метод ray.data.Dataset.materialize для переноса набора данных Ray в распределенное хранилище объектов Ray.

Создание набора данных Ray из таблицы Delta Sharing от Databricks

Вы также можете считывать данные из таблиц Delta Sharing в Azure Databricks . Чтение из таблиц Delta Sharing является более надежным, чем чтение из кэша хранилища Databricks.

API ray.data.read_delta_sharing_tables доступен в Ray 2.33 и более поздних версиях.

import ray

ds = ray.data.read_delta_sharing_tables(
    url=f"<profile-file-path>#<share-name>.<schema-name>.<table-name>",
    limit=100000,
    version=1,
)

Лучшие практики

  • Всегда используйте методы, описанные в руководстве по кластеру Ray, чтобы убедиться, что кластер полностью используется.
  • Рекомендуется использовать тома каталога Unity для хранения выходных данных в не табличном формате и обеспечения управления.
  • Убедитесь, что задана конфигурация num_cpus_worker_node таким образом, чтобы число ядер ЦП соответствовало числу рабочих узлов Spark. Аналогичным образом задайте для num_gpus_worker_node количество GPU на рабочий узел Spark. В этой конфигурации каждый рабочий узел Spark запускает один рабочий узел Ray, который полностью использует ресурсы рабочего узла Spark.

Ограничения

В настоящее время каталог Unity не делится учетными данными для записи в таблицы писателями, отличными от Spark. Поэтому все данные, записываемые в таблицу каталога Unity из задачи Ray Core, должны быть сначала сохранены и затем прочитаны с помощью Spark, или в задаче Ray должен быть настроен Databricks Connect.