Sdílet prostřednictvím


Distribuované trénování modelů XGBoost pomocí xgboost.spark

Důležité

Tato funkce je ve verzi Public Preview.

Balíček Python xgboost>=1.7 obsahuje nový modul xgboost.spark. Tento modul zahrnuje estimátory xgboost.spark.SparkXGBRegressorxgboost PySpark , xgboost.spark.SparkXGBClassifiera xgboost.spark.SparkXGBRanker. Tyto nové třídy podporují zahrnutí estimátorů XGBoost do kanálů SparkML. Podrobnosti o rozhraní API najdete v dokumentaci k rozhraní PYTHON SPARK API XGBoost.

Požadavky

Databricks Runtime 12.0 ML a vyšší

xgboost.spark Parametry

Estimátory definované v xgboost.spark modulu podporují většinu stejných parametrů a argumentů používaných ve standardním XGBoostu.

  • Parametry konstruktoru třídy, fit metody a predict metody jsou z velké části identické s parametry v xgboost.sklearn modulu.
  • Názvy, hodnoty a výchozí hodnoty jsou většinou stejné jako názvy popsané v parametrech XGBoost.
  • Výjimky jsou několik nepodporovaných parametrů (například , , , ), a pyspark parametry specifické pro odhadce, které byly přidány (například featuresCol, labelCol, use_gpu, ). validationIndicatorColeval_setsample_weightnthreadgpu_id Podrobnosti najdete v dokumentaci k rozhraní XGBoost Python Spark API.

Distribuované trénování

Estimátory PySpark definované v xgboost.spark modulu podporují distribuované trénování XGBoost pomocí parametru num_workers . Pokud chcete použít distribuované trénování, vytvořte klasifikátor nebo regresor a nastavte num_workers ho na počet souběžných spuštěných úloh Sparku během distribuovaného trénování. Pokud chcete použít všechny sloty úloh Sparku, nastavte num_workers=sc.defaultParallelism.

Příklad:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Poznámka:

  • Nelze použít mlflow.xgboost.autolog s distribuovaným XGBoostem. Pokud chcete protokolovat model Spark xgboost pomocí MLflow, použijte mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Distribuovaný XGBoost nelze použít v clusteru s povoleným automatickým škálováním. Nové pracovní uzly, které začínají v tomto paradigmatu elastického škálování, nemůžou přijímat nové sady úloh a zůstat nečinné. Pokyny k zakázání automatického škálování najdete v tématu Povolení automatického škálování.

Povolení optimalizace pro trénování u řídkých funkcí – datová sada

Estimátory PySpark definované v xgboost.spark modulu podporují optimalizaci pro trénování datových sad s řídkými funkcemi. Pokud chcete povolit optimalizaci řídkých sad funkcí, musíte metodě poskytnout datovou sadu fit , která obsahuje sloupec funkcí skládající se z hodnot typu pyspark.ml.linalg.SparseVector , a nastavit parametr enable_sparse_data_optim odhadce na Truehodnotu . Kromě toho je nutné nastavit missing parametr na 0.0.

Příklad:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Trénování GPU

Estimátory PySpark definované v xgboost.spark modulu podporují trénování gpu. Nastavte parametr use_gpu na True povolení trénování GPU.

Poznámka:

Pro každou úlohu Sparku použitou v distribuovaném trénování XGBoost se při nastavení Trueargumentu use_gpu použije v trénování pouze jeden GPU . Databricks doporučuje použít výchozí hodnotu konfigurace clusteru 1spark.task.resource.gpu.amountSpark . V opačném případě jsou další GPU přidělené této úloze Sparku nečinné.

Příklad:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Příklad poznámkového bloku

Tento poznámkový blok ukazuje použití balíčku xgboost.spark Pythonu se sparkem MLlib.

Poznámkový blok PySpark-XGBoost

Získat poznámkový blok

Průvodce migrací pro zastaralý sparkdl.xgboost modul

  • Nahraďte from sparkdl.xgboost import XgboostRegressor ho from xgboost.spark import SparkXGBRegressor a nahraďte from sparkdl.xgboost import XgboostClassifier ho .from xgboost.spark import SparkXGBClassifier
  • Změňte všechny názvy parametrů v konstruktoru estimátoru z stylu camelCase na snake_case styl. Například změňte XgboostRegressor(featuresCol=XXX) na SparkXGBRegressor(features_col=XXX).
  • Parametry use_external_storage a external_storage_precision byly odebrány. xgboost.spark Estimátory používají rozhraní API iterace dat DMatrix k efektivnějšímu využití paměti. Už není potřeba používat neefektivní externí režim úložiště. U extrémně velkých datových sad doporučuje Databricks zvýšit num_workers parametr, díky kterému každý trénovací úkol rozdělí data do menších a spravovatelných datových oddílů. Zvažte nastavení num_workers = sc.defaultParallelism, které nastaví num_workers celkový počet slotů úloh Sparku v clusteru.
  • U odhadců definovaných v xgboost.sparknastavení num_workers=1 se provádí trénování modelu pomocí jediné úlohy Sparku. To využívá počet jader procesoru určených nastavením spark.task.cpuskonfigurace clusteru Spark, což je ve výchozím nastavení 1. Pokud chcete k trénování modelu použít více jader procesoru, zvyšte num_workers nebo spark.task.cpus. Nelze nastavit nthread ani n_jobs parametr pro odhadátory definované v parametru xgboost.spark. Toto chování se liší od předchozího chování odhadců definovaných v zastaralém sparkdl.xgboost balíčku.

Převod sparkdl.xgboost modelu na xgboost.spark model

sparkdl.xgboost modely se ukládají v jiném formátu než xgboost.spark modely a mají různá nastavení parametrů. K převodu modelu použijte následující funkci nástroje:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Pokud máte pyspark.ml.PipelineModel model obsahující sparkdl.xgboost model jako poslední fázi, můžete fázi sparkdl.xgboost modelu nahradit převedeným xgboost.spark modelem.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)