Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
Cette fonctionnalité est disponible en préversion publique.
Les fonctions Python de traitement par lots définies par l’utilisateur (UDF) dans Unity Catalog étendent les fonctionnalités des fonctions définies par l’utilisateur de Unity Catalog en vous permettant d’écrire du code Python pour fonctionner sur des lots de données, ce qui améliore considérablement l’efficacité en réduisant la surcharge associée aux fonctions définies par ligne par ligne. Ces optimisations rendent les fonctions définies par lots Python de Unity Catalog idéales pour le traitement des données à grande échelle.
Spécifications
Les UDF Python du Batch Unity Catalog requièrent Databricks Runtime versions 16.3 et ultérieures.
Créer un UDF Python pour le Batch Unity Catalog
La création d’une fonction UDF Python de catalogue Batch Unity est similaire à la création d’une fonction UDF de catalogue Unity standard, avec les ajouts suivants :
-
PARAMETER STYLE PANDAS: spécifie que l’UDF traite les données par lots à l’aide d’itérateurs Pandas. -
HANDLER 'handler_function': spécifie la fonction de gestionnaire appelée pour traiter les lots.
L’exemple suivant montre comment créer une fonction UDF Python du catalogue Batch Unity :
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
Après avoir inscrit la fonction UDF, vous pouvez l’appeler à l’aide de SQL ou Python :
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
Fonction de gestionnaire UDF de traitement par lots
Les fonctions Python de traitement par lots définies par l’utilisateur (UDF) dans Unity Catalog nécessitent une fonction de gestionnaire qui traite les lots et génère des résultats. Vous devez spécifier le nom de la fonction de gestionnaire lorsque vous créez la fonction UDF à l’aide de la HANDLER clé.
La fonction de gestionnaire effectue les opérations suivantes :
- Accepte un argument de type itérateur qui parcourt un ou plusieurs
pandas.Series. Chacunpandas.Seriescontient les paramètres d’entrée de la fonction UDF. - Itère sur le générateur et traite les données.
- Retourne un itérateur de générateur.
Les fonctions Python de traitement par lots définies par l’utilisateur dans Unity Catalog doivent retourner le même nombre de lignes que l’entrée. La fonction de gestionnaire garantit cela en générant un pandas.Series ayant une longueur identique à la série d’entrée pour chaque lot.
Installer des dépendances personnalisées
Vous pouvez étendre les fonctionnalités des UDF Python du catalogue Batch Unity au-delà de l’environnement Databricks Runtime en définissant des dépendances personnalisées pour les bibliothèques externes.
Consultez Étendre les fonctions définies par l’utilisateur à l’aide de dépendances personnalisées.
Les fonctions de traitement par lots définies par l’utilisateur lots peuvent accepter des paramètres uniques ou multiples
Paramètre unique : lorsque la fonction de gestionnaire utilise un paramètre d’entrée unique, elle reçoit un itérateur qui parcourt une pandas.Series pour chaque lot.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for value_batch in batch_iter:
d = {"min": value_batch.min(), "max": value_batch.max()}
yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;
Plusieurs paramètres : Pour plusieurs paramètres d’entrée, la fonction de gestionnaire reçoit un itérateur qui itère sur plusieurs pandas.Series. Les valeurs de la série sont dans le même ordre que les paramètres d’entrée.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for p1, p2 in batch_iter: # same order as arguments above
yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);
Optimiser les performances en séparant les opérations coûteuses
Vous pouvez optimiser les opérations coûteuses de calcul en séparant ces opérations de la fonction de gestionnaire. Cela garantit qu’ils ne sont exécutés qu’une seule fois plutôt que pendant chaque itération sur les lots de données.
L’exemple suivant montre comment s’assurer qu’un calcul coûteux n’est effectué qu’une seule fois :
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
# expensive computation...
return 1
expensive_value = compute_value()
def handler_func(batch_iter):
for batch in batch_iter:
yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL
Isolation de l’environnement
Remarque
Les environnements d’isolation partagé nécessitent Databricks Runtime 17.1 et versions ultérieures. Dans les versions antérieures, tous les UDF Python du catalogue Batch Unity s’exécutent en mode d’isolation strict.
Les UDF Python de Batch Unity Catalog ayant le même propriétaire et la même session peuvent partager un environnement d’isolation par défaut. Cela peut améliorer les performances et réduire l’utilisation de la mémoire en réduisant le nombre d’environnements distincts qui doivent être lancés.
Isolation stricte
Pour garantir qu’une fonction UDF s’exécute toujours dans son propre environnement entièrement isolé, ajoutez la STRICT ISOLATION clause caractéristique.
La plupart des fonctions définies par l’utilisateur n’ont pas besoin d’isolation stricte. Les UDFs de traitement de données standard bénéficient de l’environnement d’isolation partagé par défaut et s’exécutent plus rapidement avec une consommation de mémoire inférieure.
Ajoutez la clause caractéristique suivante STRICT ISOLATION aux fonctions définies par l’utilisateur qui :
- Exécuter une entrée en tant que code à l’aide de
eval(),exec(), ou de fonctions similaires - Écrire des fichiers dans le système de fichiers local
- Modifier les variables globales ou l’état système
- Modifier des variables d’environnement
L’exemple suivant montre une fonction UDF qui exécute l’entrée en tant que code et nécessite une isolation stricte :
CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for code_series in batch_iter:
def eval_func(code):
try:
return str(eval(code))
except Exception as e:
return f"Error: {e}"
yield code_series.apply(eval_func)
$$;
Informations d’identification du service dans les fonctions définies par l’utilisateur python du catalogue Batch Unity
Les UDF Python du catalogue Batch Unity peuvent utiliser les informations d’identification du service catalogue Unity pour accéder aux services cloud externes. Cela est particulièrement utile pour intégrer des fonctions cloud telles que des tokenizers de sécurité dans des workflows de traitement des données.
Remarque
API spécifique à la fonction UDF pour les informations d’identification du service :
Dans les FDUs, utilisez databricks.service_credentials.getServiceCredentialsProvider() pour accéder aux identifiants de service.
Cela diffère de la dbutils.credentials.getServiceCredentialsProvider() fonction utilisée dans les notebooks, qui n’est pas disponible dans les contextes d’exécution UDF.
Pour créer des informations d’identification de service, consultez Créer des informations d’identification de service.
Spécifiez les informations d’identification du service que vous souhaitez utiliser dans la CREDENTIALS clause dans la définition UDF :
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
`credential-name` DEFAULT,
`complicated-credential-name` AS short_name,
`simple-cred`,
cred_no_quotes
)
AS $$
# Python code here
$$;
Autorisations des informations d’identification du service
Le créateur de l’UDF doit avoir l’autorisation ACCESS sur les informations d’identification du service Unity Catalog. Toutefois, pour les appelants UDF, il suffit de leur accorder l’autorisation EXECUTE sur l’UDF. En particulier, les appelants UDF n’ont pas besoin d’accéder aux identifiants du service, car l’UDF s’exécute en utilisant les autorisations du créateur de l’UDF.
Pour les fonctions temporaires, le créateur est toujours l’appelant. Les fonctions définies par l’utilisateur (UDF) qui s’exécutent dans le périmètre No-PE, aussi appelées clusters dédiés, utilisent plutôt les autorisations de l’appelant.
Informations d’identification et alias par défaut
Vous pouvez inclure plusieurs informations d’identification dans la CREDENTIALS clause, mais un seul peut être marqué comme DEFAULT. Vous pouvez aliasr des informations d’identification non par défaut à l’aide du AS mot clé. Chaque information d’identification doit avoir un alias unique.
Les kits SDK cloud corrigés récupèrent automatiquement les informations d’identification par défaut. Les informations d’identification par défaut sont prioritaires sur toutes les valeurs par défaut spécifiées dans la configuration Spark du calcul et persistent dans la définition UDF du catalogue Unity.
Vous devez installer le azure-identity package pour utiliser le DefaultAzureCredential fournisseur. Utilisez la ENVIRONMENT clause pour installer des bibliothèques externes. Pour en savoir plus sur l’installation de bibliothèques externes, consultez Étendre les fonctions définies par l’utilisateur à l’aide de dépendances personnalisées.
Exemple d’informations d’identification du service - Stockage Blob Azure
L’exemple suivant utilise des informations d’identification de service pour accéder au stockage Blob Azure à partir d’une fonction UDF Python du catalogue Batch Unity :
%sql
CREATE OR REPLACE FUNCTION main.test.read_azure_blob(blob_name STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
dependencies = '["azure-identity", "azure-storage-blob"]', environment_version = 'None'
)
AS $$
import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
def batchhandler(it):
# DefaultAzureCredential automatically uses the DEFAULT service credential
credential = DefaultAzureCredential()
blob_service = BlobServiceClient(
account_url="https://your-storage-account.blob.core.windows.net",
credential=credential
)
container = blob_service.get_container_client("your-container")
for blob_names in it:
results = []
for name in blob_names:
blob_client = container.get_blob_client(name)
try:
content = blob_client.download_blob().readall().decode("utf-8")
results.append(content)
except Exception as e:
results.append(f"Error: {e}")
yield pd.Series(results)
$$;
Appelez l’UDF après son enregistrement :
SELECT main.test.read_azure_blob(blob_name)
FROM VALUES
('config/settings.json'),
('data/input.txt')
AS t(blob_name)
Obtenir le contexte d’exécution des tâches
Utilisez l’API TaskContext PySpark pour obtenir des informations contextuelles telles que l’identité de l’utilisateur, les balises de cluster, l’ID de travail Spark et bien plus encore. Consultez Obtenir le contexte de tâche dans une UDF.
Définir DETERMINISTIC si votre fonction produit des résultats cohérents
Ajoutez DETERMINISTIC à votre définition de fonction s’il produit les mêmes sorties pour les mêmes entrées. Cela permet aux optimisations des requêtes d’améliorer les performances.
Par défaut, les fonctions définies par l’utilisateur Python du catalogue Unity de Batch sont supposées être non déterministes, sauf déclaration explicite. Parmi les exemples de fonctions non déterministes, citons la génération de valeurs aléatoires, l’accès aux heures ou dates actuelles ou l’exécution d’appels d’API externes.
Voir CREATE FUNCTION (SQL et Python)
Limites
- Les fonctions Python doivent gérer les
NULLvaleurs indépendamment, et tous les mappages de types doivent suivre les mappages de langage SQL Azure Databricks. - Les fonctions UDF Python du catalogue Batch Unity s’exécutent dans un environnement sécurisé et isolé, sans accès à un système de fichiers partagé ou à des services internes.
- Plusieurs appels UDF au sein d’une phase sont sérialisés et les résultats intermédiaires sont matérialisés et peuvent se répandre sur le disque.
- Les informations d’identification du service sont disponibles uniquement dans les UDF Python du catalogue Batch Unity et les UDF Python scalaires. Ils ne sont pas pris en charge dans les fonctions définies par l’utilisateur python du catalogue Unity standard.
- Sur les clusters dédiés et pour les fonctions temporaires, l’appelant de fonction doit disposer des autorisations
ACCESSsur les informations d’identification du service. Consultez Autoriser l'utilisation d'un justificatif de service pour accéder à un service cloud externe. - Activez la fonctionnalité De préversion publique Activer la mise en réseau pour les fonctions définies par l’utilisateur dans serverless SQL Warehouses dans la page Aperçus de votre espace de travail pour effectuer des appels UDF Python de catalogue Batch Unity à des services externes sur le calcul serverless SQL Warehouse.
- Pour effectuer des appels UDF Python du catalogue Batch Unity sur un notebook serverless ou un calcul de travail, vous devez configurer le contrôle de sortie serverless