Share via


Distribuerad träning av XGBoost-modeller med hjälp av xgboost.spark

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Python-paketet xgboost>=1.7 innehåller en ny modul xgboost.spark. Den här modulen innehåller xgboost PySpark-skattarna xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifieroch xgboost.spark.SparkXGBRanker. Dessa nya klasser stöder införandet av XGBoost-skattningar i SparkML-pipelines. Api-information finns i XGBoost python spark API-dokumentet.

Krav

Databricks Runtime 12.0 ML och senare.

xgboost.spark Parametrar

De skattningar som definieras i modulen xgboost.spark stöder de flesta av samma parametrar och argument som används i standard XGBoost.

  • Parametrarna för klasskonstruktorn, fit -metoden och predict -metoden är i stort sett identiska med dem i modulen xgboost.sklearn .
  • Namngivning, värden och standardvärden är mestadels identiska med de som beskrivs i XGBoost-parametrar.
  • Undantag är några parametrar som inte stöds (till exempel , , , ), och de pyspark beräkningsspecifika parametrar som har lagts till (till exempel featuresCol, labelCol, use_gpu, validationIndicatorCol). eval_setsample_weightnthreadgpu_id Mer information finns i dokumentationen om XGBoost Python Spark API.

Distribuerad träning

PySpark-skattare som definierats i modulen xgboost.spark stöder distribuerad XGBoost-träning med hjälp av parametern num_workers . Om du vill använda distribuerad träning skapar du en klassificerare eller regressor och anger num_workers antalet samtidiga Spark-aktiviteter som körs under distribuerad träning. Om du vill använda alla Spark-aktivitetsfack anger du num_workers=sc.defaultParallelism.

Till exempel:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Kommentar

  • Du kan inte använda mlflow.xgboost.autolog med distribuerad XGBoost. Om du vill logga en xgboost Spark-modell med MLflow använder du mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Du kan inte använda distribuerad XGBoost i ett kluster som har automatisk skalning aktiverat. Nya arbetsnoder som startar i det här elastiska skalningsparadigmet kan inte ta emot nya uppsättningar uppgifter och förbli inaktiva. Instruktioner för att inaktivera automatisk skalning finns i Aktivera automatisk skalning.

Aktivera optimering för träning på datauppsättning med glesa funktioner

PySpark-skattningar som definierats i xgboost.spark modulen stöder optimering för träning av datauppsättningar med glesa funktioner. Om du vill aktivera optimering av glesa funktionsuppsättningar måste du ange en datauppsättning för metoden fit som innehåller en funktionskolumn som består av värden av typen pyspark.ml.linalg.SparseVector och ange parametern enable_sparse_data_optim estimator till True. Dessutom måste du ange parametern missing till 0.0.

Till exempel:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

GPU-utbildning

PySpark-skattningar som definierats i modulen xgboost.spark stöder utbildning på GPU:er. Ange parametern use_gpu till True för att aktivera GPU-träning.

Kommentar

För varje Spark-uppgift som används i distribuerad XGBoost-träning används endast en GPU i träning när use_gpu argumentet är inställt på True. Databricks rekommenderar att du använder standardvärdet 1 för för Spark-klusterkonfigurationen spark.task.resource.gpu.amount. I annat fall är de ytterligare GPU:er som allokerats till den här Spark-aktiviteten inaktiva.

Till exempel:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Exempelnotebook-fil

Den här notebook-filen visar användningen av Python-paketet xgboost.spark med Spark MLlib.

PySpark-XGBoost Notebook

Hämta notebook-fil

Migreringsguide för den inaktuella sparkdl.xgboost modulen

  • Ersätt from sparkdl.xgboost import XgboostRegressor med from xgboost.spark import SparkXGBRegressor och ersätt from sparkdl.xgboost import XgboostClassifier med from xgboost.spark import SparkXGBClassifier.
  • Ändra alla parameternamn i estimatorkonstruktorn från camelCase-formatmallen till snake_case formatmall. Ändra XgboostRegressor(featuresCol=XXX) till exempel till SparkXGBRegressor(features_col=XXX).
  • Parametrarna use_external_storage och external_storage_precision har tagits bort. xgboost.spark estimatorer använder DMatrix data iteration API för att använda minne mer effektivt. Det finns inte längre något behov av att använda ineffektivt externt lagringsläge. För extremt stora datauppsättningar rekommenderar Databricks att du ökar parametern num_workers , vilket gör att varje träningsuppgift partitioneras till mindre, mer hanterbara datapartitioner. Överväg att ange num_workers = sc.defaultParallelism, som anger num_workers det totala antalet Spark-aktivitetsfack i klustret.
  • För skattare som definieras i xgboost.sparkkör inställningen num_workers=1 modellträning med hjälp av en enda Spark-uppgift. Detta använder det antal CPU-kärnor som anges av konfigurationsinställningen spark.task.cpusför Spark-klustret , som är 1 som standard. Om du vill använda fler CPU-kärnor för att träna modellen ökar num_workers du eller spark.task.cpus. Du kan inte ange parametern nthread eller n_jobs för skattningar som definierats i xgboost.spark. Det här beteendet skiljer sig från det tidigare beteendet för skattningar som definierats i det inaktuella sparkdl.xgboost paketet.

Konvertera sparkdl.xgboost modell till xgboost.spark modell

sparkdl.xgboost modeller sparas i ett annat format än xgboost.spark modeller och har olika parameterinställningar. Använd följande verktygsfunktion för att konvertera modellen:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Om du har en pyspark.ml.PipelineModel modell som innehåller en sparkdl.xgboost modell som den sista fasen kan du ersätta modellsteget sparkdl.xgboost med den konverterade xgboost.spark modellen.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)