次の方法で共有


Spark DataFrame を使用してバッチ推論を実行する

Important

このページのトピックは、バッチ推論シナリオ用に最適化された Databricks ホスト型基盤モデルを使用しないバッチ推論シナリオにのみ関連します。 Azure Databricks AI Functions を使用したデータへの AI の適用に関するページを参照してください。

このページでは、Databricks の登録済みモデルを使用して Spark DataFrame でバッチ推論を実行する方法について説明します。 このワークフローは、TensorFlow、PyTorch、scikit-learn など、さまざまな機械学習およびディープ ラーニング モデルに適用されます。 これには、データ読み込み、モデル推論、およびパフォーマンス チューニングのベスト プラクティスが含まれています。

Azure Databricks では、ディープ ラーニング アプリケーションのモデル推論に次のワークフローをお勧めしています。 TensorFlow と PyTorch を使用するノートブックの例については、 バッチ推論の例を参照してください。

モデル推論ワークフロー

Databricks では、Spark DataFrames を使用してバッチ推論を実行するための次のワークフローをお勧めします。

手順 1: 環境のセットアップ

クラスターで、トレーニング環境に合わせて互換性のある Databricks ML ランタイム バージョンが実行されていることを確認します。 MLflow を使用してログに記録されるモデルには、トレーニング環境と推論環境が確実に一致するようにインストールできる要件が含まれています。

requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
    dbutils.fs.put("file:" + requirements_path, "", True)

%pip install -r $requirements_path
%restart_python

手順 2: Spark DataFrames にデータを読み込む

データ型に応じて、適切なメソッドを使用して Spark DataFrame にデータを読み込みます。

データの種類 メソッド
Unity カタログのテーブル (推奨) table = spark.table(input_table_name)
画像ファイル (JPG、PNG) files_df = spark.createDataFrame(map(lambda path: (path,), file_paths), ["path"])
TFRecords df = spark.read.format("tfrecords").load(image_path)
その他の形式 (Parquet、CSV、JSON、JDBC) Spark データ ソースを使用してロードします。

手順 3: モデル レジストリからモデルを読み込む

この例では、推論に Databricks Model Registry のモデルを使用します。

predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

手順 4: Pandas UDF を使用してモデル推論を実行する

Pandas UDF では、効率的なデータ転送のために Apache Arrow を利用し、データ処理には pandas を使用します。 pandas UDF を使用した推論の一般的な手順は次のとおりです。

  1. トレーニング済みモデルを読み込む: MLflow を使用して推論用の Spark UDF を作成します。
  2. 入力データの前処理: 入力スキーマがモデルの要件と一致していることを確認します。
  3. モデル予測の実行: DataFrame でモデルの UDF 関数を使用します。
df_result = df_spark.withColumn("prediction", predict_udf(*df_spark.columns))
  1. (推奨)予測を Unity カタログに保存します。

次の例では、予測を Unity カタログに保存します。

df_result.write.mode("overwrite").saveAsTable(output_table)

モデル推論のパフォーマンス チューニング

このセクションでは、Azure Databricks でのモデル推論のデバッグとパフォーマンス チューニングに関するいくつかのヒントを提供します。 概要については、 Spark DataFrame を使用したバッチ推論の実行に関するページを参照してください。

通常、モデル推論には、データ入力パイプラインとモデル推論という 2 つの主要な部分があります。 データ入力パイプラインはデータ I/O 入力が多く、モデルの推論は計算に負荷がかかります。 ワークフローのボトルネックの特定は簡単です。 いくつかの方法を次に示します。

  • モデルを単純なモデルに減らし、1 秒あたりの例を測定します。 完全なモデルと単純なモデルの間のエンド ツー エンド時間の差が最小限の場合、データ入力パイプラインがボトルネックである可能性が高く、それ以外の場合はモデル推論がボトルネックになります。
  • GPU でモデル推論を実行している場合は、GPU 使用率メトリックを確認 します。 GPU 使用率が継続的に高くない場合は、データ入力パイプラインがボトルネックになる可能性があります。

データ入力パイプラインを最適化する

GPU を使用すると、モデル推論の実行速度を効率的に最適化できます。 GPU やその他のアクセラレータが高速になるにつれて、データ入力パイプラインが需要に対応することが重要です。 データ入力パイプラインは、データを Spark DataFrames に読み取り、変換し、モデル推論の入力として読み込みます。 データ入力がボトルネックである場合、I/O スループットを向上させるヒントを次に示します。

  • バッチあたりの最大レコード数を設定します。 最大レコード数を増やすと、レコードがメモリに収まる限り、UDF 関数を呼び出すための I/O オーバーヘッドを減らすことができます。 バッチ サイズを設定するには、次の構成を設定します。

    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
    
  • pandas UDF で入力データを前処理するときに、データをバッチで読み込み、プリフェッチします。

    TensorFlow の場合、Azure Databricks では tf.data API を使用することをお勧めします。 num_parallel_calls関数でmapを設定し、プリフェッチとバッチ処理のためにprefetchbatchを呼び出すことで、マップを並列に解析できます。

    dataset.map(parse_example, num_parallel_calls=num_process).prefetch(prefetch_size).batch(batch_size)
    

    PyTorch の場合、Azure Databricks では DataLoader クラスを使用することをお勧めします。 バッチ処理の batch_size と並列データ読み込みの num_workers を設定できます。

    torch.utils.data.DataLoader(images, batch_size=batch_size, num_workers=num_process)
    

バッチ推論の例

このセクションの例は、推奨されているディープ ラーニング推論ワークフローに従っています。 これらの例では、事前トレーニング済みのディープ残差ネットワーク (ResNets) ニューラル ネットワーク モデルを使用してモデル推論を実行する方法を示します。

Spark UDF を使用した構造化データ抽出とバッチ推論

次のノートブック例では、構造化データ抽出用の単純なエージェントの開発、ログ記録、評価を示し、自動化された抽出手法を使用して、未加工の非構造化データを整理された使用可能な情報に変換します。 このアプローチでは、MLflow の PythonModel クラスを使用してバッチ推論用のカスタム エージェントを実装し、ログに記録されたエージェント モデルを Spark User-Defined 関数 (UDF) として使用する方法を示します。 このノートブックでは、Mosaic AI エージェント評価を活用し、典拠とする真偽データを使用して精度を評価する方法も示します。

Spark UDF を使用した構造化データ抽出とバッチ推論

ノートブックを入手