共用方式為


使用 Spark DataFrame 執行批次推斷

這很重要

本頁中的主題僅適用於批次推斷案例,這些案例不使用由 Databricks 所提供的基礎模型,這些模型已針對批次推斷場景進行了優化。 請參閱 使用 Azure Databricks AI Functions 將 AI 應用於數據

本頁說明如何使用 Databricks 中的已註冊模型,在 Spark DataFrame 上執行批次推斷。 此工作流程適用於各種機器學習和深度學習模型,包括 TensorFlow、PyTorch 和 scikit-learn。 其中包含數據載入、模型推斷和效能微調的最佳做法。

針對深度學習應用程式的模型推斷,Azure Databricks 建議使用下列工作流程。 如需使用 TensorFlow 和 PyTorch 的範例筆記本,請參閱 批次推斷範例

模型推斷工作流程

Databricks 建議使用 Spark DataFrame 執行批次推斷的下列工作流程。

步驟 1:環境設定

請確定您的叢集運行相容的 Databricks ML 執行時間版本,以符合訓練環境。 使用 MLflow 記錄的模型包含可安裝的需求,以確保定型和推斷環境相符。

requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
    dbutils.fs.put("file:" + requirements_path, "", True)

%pip install -r $requirements_path
%restart_python

步驟 2:將數據載入 Spark DataFrame

根據資料類型,使用適當的方法將數據載入 Spark DataFrame:

數據類型 方法
Unity 目錄中的表(建議) table = spark.table(input_table_name)
影像檔 (JPG, PNG) files_df = spark.createDataFrame(map(lambda path: (path,), file_paths), ["path"])
TFRecords df = spark.read.format("tfrecords").load(image_path)
其他格式(Parquet、CSV、JSON、JDBC) 使用 Spark 數據源載入。

步驟 3:從模型登錄載入模型

此範例會使用 Databricks 模型登錄中的模型進行推斷。

predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

步驟 4:使用 Pandas UDF 執行模型推斷

Pandas UDF 會利用 Apache Arrow 進行有效率的數據傳輸和 Pandas 進行處理。 使用 pandas UDF 進行推斷的一般步驟如下:

  1. 載入定型的模型:使用 MLflow 建立 Spark UDF 進行推斷。
  2. 前置處理輸入數據:確定輸入架構符合模型需求。
  3. 執行模型預測:在 DataFrame 上使用模型的 UDF 函式。
df_result = df_spark.withColumn("prediction", predict_udf(*df_spark.columns))
  1. (建議)將預測儲存至 Unity 目錄。

下列範例會將預測儲存至 Unity 目錄。

df_result.write.mode("overwrite").saveAsTable(output_table)

模型推論效能的微調

本節提供在 Azure Databricks 上針對模型推斷進行偵錯和效能微調的一些秘訣。 如需概觀,請參閱 使用Spark DataFrame執行批次推斷

模型推斷中通常有兩個主要部分:數據輸入管線和模型推斷。 數據輸入管線在數據 I/O 輸入上負荷很大,而模型推斷在計算上負荷很大。 判斷工作流程的瓶頸很簡單。 以下是一些方法:

  • 將模型縮減為簡單模型,並測量每秒的範例。 如果完整模型與一般模型之間的端對端時間差異很小,則數據輸入管線可能是瓶頸,否則模型推斷就是瓶頸。
  • 如果使用 GPU 執行模型推斷,請檢查 GPU 使用率 計量。 如果 GPU 使用率不持續高,則數據輸入管線可能是瓶頸。

優化數據輸入管線

使用 GPU 可以有效率地優化模型推斷的執行速度。 當 GPU 和其他加速器變得更快時,數據輸入管線必須跟上需求。 數據輸入管線會將數據讀入 Spark DataFrame、轉換數據,並將其載入為模型推斷的輸入。 如果數據輸入是瓶頸,以下是增加 I/O 輸送量的一些秘訣:

  • 設定每個批次的最大記錄。 較大的最大記錄數目可減少 I/O 額外負荷,只要記錄可以放入記憶體中,即可呼叫 UDF 函式。 若要設定批次大小,請設定下列設定:

    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
    
  • 在 pandas UDF 中預先處理輸入數據時,以批次的方式載入數據並預先擷取數據。

    針對 TensorFlow,Azure Databricks 建議使用 tf.data API。 您可以在 num_parallel_calls 函式中設定 map,並呼叫 prefetchbatch 以進行預先擷取和批處理,藉此以平行運算的方式剖析映射。

    dataset.map(parse_example, num_parallel_calls=num_process).prefetch(prefetch_size).batch(batch_size)
    

    針對 PyTorch,Azure Databricks 建議使用 DataLoader 類別。 您可以將 batch_size 設定為批次處理,而 num_workers 設定為平行數據載入。

    torch.utils.data.DataLoader(images, batch_size=batch_size, num_workers=num_process)
    

Batch 推斷範例

本節中的範例會遵循建議的深度學習推斷工作流程。 這些範例說明如何使用預先定型的深度殘差網路 (ResNets) 類神經網路模型來執行模型推斷。

使用 Spark UDF 的結構化數據擷取和批次推斷

下列範例筆記本示範透過自動化擷取技術,將未經結構化數據擷取的簡單代理程式開發、記錄和評估,轉換成有組織、可使用的資訊。 此方法示範如何使用 MLflow 的 PythonModel 類別來實作批次推斷的自定義代理程式,並使用記錄的代理程式模型作為 Spark User-Defined 函式 (UDF)。 此筆記本也會示範如何利用 Mosaic AI 代理評估工具,透過真實數據來評估準確性。

使用 Spark UDF 的結構化數據擷取和批次推斷

拿筆記本