Compartir a través de


Funciones de Python definidas por el usuario por lotes en Unity Catalog

Importante

Esta característica está en versión preliminar pública.

Las UDF de Python del catálogo de Unity por lotes amplían las funcionalidades de las UDF del catálogo de Unity al permitirle escribir código de Python para que funcione en lotes de datos, lo que mejora significativamente la eficacia al reducir la sobrecarga asociada a las UDF de fila a fila. Estas optimizaciones hacen que las UDF de Python por lotes del catálogo de Unity sean ideales para el procesamiento de datos a gran escala.

Requisitos

Las UDF de Python del catálogo de Unity de Batch requieren las versiones 16.3 y posteriores de Databricks Runtime.

Crear UDF de Python por lotes en Unity Catalog

La creación de una UDF de Python del catálogo de Batch unity es similar a la creación de una UDF normal del catálogo de Unity, con las siguientes adiciones:

  • PARAMETER STYLE PANDAS: especifica que la UDF procesa los datos en lotes mediante iteradores pandas.
  • HANDLER 'handler_function': especifica la función de controlador a la que se llama para procesar los lotes.

En el ejemplo siguiente se muestra cómo crear una UDF de Python del catálogo de Batch Unity:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

Después de registrar la UDF, puede llamarla mediante SQL o Python:

SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
  SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
  SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);

Función de controlador de UDF por lotes

Las UDF de Python del catálogo de Unity por lotes requieren una función de controlador que procesa lotes y genera resultados. Debe especificar el nombre de la función de controlador al crear la UDF mediante la HANDLER clave .

La función de controlador hace lo siguiente:

  1. Acepta un argumento de iterador que recorre en iteración uno o varios pandas.Series. Cada pandas.Series contiene los parámetros de entrada de la UDF.
  2. Recorre en iteración los datos del generador y los procesa.
  3. Devuelve un iterador generador.

Las UDF de Python del catálogo de Unity por lotes deben devolver el mismo número de filas que la entrada. La función de controlador garantiza esto produciendo un pandas.Series con la misma longitud que la serie de entrada para cada lote.

Instalación de dependencias personalizadas

Puede ampliar la funcionalidad de las UDF de Python del catálogo de Unity Batch más allá del entorno de runtime de Databricks mediante la definición de dependencias personalizadas para bibliotecas externas.

Consulte Extensión de UDF mediante dependencias personalizadas.

Las UDF de Batch pueden aceptar parámetros únicos o varios

Parámetro único: Cuando la función de controlador usa un único parámetro de entrada, recibe un iterador que recorre en iteración un pandas.Series para cada lote.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for value_batch in batch_iter:
    d = {"min": value_batch.min(), "max": value_batch.max()}
    yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;

Varios parámetros: Para varios parámetros de entrada, la función de controlador recibe un iterador que recorre en iteración varios pandas.Series. Los valores de la serie están en el mismo orden que los parámetros de entrada.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for p1, p2 in batch_iter: # same order as arguments above
    yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);

Optimización del rendimiento mediante la separación de operaciones costosas

Puede optimizar las operaciones costosas computacionalmente separando estas operaciones de la función del controlador. Esto garantiza que se ejecuten solo una vez en lugar de durante cada iteración sobre lotes de datos.

En el ejemplo siguiente se muestra cómo asegurarse de que un cálculo costoso se realiza solo una vez:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
  # expensive computation...
  return 1

expensive_value = compute_value()
def handler_func(batch_iter):
  for batch in batch_iter:
    yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL

Credenciales de servicio en las UDF de Python por lotes de Unity Catalog

Las UDF de Python del catálogo de Unity de Batch pueden usar las credenciales del servicio de catálogo de Unity para acceder a servicios en la nube externos. Esto es especialmente útil para integrar funciones en la nube como tokenizadores de seguridad en flujos de trabajo de procesamiento de datos.

Para crear una credencial de servicio, consulte Creación de credenciales de servicio.

