次の方法で共有


ユーザー定義スカラー関数 - Python

この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 ここでは、UDF を登録する方法、UDF を呼び出す方法、Spark SQL における部分式の評価順序に関する注意点を示します。

Requirements

  • Databricks Runtime 12.2 LTS 以下では、標準アクセス モードを使用する Unity カタログ コンピューティングでは、Python UDF と Pandas UDF はサポートされていません。

  • スカラー Python UDF と Pandas UDF は、Databricks Runtime 13.3 LTS 以降で、すべてのアクセス モードでサポートされています。

  • Unity カタログ対応クラスターでの Python UDF の ARM インスタンスのサポートには、Databricks Runtime 15.2 以降が必要です。

Databricks Runtime 14.0 以下では、標準アクセス モードを使用する Unity カタログ クラスターでは、Python UDF と Pandas UDF はサポートされていません。 スカラー Python UDF と Pandas UDF は、Databricks Runtime 14.1 以降のすべてのアクセス モードでサポートされています。

Databricks Runtime 14.1 以降では、SQL 構文を使用してスカラー Python UDF を Unity カタログに登録できます。 「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。

関数を UDF として登録する

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

必要に応じて、UDF の戻り値の型を設定できます。 既定の戻り値の型は StringType です。

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Spark SQL で UDF を呼び出す

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

DataFrame で UDF を使用する

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

また、次のように注釈構文を使用して同じ UDF を宣言することもできます。

from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

UDF を使用したバリアント

バリアントの PySpark 型は VariantType で、値の型は VariantVal です。 バリアントの詳細については、「 クエリバリアントデータ」を参照してください。

from pyspark.sql.types import VariantType

# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
  return VariantVal.parseJson(jsonString)

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
|        {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
  return {"v": VariantVal.parseJson(jsonString)}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
|        {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
  return [VariantVal.parseJson(jsonString)]

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
|           [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
  return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
|         toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+

評価順序と null チェック

Spark SQL (SQL、DataFrame、データセット API を含む) では、部分式の評価の順序は保証されません。 特に、演算子や関数の入力は、必ずしも左から右へ、またはその他の決まった順序で評価されるとは限りません。 たとえば、AND および OR 論理式には、左から右への "短絡" セマンティクスはありません。

したがって、クエリの最適化および計画の際に式や句の順序は並べ替えられる可能性があるため、ブール式の副作用や評価の順序および WHEREHAVING 句の順序に依存することは危険です。 具体的には、UDF が NULL チェックのために SQL のショートサーキット セマンティクスに依存している場合、UDF を呼び出す前に null チェックが行われるという保証はありません。 たとえば、次のように入力します。

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

この WHERE 句を使用しても、null を除外した後に strlen UDF が呼び出されることは保証されません。

適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。

  • UDF 自体を null で認識し、UDF 自体の内部で null チェックを実行する
  • IF または CASE WHEN 式を使用して null チェックを実行し、条件分岐で UDF を呼び出す
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

スカラー Python UDF 内のサービス用資格情報

スカラー Python UDF は、Unity カタログ サービスの資格情報を使用して、外部クラウド サービスに安全にアクセスできます。 これは、クラウドベースのトークン化、暗号化、シークレット管理などの操作をデータ変換に直接統合する場合に便利です。

スカラー Python UDF のサービス資格情報は、SQL ウェアハウスと一般的なコンピューティングでのみサポートされます。

スカラー Python UDF のサービス資格情報には、Databricks Runtime 17.1 以降が必要です。

サービス資格情報を作成するには、「 サービス資格情報の作成」を参照してください。

サービス資格情報の UDF 固有の API:
UDF では、 databricks.service_credentials.getServiceCredentialsProvider() を使用してサービス資格情報にアクセスします。

これは、UDF 実行コンテキストでは使用できないノートブックで使用される dbutils.credentials.getServiceCredentialsProvider() 関数とは異なります。

サービス資格情報にアクセスするには、UDF ロジックの databricks.service_credentials.getServiceCredentialsProvider() ユーティリティを使用して、適切な資格情報でクラウド SDK を初期化します。 すべてのコードを UDF 本文にカプセル化する必要があります。

@udf
def use_service_credential():
    from azure.mgmt.web import WebSiteManagementClient

    # Assuming there is a service credential named 'testcred' set up in Unity Catalog
    web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
    # Use web_client to perform operations

サービス資格情報のアクセス許可

UDF の作成者は、Unity カタログ サービスの資格情報に対する ACCESS アクセス許可を持っている必要があります。

No-PE スコープ (専用クラスターとも呼ばれます) で実行される UDF には、サービス資格情報に対する MANAGE アクセス許可が必要です。

既定の資格情報

スカラー Python UDF で使用すると、Databricks はコンピューティング環境変数の既定のサービス資格情報を自動的に使用します。 この動作により、UDF コードで資格情報のエイリアスを明示的に管理することなく、外部サービスを安全に参照できます。 「コンピューティング リソースの既定のサービス資格情報を指定する」を参照してください

既定の資格情報のサポートは、Standard および Dedicated アクセス モードのクラスターでのみ使用できます。 DBSQL では使用できません。

azure-identity プロバイダーを使用するには、DefaultAzureCredential パッケージをインストールする必要があります。 パッケージをインストールするには、 Notebook スコープの Python ライブラリ または コンピューティング スコープ ライブラリを参照してください。

@udf
def use_service_credential():
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.web import WebSiteManagementClient

    # DefaultAzureCredential is automatically using the default service credential for the compute
    web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

    # Use web_client to perform operations

タスク実行コンテキストを取得する

TaskContext PySpark API を使用して、ユーザーの ID、クラスター タグ、Spark ジョブ ID などのコンテキスト情報を取得します。 UDF でのタスク コンテキストの取得を参照してください。

制限事項

PySpark UDF には、次の制限事項が適用されます。

  • ファイル アクセスの制限: Databricks Runtime 14.2 以下では、共有クラスター上の PySpark UDF は Git フォルダー、ワークスペース ファイル、または Unity カタログ ボリュームにアクセスできません。

  • ブロードキャスト変数: 標準アクセス モード クラスターとサーバーレス コンピューティング上の PySpark UDF は、ブロードキャスト変数をサポートしていません。

  • サービス資格情報: サービス資格情報は、Batch Unity カタログ Python UDF とスカラー Python UDF でのみ使用できます。 これらは、標準の Unity カタログ Python UDF ではサポートされていません。

  • サービス資格情報: サービス資格情報は、サーバーレス環境バージョン 3 以降を使用している場合にのみ、サーバーレス コンピューティングで使用できます。 「サーバーレス環境のバージョン」を参照してください。

  • サーバーレスでのメモリ制限: サーバーレス コンピューティング上の PySpark UDF のメモリ制限は、PySpark UDF あたり 1 GB です。 この制限を超えると、エラーの種類UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT_SERVERLESSが発生します。
  • 標準アクセス モードのメモリ制限: 標準アクセス モードの PySpark UDF には、選択したインスタンスの種類の使用可能なメモリに基づくメモリ制限があります。 使用可能なメモリを超えると、UDF_PYSPARK_USER_CODE_ERROR型のエラーが発生します 。MEMORY_LIMIT