Microsoft Fabricのネイティブ実行エンジンでは、Pythonユーザー定義関数 (UDF)、Scala UDF、複合データ型 (配列、マップ、構造体) がサポートされるようになりました。 これらの機能を使用すると、パフォーマンスを犠牲にすることなく、表現力豊かな Spark アプリケーションを作成できます。
PYTHON UDF のサポート
Pythonは、データ エンジニアリングとデータ サイエンスで最も人気のある言語の 1 つです。 これまで、Python UDF では、JVM プロセスと Python ワーカー プロセス間のシリアル化コストが原因で、Spark で大幅なオーバーヘッドが発生しました。 ネイティブ実行エンジンは、これらの負荷の高い遷移を最小限に抑え、コードを変更せずに高速に実行できるようにします。
ネイティブ実行エンジンで Python UDF がどのように動作するか
従来の Spark 実行モデルでは、PYTHON UDF の実行には次の処理が含まれます。
- Spark の内部形式からのデータ変換。
- シリアル化とPythonワーカー プロセスへの転送。
- Python UDF の実行
- JVM への結果のシリアル化。
- Spark は実行を再開します。
このランタイム間の移動により、シリアル化/デシリアル化のコスト、CPU効率の低下、および列指向実行パイプラインの破綻が引き起こされます。 ネイティブ実行エンジンは、データ転送パスを最適化し、可能な場合はベクター化された処理を維持することで、このオーバーヘッドを削減します。
サポートされているPython UDF の種類
ネイティブ実行エンジンでは、次の機能がサポートされています。
-
Scalar UDF:
udf()に登録された行単位のPython関数。 -
ベクター化された (Pandas) UDF: 効率的な転送のために Apache Arrow を使用してデータのバッチを操作する
@pandas_udfで修飾された関数。
ベクター化された UDF は、ネイティブ実行エンジンの列処理モデルと自然に整合するため、パフォーマンスの向上が最大になります。
例: ベクター化されたPython UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
@pandas_udf(DoubleType())
def calculate_discount(price: pd.Series, rate: pd.Series) -> pd.Series:
return price * (1 - rate)
df = spark.table("sales.transactions")
result = df.withColumn("discounted_price", calculate_discount(df.price, df.discount_rate))
result.show()
ネイティブ実行エンジンを有効にする以外に追加の構成は必要ありません。 既存のPython UDF は自動的に利用できます。
Scala UDF のサポート
ネイティブ実行エンジンでは、Scala UDF も高速化されます。 Scala UDF は JVM でネイティブに実行されるため、エンジンはサポートされている操作をベクター化された C++ 実行パスにオフロードしながら、Scala UDF の評価を同じランタイム内で効率的に保つことができます。
例: Scala UDF
import org.apache.spark.sql.functions.udf
val toUpperCase = udf((s: String) => s.toUpperCase)
val df = spark.table("catalog.customers")
val result = df.withColumn("name_upper", toUpperCase(df("name")))
result.show()
サポートされているデータ型で動作する Scala UDF は、ネイティブ実行エンジンが有効になっている場合、コードを変更せずに高速化されます。
複合データ型のサポート
最新のレイクハウス アーキテクチャは、半構造化データと入れ子になったデータに依存します。 ネイティブ実行エンジンでは、次の機能に対して最適化されたサポートが提供されるようになりました。
| データの種類 | Description | ユースケースの例 |
|---|---|---|
| 配列 | 要素の順序付きコレクション | イベント タグ、製品カテゴリ |
| 地図 | キーと値のペア | 構成プロパティ、メタデータ |
| 構造体 | 異なる型の名前付きフィールド | 入れ子になった顧客レコード、アドレス オブジェクト |
複合型でサポートされる操作
ネイティブ実行エンジンは、複雑なデータ型に対する一般的な操作を高速化します。
- 配列関数:
explode、array_contains、size、flatten、transform - マップ関数:
map_keys、map_values、element_at - 構造体アクセス: ドット表記によるフィールドアクセス、
getField - 入れ子になった組み合わせ: 構造体の配列、配列値を持つマップ
例: 配列と構造体の操作
from pyspark.sql.functions import explode, col, size
# Read data with nested schema
df = spark.table("events.telemetry")
# Operations on arrays - accelerated by native engine
result = (df
.filter(size(col("tags")) > 0)
.select(
col("event_id"),
col("metadata.source"), # Struct field access
explode(col("tags")).alias("tag")
)
)
result.show()
例: マップの操作
from pyspark.sql.functions import map_keys, map_values, col
df = spark.table("config.settings")
# Map operations - accelerated by native engine
result = (df
.select(
col("setting_id"),
map_keys(col("properties")).alias("keys"),
map_values(col("properties")).alias("values")
)
)
result.show()
パフォーマンスの結果
内部ベンチマークでは、Python UDF と複雑なデータ型を使用するワークロード全体で大幅な改善が示されます。
| ワークロードの種類 | パフォーマンスの向上 |
|---|---|
| ベクター化されたPython UDF | 最大 5.76 倍高速 |
| スカラー Python UDF | 最大 1.08 倍高速 |
| TPC-DS エンドツーエンド(複合型あり) | 最大 2.35 倍高速 |
これらの利点は、シリアル化オーバーヘッドの削減、ベクター化の改善、およびエンドツーエンドの列実行によって得られます。
高度なレイクハウス パターンの利点
複雑なデータ型アクセラレーションは、次の場合に特に重要です。
- Z-ORDER 最適化: 入れ子になった列は、最適化されたデータ レイアウトに参加します。
- 液体クラスタリング: 複合型の列は、フラット化せずにクラスタリングを利用できます。
- 半構造化分析: JSON ペイロードとイベント ストリームは、自然なクエリのために入れ子のままです。
- イベント ドリブン アーキテクチャ: テレメトリと IoT データは階層構造を保持します。
パフォーマンスのためにデータをフラット化したりパイプラインを再構築したりする代わりに、高い実行効率を維持しながら、複雑なスキーマで自然に作業します。
機能の有効化
Python UDF、Scala UDF、および複合データ型のサポートは、ネイティブ実行エンジンが有効になっている場合に使用できます。 追加の構成は必要ありません。
ネイティブ実行エンジンを有効にするには、 Fabric Data Engineering のネイティブ実行エンジンを参照してください。
Prerequisites
- ランタイム 1.3 (Apache Spark 3.5) または ランタイム 2.0 (Apache Spark 4.0)。
- 環境、ノートブック、または Spark ジョブ定義レベルで有効になっているネイティブ実行エンジン。
制限事項
- ベクター化されたパス内ですべてのPython ライブラリがサポートされているわけではありません。 任意のPythonオブジェクトのシリアル化を必要とするライブラリは、フォールバックを引き続きトリガーする可能性があります。
- 深く入れ子になった複合型 (構造体のマップの配列など) は、特定の操作のために JVM エンジンにフォールバックする可能性があります。
- ANSI モードは、ネイティブ実行エンジンではサポートされていません。