Partilhar via


Funções Python Definidas pelo Usuário (UDFs) em Lote no Unity Catalog

Importante

Este recurso está no Public Preview.

As UDFs Python do Batch Unity Catalog estendem os recursos das UDFs do Unity Catalog permitindo que você escreva código Python para operar em lotes de dados, melhorando significativamente a eficiência ao reduzir a sobrecarga associada a UDFs linha a linha. Essas otimizações tornam os UDFs Python em lote do Unity Catalog ideais para processamento de dados em larga escala.

Requerimentos

As UDFs Python do Batch Unity Catalog requerem as versões 16.3 e superiores do Databricks Runtime.

Criar UDF em Python no Unity Catalog em Batch

Criar um Batch Unity Catalog Python UDF é semelhante à criação de um Unity Catalog UDF regular, com as seguintes adições:

  • PARAMETER STYLE PANDAS: Isso especifica que o UDF processa dados em lotes usando iteradores pandas.
  • HANDLER 'handler_function': Especifica a função do manipulador que é chamada para processar os lotes.

O exemplo a seguir mostra como criar um Batch Unity Catalog Python UDF:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

Depois de registrar o UDF, você pode chamá-lo usando SQL ou Python:

SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
  SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
  SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);

Função do manipulador UDF em lote

UDFs Python do Batch Unity Catalog requerem uma função de manipulador que processa lotes e produz resultados. Você deve especificar o nome da função do manipulador ao criar o UDF usando a HANDLER chave.

A função handler faz o seguinte:

  1. Aceita um argumento iterador que itera sobre um ou mais pandas.Series. Cada um pandas.Series contém os parâmetros de entrada do UDF.
  2. Itera sobre o gerador e processa os dados.
  3. Retorna um iterador de geradores.

UDFs Python do Batch Unity Catalog devem retornar o mesmo número de linhas que a entrada. A função handler garante isso produzindo um pandas.Series com o mesmo comprimento da série de entrada para cada lote.

Instalar dependências personalizadas

Você pode estender a funcionalidade dos UDFs Python do Batch Unity Catalog além do ambiente Databricks Runtime definindo dependências personalizadas para bibliotecas externas.

Consulte Estender UDFs usando dependências personalizadas.

UDFs de lote podem aceitar parâmetros únicos ou múltiplos

Parâmetro único: Quando a função handler usa um único parâmetro de entrada, ela recebe um iterador que itera sobre a pandas.Series para cada lote.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for value_batch in batch_iter:
    d = {"min": value_batch.min(), "max": value_batch.max()}
    yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;

Vários parâmetros: Para vários parâmetros de entrada, a função handler recebe um iterador que itera sobre vários pandas.Series. Os valores na série estão na mesma ordem que os parâmetros de entrada.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for p1, p2 in batch_iter: # same order as arguments above
    yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);

Otimize o desempenho separando operações dispendiosas

Você pode otimizar operações computacionalmente caras separando essas operações da função manipulador. Isso garante que eles sejam executados apenas uma vez, em vez de durante cada iteração em lotes de dados.

O exemplo a seguir mostra como garantir que um cálculo caro seja executado apenas uma vez:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
  # expensive computation...
  return 1

expensive_value = compute_value()
def handler_func(batch_iter):
  for batch in batch_iter:
    yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL

Limites de isolamento e segurança

Observação

Os ambientes de isolamento compartilhado exigem o Databricks Runtime 17.1 e superior. Em versões anteriores, todos os Batch Unity Catalog Python UDF são executados no modo de isolamento estrito.

UDFs Python do Batch Unity Catalog com o mesmo proprietário podem compartilhar um ambiente de isolamento por padrão. Isso pode melhorar o desempenho e reduzir o uso de memória, reduzindo o número de ambientes separados que precisam ser iniciados.

Isolamento rigoroso

Para garantir que uma UDF seja sempre executada em seu próprio ambiente totalmente isolado, adicione a STRICT ISOLATION cláusula característica.

A maioria das UDFs não precisa de isolamento estrito. As UDFs de processamento de dados padrão se beneficiam do ambiente de isolamento compartilhado padrão e são executadas mais rapidamente com menor consumo de memória.

Adicione a STRICT ISOLATION cláusula característica aos UDF que:

  • Execute a entrada como código usando eval(), exec()ou funções semelhantes
  • Gravar arquivos no sistema de arquivos local
  • Modificar variáveis globais ou o estado do sistema
  • Acessar ou modificar variáveis de ambiente

O exemplo a seguir mostra um UDF que executa a entrada como código e requer isolamento estrito:

CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator

def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for code_series in batch_iter:
    def eval_func(code):
      try:
        return str(eval(code))
      except Exception as e:
        return f"Error: {e}"
    yield code_series.apply(eval_func)
$$;

Credenciais de serviço no Batch Unity Catalog Python UDFs

As UDFs Python do Batch Unity Catalog podem usar as credenciais do Unity Catalog Service para acessar serviços de nuvem externos. Isso é particularmente útil para integrar funções de nuvem, como tokenizadores de segurança, em fluxos de trabalho de processamento de dados.

Observação

API específica de UDF para credenciais de serviço:
Em UDFs, use databricks.service_credentials.getServiceCredentialsProvider() para acessar credenciais de serviço.