Especifique la credencial de servicio que desea usar en la CREDENTIALS cláusula en la definición de UDF:

CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
  `credential-name` DEFAULT,
  `complicated-credential-name` AS short_name,
  `simple-cred`,
  cred_no_quotes
)
AS $$
# Python code here
$$;

Permisos de credenciales de servicio

El creador de UDF debe tener permiso deACCESS en la credencial del servicio de Unity Catalog. Sin embargo, para los usuarios de llamadas UDF es suficiente concederles permiso de EXECUTE en la UDF. En concreto, los usuarios de llamadas UDF no necesitan acceder a la credencial de servicio subyacente, ya que la UDF se ejecuta utilizando los permisos de credenciales del creador de la UDF.

En el caso de las funciones temporales, el creador siempre es el invocador. Las UDF que se ejecutan en ámbito No-PE, también conocidas como clústeres dedicados, usan en su lugar los permisos del autor de la llamada.

Credenciales y alias predeterminados

Puede incluir varias credenciales en la cláusula CREDENTIALS, pero solo una se puede marcar como DEFAULT. Puede usar alias para las credenciales no predeterminadas mediante la palabra clave AS. Cada credencial debe tener un alias único.

Los SDK de nube revisados seleccionan automáticamente las credenciales predeterminadas. La credencial predeterminada tiene prioridad sobre cualquier valor predeterminado especificado en la configuración de Spark del proceso y persiste en la definición de UDF del catálogo de Unity.

Debe instalar el azure-identity paquete para usar el DefaultAzureCredential proveedor. Use la ENVIRONMENT cláusula para instalar bibliotecas externas. Para obtener más información sobre la instalación de bibliotecas externas, consulte Extensión de las UDF mediante dependencias personalizadas.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION call_lambda_func(data STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
  dependencies = '["azure-identity", "azure-mgmt-web"]', environment_version = 'None'
)
AS $$
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient

def batchhandler(it):
  # DefaultAzureCredential is automatically using 'batch-udf-service-creds-example-cred'
  web_client = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

  for vals in data:
    yield vals
$$

Ejemplo de credenciales de servicio: función de AWS Lambda

En el ejemplo siguiente se usa una credencial de servicio para llamar a una función de AWS Lambda desde una UDF de Python del catálogo de Batch Unity:

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

Llame a la UDF después de registrarla:

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

Obtención del contexto de ejecución de tareas

Use taskContext PySpark API para obtener información de contexto, como la identidad del usuario, las etiquetas de clúster, el identificador de trabajo de Spark, etc. Consulte Obtener contexto de tarea en una UDF.

Limitaciones

  • Las funciones de Python deben controlar NULL los valores de forma independiente y todas las asignaciones de tipos deben seguir las asignaciones de lenguaje SQL de Azure Databricks.
  • Las UDF de Python del catálogo de Batch unity se ejecutan en un entorno seguro y aislado y no tienen acceso a un sistema de archivos compartido ni a servicios internos.
  • Se serializan varias invocaciones de UDF dentro de una fase y los resultados intermedios se materializan y pueden desbordarse en el disco.
  • Las credenciales de servicio solo están disponibles en las UDF de Python por lotes de Unity Catalog. No se admiten en los UDF estándar de Python del Catálogo de Unity ni los UDF de PySpark.
  • En clústeres dedicados y para funciones temporales, el autor de llamada de función debe tener permisos de ACCESS en las credenciales de servicio. Consulte Concesión de permisos para usar una credencial de servicio para acceder a un servicio en la nube externo.
  • Habilite la característica Versión preliminar pública Habilitar redes para UDF en SQL Warehouse sin servidor para realizar llamadas UDF de Python del catálogo de Batch Unity a servicios externos en el proceso de SQL Warehouse sin servidor.
  • Para ejecutar llamadas UDF de Python en el Batch Unity Catalog en un cuaderno o cómputo de tareas sin servidor, debe configurar el control de salida sin servidor.