Partilhar via


Funções escalares definidas pelo usuário - Python

Este artigo contém exemplos de função definida pelo usuário (UDF) do Python. Ele mostra como registrar UDFs, como invocar UDFs e fornece advertências sobre a ordem de avaliação de subexpressões no Spark SQL.

No Databricks Runtime 14.0 e superior, você pode usar funções de tabela definidas pelo usuário (UDTFs) do Python para registrar funções que retornam relações inteiras em vez de valores escalares. Consulte funções de tabela definidas pelo usuário (UDTFs) do Python.

Nota

No Databricks Runtime 12.2 LTS e versões anteriores, UDFs Python e UDFs Pandas não são suportados no Unity Catalog que utiliza o modo de acesso padrão na computação. Os UDFs Python escalares e os UDFs Pandas são suportados no Databricks Runtime 13.3 LTS e versões superiores para todos os modos de acesso.

No Databricks Runtime 13.3 LTS e superior, você pode registrar UDFs Python escalares no Unity Catalog usando sintaxe SQL. Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog.

Registrar uma função como UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Opcionalmente, você pode definir o tipo de retorno do seu UDF. O tipo de retorno padrão é StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Chamar o UDF no Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Usar UDF com DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Como alternativa, você pode declarar a mesma UDF usando a sintaxe de anotação:

from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Variantes com UDF

O tipo PySpark para variante é VariantType e os valores são do tipo VariantVal. Para obter informações sobre variantes, consulte Consultar dados de variantes.

from pyspark.sql.types import VariantType

# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
  return VariantVal.parseJson(jsonString)

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
|        {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
  return {"v": VariantVal.parseJson(jsonString)}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
|        {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
  return [VariantVal.parseJson(jsonString)]

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
|           [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
  return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
|         toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+

Ordem de avaliação e verificação nula

O Spark SQL (incluindo SQL e a API DataFrame e Dataset) não garante a ordem de avaliação das subexpressões. Em particular, as entradas de um operador ou função não são necessariamente avaliadas da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, as expressões lógicas AND e OR não têm semântica de "curto-circuito" ao serem avaliadas da esquerda para a direita.

Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação das expressões booleanas, e na ordem das cláusulas WHERE e HAVING, uma vez que tais expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento da consulta. Especificamente, se um UDF depende de semântica de curto-circuito no SQL para verificação nula, não há garantia de que a verificação nula acontecerá antes de invocar o UDF. Por exemplo,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Esta WHERE cláusula não garante que o strlen UDF seja invocado após a filtragem dos nulos.

Para executar a verificação nula adequada, recomendamos que você siga um destes procedimentos:

  • Tornar a UDF capaz de lidar com valores nulos e fazer a verificação de nulos dentro da própria UDF.
  • Use IF ou CASE WHEN expressões para fazer a verificação nula e invocar a UDF em uma ramificação condicional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Credenciais de serviço em UDFs Python escalares

UDFs Python escalares podem usar credenciais de serviço Unity Catalog para acessar com segurança serviços de nuvem externos. Isso é útil para integrar operações como tokenização baseada em nuvem, criptografia ou gerenciamento de segredos diretamente em suas transformações de dados.

As credenciais de serviço para UDFs Python escalares são suportadas apenas no SQL warehouse e na computação geral.

Nota

As credenciais de serviço nos UDFs Scalar Python requerem o Databricks Runtime 17.1 ou posterior.

Para criar uma credencial de serviço, consulte Criar credenciais de serviço.

Nota

API específica de UDF para credenciais de serviço:
Em UDFs, use databricks.service_credentials.getServiceCredentialsProvider() para acessar credenciais de serviço.

Isso difere da função dbutils.credentials.getServiceCredentialsProvider() usada em notebooks, mas não está disponível em contextos de execução de UDF.

Para acessar a credencial de serviço, use o databricks.service_credentials.getServiceCredentialsProvider() utilitário em sua lógica UDF para inicializar SDKs de nuvem com a credencial apropriada. Todo o código deve ser encapsulado no corpo UDF.

@udf
def use_service_credential():
    from azure.mgmt.web import WebSiteManagementClient

    # Assuming there is a service credential named 'testcred' set up in Unity Catalog
    web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
    # Use web_client to perform operations

Permissões de credenciais de serviço

O criador da UDF deve ter permissão ACCESS na credencial de serviço do Catálogo Unity.

UDFs que são executados em No-PE escopo, também conhecidos como clusters dedicados, exigem permissões MANAGE na credencial de serviço.

Credenciais padrão

Quando usado em UDFs Python escalares, o Databricks usa automaticamente a credencial de serviço padrão da variável de ambiente de computação. Esse comportamento permite que você faça referência segura a serviços externos sem gerenciar explicitamente aliases de credenciais em seu código UDF. Consulte Especificar uma credencial de serviço padrão para um recurso de computação

O suporte a credenciais padrão só está disponível em clusters de modo de acesso Padrão e Dedicado. Ele não está disponível em DBSQL.

Você deve instalar o azure-identity pacote para usar o DefaultAzureCredential provedor. Para instalar o pacote, consulte Bibliotecas Python com escopo de bloco de anotações ou Bibliotecas com escopo de computação.

@udf
def use_service_credential():
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.web import WebSiteManagementClient

    # DefaultAzureCredential is automatically using the default service credential for the compute
    web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

    # Use web_client to perform operations

Obter contexto de execução da tarefa

Utilize a API PySpark do TaskContext para obter informações de contexto, como identidade do utilizador, tags do cluster, ID do trabalho Spark e mais. Veja Obter o contexto da tarefa num UDF.

Limitações

As seguintes limitações se aplicam aos UDFs do PySpark:

  • Restrições de acesso a ficheiros: No Databricks Runtime 14.2 e inferior, as UDFs do PySpark em clusters compartilhados não podem acessar pastas Git, arquivos de espaço de trabalho ou volumes de catálogo Unity.

  • Variáveis de transmissão: UDFs do PySpark em clusters de modo de acesso padrão e computação sem servidor não suportam variáveis de difusão.

  • Credenciais de serviço: As credenciais de serviço estão disponíveis apenas em UDFs Python do Batch Unity Catalog e UDFs Python Scalar. Eles não são suportados em UDFs Python padrão do Unity Catalog.

  • Credenciais de serviço: As credenciais de serviço só estão disponíveis em computação sem servidor ao usar o ambiente sem servidor versão 3 ou superior. Consulte as versões do ambiente sem servidor .

  • Limite de memória em serverless: PySpark UDFs em computação sem servidor têm um limite de memória de 1GB por PySpark UDF. Exceder esse limite resulta em um erro do tipo UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
  • Limite de memória no modo de acesso padrão: UDFs PySpark no modo de acesso padrão têm um limite de memória baseado na memória disponível do tipo de instância escolhido. Exceder a memória disponível resulta em um erro do tipo UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.