Compartilhar via


Funções escalares definidas pelo usuário – Python

Este artigo contém exemplos de UDF (função definida pelo usuário) do Python. Ele mostra como registrar UDFs, como invocar UDFs e fornece advertências sobre a ordem de avaliação de subexpressões no Spark SQL.

No Databricks Runtime 14.0 e superior, você pode usar funções de tabela definidas pelo usuário (UDTFs) do Python para registrar funções que retornam relações inteiras em vez de valores escalares. Consulte Funções de tabela definidas pelo usuário (UDTFs) em Python.

Observação

No Databricks Runtime 12.2 LTS e versões anteriores, não há suporte para UDFs do Python e do Pandas no Catálogo do Unity na computação que usa o modo de acesso padrão. As UDFs escalares do Python e as UDFs do Pandas têm suporte no Databricks Runtime 13.3 LTS e superior para todos os modos de acesso.

No Databricks Runtime 13.3 LTS e versões superiores, é possível registrar UDFs escalares do Python no Catálogo do Unity usando a sintaxe SQL. Consulte UDFs (funções definidas pelo usuário) no Catálogo do Unity.

Registrar uma função como uma UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Opcionalmente, você pode definir o tipo de retorno de seu UDF. O tipo de retorno padrão é StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Chamar a UDF no Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Usar UDF com DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Como alternativa, você pode declarar o mesmo UDF usando a sintaxe de anotação:

from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Variantes com UDF

O tipo PySpark para variante é VariantType e os valores são do tipo VariantVal. Para obter informações sobre variantes, consulte os dados de variante de consulta.

from pyspark.sql.types import VariantType

# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
  return VariantVal.parseJson(jsonString)

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
|        {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
  return {"v": VariantVal.parseJson(jsonString)}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
|        {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
  return [VariantVal.parseJson(jsonString)]

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
|           [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
  return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
|         toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+

Ordem de avaliação e verificação de nulos

O SQL do Spark (incluindo o SQL e o DataFrame e a API do Conjunto de Dados) não garante a ordem de avaliação de subexpressões. Em especial, as entradas de um operador ou função não são necessariamente avaliadas da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, as expressões lógicas AND e OR não têm a semântica de “curto-circuito” da esquerda para a direita.

Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação de expressões booleanas, e na ordem das cláusulas WHERE e HAVING, pois essas expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento da consulta. Especificamente, se uma UDF depender de semântica de curto-circuito no SQL para verificação nula, não há nenhuma garantia de que a verificação nula ocorrerá antes de invocar a UDF. Por exemplo,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Essa cláusula WHERE não garante que a UDF strlen seja invocada após a filtragem de nulos.

Para executar a verificação nula adequada, é recomendável que você faça o seguinte:

  • Fazer a própria UDF reconhecer nulos e verificar nulos dentro da própria UDF
  • Usar IF expressões ou CASE WHEN para fazer a verificação nula e invocar a UDF em uma ramificação condicional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Credenciais de serviço em UDFs escalares do Python

UDFs escalares do Python podem usar credenciais de serviço do Catálogo do Unity para acessar com segurança serviços de nuvem externos. Isso é útil para integrar operações como tokens baseados em nuvem, criptografia ou gerenciamento de segredos diretamente em suas transformações de dados.

As credenciais de serviço para UDFs escalares do Python só têm suporte no SQL Warehouse e na computação geral.

Observação

As credenciais de serviço em UDFs escalares do Python exigem o Databricks Runtime 17.1 e superior.

Para criar uma credencial de serviço, consulte Criar credenciais de serviço.

Observação

API específica da UDF para credenciais de serviço:
Em UDFs, use databricks.service_credentials.getServiceCredentialsProvider() para acessar credenciais de serviço.

Isso difere da dbutils.credentials.getServiceCredentialsProvider() função usada em notebooks, que não está disponível nos contextos de execução da UDF.

Para acessar a credencial de serviço, use o databricks.service_credentials.getServiceCredentialsProvider() utilitário em sua lógica UDF para inicializar SDKs de nuvem com a credencial apropriada. Todo o código deve ser encapsulado no corpo da UDF.

@udf
def use_service_credential():
    from azure.mgmt.web import WebSiteManagementClient

    # Assuming there is a service credential named 'testcred' set up in Unity Catalog
    web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
    # Use web_client to perform operations

Permissões de credenciais de serviço

O criador da UDF deve ter permissão ACCESS na credencial de serviço do Catálogo do Unity.

UDFs executados no escopo No-PE, também conhecidos como clusters dedicados, exigem permissões MANAGE na credencial de serviço.

Credenciais padrão

Quando usado em UDFs escalares do Python, o Databricks usa automaticamente a credencial de serviço padrão da variável de ambiente de computação. Esse comportamento permite que você faça referência segura a serviços externos sem gerenciar explicitamente aliases de credencial em seu código UDF. Consulte Especificar uma credencial de serviço padrão para um recurso de computação

O suporte à credencial padrão só está disponível em clusters de modo de acesso Standard e Dedicado. Ele não está disponível no DBSQL.

Você deve instalar o azure-identity pacote para usar o DefaultAzureCredential provedor. Para instalar o pacote, consulte bibliotecas python com escopo de bloco de anotações ou bibliotecas com escopo de computação.

@udf
def use_service_credential():
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.web import WebSiteManagementClient

    # DefaultAzureCredential is automatically using the default service credential for the compute
    web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

    # Use web_client to perform operations

Obter o contexto de execução da tarefa

Use a API do TaskContext PySpark para obter informações de contexto, como identidade do usuário, marcas de cluster, ID do trabalho do Spark e muito mais. Veja Como obter o contexto de tarefa em uma UDF.

Limitações

As seguintes limitações se aplicam a UDFs do PySpark:

  • Restrições de acesso a arquivos: No Databricks Runtime 14.2 e abaixo, UDFs do PySpark em clusters compartilhados não podem acessar pastas git, arquivos de workspace ou Volumes de Catálogo do Unity.

  • Variáveis de difusão: UDFs do PySpark em clusters de modo de acesso padrão e computação sem servidor não dão suporte a variáveis de difusão.

  • Credenciais de serviço: As credenciais de serviço estão disponíveis apenas em UDFs do Python do Catálogo do Unity do Lote e UDFs escalares do Python. Eles não têm suporte nas UDFs padrão do Python do Catálogo do Unity.

  • Credenciais de serviço: as credenciais de serviço só estão disponíveis na computação sem servidor ao usar o ambiente sem servidor versão 3 ou superior. Consulte Versões do ambiente sem servidor.

  • Limite de memória sem servidor: UDFs do PySpark na computação sem servidor têm um limite de memória de 1 GB por UDF do PySpark. Exceder esse limite resulta em um erro de tipo UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
  • Limite de memória no modo de acesso padrão: UDFs do PySpark no modo de acesso padrão têm um limite de memória com base na memória disponível do tipo de instância escolhido. Exceder a memória disponível resulta em um erro de tipo UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.