這很重要
本頁中的主題僅適用於批次推斷案例,這些案例不使用由 Databricks 所提供的基礎模型,這些模型已針對批次推斷場景進行了優化。 請參閱 使用 Azure Databricks AI Functions 將 AI 應用於數據。
本頁說明如何使用 Databricks 中的已註冊模型,在 Spark DataFrame 上執行批次推斷。 此工作流程適用於各種機器學習和深度學習模型,包括 TensorFlow、PyTorch 和 scikit-learn。 其中包含數據載入、模型推斷和效能微調的最佳做法。
針對深度學習應用程式的模型推斷,Azure Databricks 建議使用下列工作流程。 如需使用 TensorFlow 和 PyTorch 的範例筆記本,請參閱 批次推斷範例。
模型推斷工作流程
Databricks 建議使用 Spark DataFrame 執行批次推斷的下列工作流程。
步驟 1:環境設定
請確定您的叢集運行相容的 Databricks ML 執行時間版本,以符合訓練環境。 使用 MLflow 記錄的模型包含可安裝的需求,以確保定型和推斷環境相符。
requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
dbutils.fs.put("file:" + requirements_path, "", True)
%pip install -r $requirements_path
%restart_python
步驟 2:將數據載入 Spark DataFrame
根據資料類型,使用適當的方法將數據載入 Spark DataFrame:
| 數據類型 | 方法 |
|---|---|
| Unity 目錄中的表(建議) | table = spark.table(input_table_name) |
| 影像檔 (JPG, PNG) | files_df = spark.createDataFrame(map(lambda path: (path,), file_paths), ["path"]) |
| TFRecords | df = spark.read.format("tfrecords").load(image_path) |
| 其他格式(Parquet、CSV、JSON、JDBC) | 使用 Spark 數據源載入。 |
步驟 3:從模型登錄載入模型
此範例會使用 Databricks 模型登錄中的模型進行推斷。
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)
步驟 4:使用 Pandas UDF 執行模型推斷
Pandas UDF 會利用 Apache Arrow 進行有效率的數據傳輸和 Pandas 進行處理。 使用 pandas UDF 進行推斷的一般步驟如下:
- 載入定型的模型:使用 MLflow 建立 Spark UDF 進行推斷。
- 前置處理輸入數據:確定輸入架構符合模型需求。
- 執行模型預測:在 DataFrame 上使用模型的 UDF 函式。
df_result = df_spark.withColumn("prediction", predict_udf(*df_spark.columns))
- (建議)將預測儲存至 Unity 目錄。
下列範例會將預測儲存至 Unity 目錄。
df_result.write.mode("overwrite").saveAsTable(output_table)
模型推論效能的微調
本節提供在 Azure Databricks 上針對模型推斷進行偵錯和效能微調的一些秘訣。 如需概觀,請參閱 使用Spark DataFrame執行批次推斷。
模型推斷中通常有兩個主要部分:數據輸入管線和模型推斷。 數據輸入管線在數據 I/O 輸入上負荷很大,而模型推斷在計算上負荷很大。 判斷工作流程的瓶頸很簡單。 以下是一些方法:
- 將模型縮減為簡單模型,並測量每秒的範例。 如果完整模型與一般模型之間的端對端時間差異很小,則數據輸入管線可能是瓶頸,否則模型推斷就是瓶頸。
- 如果使用 GPU 執行模型推斷,請檢查 GPU 使用率 計量。 如果 GPU 使用率不持續高,則數據輸入管線可能是瓶頸。
優化數據輸入管線
使用 GPU 可以有效率地優化模型推斷的執行速度。 當 GPU 和其他加速器變得更快時,數據輸入管線必須跟上需求。 數據輸入管線會將數據讀入 Spark DataFrame、轉換數據,並將其載入為模型推斷的輸入。 如果數據輸入是瓶頸,以下是增加 I/O 輸送量的一些秘訣:
設定每個批次的最大記錄。 較大的最大記錄數目可減少 I/O 額外負荷,只要記錄可以放入記憶體中,即可呼叫 UDF 函式。 若要設定批次大小,請設定下列設定:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")在 pandas UDF 中預先處理輸入數據時,以批次的方式載入數據並預先擷取數據。
針對 TensorFlow,Azure Databricks 建議使用 tf.data API。 您可以在
num_parallel_calls函式中設定map,並呼叫prefetch和batch以進行預先擷取和批處理,藉此以平行運算的方式剖析映射。dataset.map(parse_example, num_parallel_calls=num_process).prefetch(prefetch_size).batch(batch_size)針對 PyTorch,Azure Databricks 建議使用 DataLoader 類別。 您可以將
batch_size設定為批次處理,而num_workers設定為平行數據載入。torch.utils.data.DataLoader(images, batch_size=batch_size, num_workers=num_process)
Batch 推斷範例
本節中的範例會遵循建議的深度學習推斷工作流程。 這些範例說明如何使用預先定型的深度殘差網路 (ResNets) 類神經網路模型來執行模型推斷。
使用 Spark UDF 的結構化數據擷取和批次推斷
下列範例筆記本示範透過自動化擷取技術,將未經結構化數據擷取的簡單代理程式開發、記錄和評估,轉換成有組織、可使用的資訊。 此方法示範如何使用 MLflow 的 PythonModel 類別來實作批次推斷的自定義代理程式,並使用記錄的代理程式模型作為 Spark User-Defined 函式 (UDF)。 此筆記本也會示範如何利用 Mosaic AI 代理評估工具,透過真實數據來評估準確性。