ユーザー定義関数 (UDF) を使用すると、Azure Databricks の組み込み機能を拡張するコードを再利用して共有できます。 UDF を使用して、複雑な計算、変換、カスタム データ操作などの特定のタスクを実行します。
UDF と Apache Spark 関数を使用する場合
組み込みの Apache Spark 関数では表現が困難なロジックには UDF を使用します。 組み込みの Apache Spark 関数は分散処理用に最適化されており、大規模なパフォーマンスが向上します。 詳細については、「関数」を参照してください。
Databricks では、アドホック クエリ、手動データ クレンジング、探索的データ分析、および小規模から中規模のデータセットに対する操作に UDF をお勧めします。 UDF の一般的なユース ケースには、データの暗号化、暗号化解除、ハッシュ、JSON 解析、検証などがあります。
Apache Spark メソッドは、非常に大規模なデータセットに対する操作に使用します。また、ETL ジョブやストリーミング操作など、ワークロードは定期的または継続的に実行されます。
UDF の種類について
次のタブから UDF の種類を選択すると、説明、例、詳細を確認するためのリンクが表示されます。
スカラー UDF
スカラー UDF は 1 つの行で動作し、各行に対して 1 つの結果値を返します。 それらは、Unity Catalog による管理下に置かれたり、セッション スコープにすることができます。
次の例では、スカラー UDF を使用して、 name
列の各名前の長さを計算し、新しい列 name_length
に値を追加します。
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
PySpark を使用して Databricks ノートブックにこれを実装するには:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Unity カタログのユーザー定義関数 (UDF) とユーザー定義スカラー関数 (Python) を参照してください。
バッチ スカラー UDF
1 対 1 の入力/出力行パリティを維持しながら、データをバッチ処理します。 これにより、大規模なデータ処理に対する行ごとの操作のオーバーヘッドが軽減されます。 また、バッチ UDF はバッチ間の状態を維持して、より効率的に実行し、リソースを再利用し、データ チャンク間でコンテキストを必要とする複雑な計算を処理します。
それらは、Unity Catalog による管理下に置かれたり、セッション スコープにすることができます。
次の Batch Unity カタログ Python UDF は、行のバッチの処理中に BMI を計算します。
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
Unity カタログのユーザー定義関数 (UDF) と Unity カタログのBatch Python ユーザー定義関数 (UDF) を参照してください。
非スカラー UDF
非スカラー UDF は、柔軟な入力/出力比 (1:N または多:多) を持つデータセット/列全体で動作します。
セッション スコープのバッチ Pandas UDF には、次の種類があります。
- Series から Series
- Series の反復子から Series の反復子
- 複数の Series の反復子から Series の反復子
- 系列からスカラーへ
Series から Series への pandas UDF の例を次に示します。
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
pandas ユーザー定義関数を参照してください。
UDAF
UDF は複数の行で動作し、1 つの集計結果を返します。 UDAF はセッション スコープに限定されています。
次の UDAF の例では、名前の長さでスコアを集計します。
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Python およびユーザー定義集計関数の pandas ユーザー定義関数 (Scala) を参照してください。
UDTF
UDTF は 1 つ以上の入力引数を受け取り、入力行ごとに複数の行 (および場合によっては複数の列) を返します。 UDTF はセッション スコープに限定されています。
次の例では、スコア列の各値がカテゴリの一覧に対応しています。 UDTF は、コンマ区切りのリストを複数の行に分割します。
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: int, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Python ユーザー定義テーブル関数 (UDF) を参照してください。
Unity カタログで管理されるUDFとセッションでスコープされるUDF
Unity カタログ Python UDF と Batch Unity カタログ Python UDF は、ガバナンス、再利用、および検出可能性を向上させるために Unity カタログに保持されます。 他のすべての UDF はセッション ベースです。つまり、ノートブックまたはジョブで定義され、現在の SparkSession にスコープが設定されます。 Scala または Python を使用して、セッション スコープの UDF を定義してアクセスできます。
Unity Catalog 管理 UDF クイック ガイド
Unity カタログで管理される UDF を使用すると、カスタム関数を定義、使用、安全に共有し、コンピューティング環境全体で管理できます。 「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。
UDF の種類 | サポートされている計算処理 | 説明 |
---|---|---|
Unity カタログ Python UDF |
|
Python で UDF を定義し、ガバナンスのために Unity カタログに登録します。 スカラー UDF は 1 つの行で動作し、各行に対して 1 つの結果値を返します。 |
バッチ Unity Catalog Python UDF |
|
Python で UDF を定義し、ガバナンスのために Unity カタログに登録します。 複数の値に対するバッチ操作と複数の値の返し。 大規模なデータ処理に対する行ごとの操作のオーバーヘッドを軽減します。 |
ユーザー分離コンピューティング用のセッション スコープ UDF クイック ガイド
セッション スコープ UDF はノートブックまたはジョブで定義され、現在の SparkSession にスコープが設定されます。 Scala または Python を使用して、セッション スコープの UDF を定義してアクセスできます。
UDF の種類 | サポートされている計算処理 | 説明 |
---|---|---|
Python スカラー |
|
スカラー UDF は 1 つの行で動作し、各行に対して 1 つの結果値を返します。 |
Python 非スカラー |
|
非スカラー UDF には、 pandas_udf 、 mapInPandas 、 mapInArrow 、 applyInPandas が含まれます。 Pandas UDF は Apache Arrow を使用してデータを転送し、Pandas はデータを操作します。 Pandas UDF は、行ごとのスカラー UDF よりもパフォーマンスを大幅に向上させるベクター化された操作をサポートします。 |
Python UDTF |
|
UDTF は 1 つ以上の入力引数を受け取り、入力行ごとに複数の行 (および場合によっては複数の列) を返します。 |
Scala スカラー UDF |
|
スカラー UDF は 1 つの行で動作し、各行に対して 1 つの結果値を返します。 |
Scala UDAF |
|
UDF は複数の行で動作し、1 つの集計結果を返します。 |
パフォーマンスに関する考慮事項
Scala UDF は 通常、Python UDF よりも高速です。
- 非分離の Scala UDF は Java 仮想マシン (JVM) で実行されるため、JVM 内外へのデータの移動に伴うオーバーヘッドを回避できます。
- 分離された Scala UDF は JVM との間でデータを移動する必要がありますが、メモリをより効率的に処理するため、Python UDF よりも高速です。
Python UDF と pandas UDF は 、データをシリアル化して JVM から Python インタープリターに移動する必要があるため、Scala UDF よりも低速になる傾向があります。
- Pandas UDF は、シリアル化コストを削減するために Apache Arrow を使用するため、Python UDF よりも最大 100 倍高速です。