Isso difere da função dbutils.credentials.getServiceCredentialsProvider() usada em notebooks, mas não está disponível em contextos de execução de UDF.

Para criar uma credencial de serviço, consulte Criar credenciais de serviço.

Especifique a credencial de serviço que você deseja usar na CREDENTIALS cláusula na definição UDF:

CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
  `credential-name` DEFAULT,
  `complicated-credential-name` AS short_name,
  `simple-cred`,
  cred_no_quotes
)
AS $$
# Python code here
$$;

Permissões de credenciais de serviço

O criador do UDF deve possuir a ACCESS permissão nas credenciais de serviço do Catálogo Unity. No entanto, para os utilizadores do UDF, é suficiente conceder-lhes permissão em EXECUTE UDF. Em particular, os chamadores UDF não precisam de acesso à credencial de serviço subjacente, porque o UDF é executado usando as permissões de credenciais do criador UDF.

Para funções temporárias, o criador é sempre o invocador. UDFs que são executados no escopoNo-PE, também conhecidos como clusters dedicados, usam as permissões do chamador.

Credenciais e aliases padrão

Você pode incluir várias credenciais na cláusula CREDENTIALS, mas apenas uma pode ser marcada como DEFAULT. Você pode aliar credenciais não padrão usando a palavra-chave AS. Cada credencial deve ter um alias exclusivo.

SDKs de nuvem corrigidos selecionam automaticamente as credenciais padrão. A credencial padrão tem precedência sobre qualquer padrão especificado na configuração do Spark da computação e persiste na definição UDF do Unity Catalog.

Você deve instalar o azure-identity pacote para usar o DefaultAzureCredential provedor. Utilize a cláusula ENVIRONMENT para instalar bibliotecas externas. Para saber mais sobre como instalar bibliotecas externas, consulte Estender UDFs usando dependências personalizadas.

Exemplo de credencial de serviço - Azure Functions

O exemplo a seguir usa uma credencial de serviço para chamar uma função do Azure de um UDF Python do Batch Unity Catalog:

%sql
CREATE OR REPLACE FUNCTION main.test.call_azure_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
  dependencies = '["azure-identity", "requests"]', environment_version = 'None'
)
AS $$
import json
import pandas as pd
import requests
from azure.identity import DefaultAzureCredential
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # DefaultAzureCredential automatically uses the DEFAULT credential
  credential = DefaultAzureCredential()
  token = credential.get_token("https://management.azure.com/.default")

  # Azure Function URL
  function_url = "https://your-function-app.azurewebsites.net/api/HashValuesFunction"

  # Propagate TaskContext information:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = {
      "values": vals.to_list(),
      "is_debug": bool(is_debug[0]),
      "context": user_ctx
    }

    headers = {
      "Authorization": f"Bearer {token.token}",
      "Content-Type": "application/json"
    }

    response = requests.post(function_url, json=payload, headers=headers)

    if response.status_code != 200:
      raise Exception(f"Function call failed: {response.text}")

    response_data = response.json()
    if "errorMessage" in response_data:
      raise Exception(str(response_data))

    yield pd.Series(response_data["values"])
$$;

Chame a UDF depois de ser registada:

SELECT main.test.call_azure_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

Obter contexto de execução da tarefa

Utilize a API PySpark do TaskContext para obter informações de contexto, como identidade do utilizador, tags do cluster, ID do trabalho Spark e mais. Veja Obter o contexto da tarefa num UDF.

Definir DETERMINISTIC se a sua função produz resultados consistentes

Adicione DETERMINISTIC à sua definição de função se ela produzir as mesmas saídas para as mesmas entradas. Isso permite otimizações de consulta para melhorar o desempenho.

Por padrão, UDTFs Python do Batch Unity Catalog são assumidos como não-deterministioc a menos que explicitamente declarados. Exemplos de funções não determinísticas incluem: gerar valores aleatórios, acessar horas ou datas atuais ou fazer chamadas de API externas.

Veja CREATE FUNCTION (SQL e Python)

Limitações

  • As funções Python devem manipular NULL valores de forma independente e todos os mapeamentos de tipo devem seguir os mapeamentos da linguagem SQL do Azure Databricks.
  • As UDFs Python do Batch Unity Catalog são executadas em um ambiente seguro e isolado e não têm acesso a um sistema de arquivos compartilhado ou serviços internos.
  • As várias invocações de UDFs dentro de um estágio são serializadas, e resultados intermediários são materializados e podem ser despejados no disco.
  • As credenciais de serviço estão disponíveis apenas em UDFs Python do Batch Unity Catalog e UDFs Python Scalar. Eles não são suportados em UDFs Python padrão do Unity Catalog.
  • Em clusters dedicados e para funções temporárias, é necessário que quem chamar a função tenha ACCESS autorização nas credenciais de serviço. Consulte Conceder permissões para usar uma credencial de serviço para acessar um serviço de nuvem externo.
  • Habilite o recurso de visualização pública Habilite a rede para UDFs em SQL Warehouses sem servidor na página Visualizações do seu espaço de trabalho para fazer chamadas UDF Python do Batch Unity Catalog para serviços externos na computação de SQL warehouse sem servidor.
  • Para fazer chamadas UDF Python do Batch Unity Catalog em um notebook sem servidor ou computação de trabalho, você deve configurar o controle de saída sem servidor