Sdílet prostřednictvím


Funkce definované uživatelem (UDFs) v Batch Python v katalogu Unity

Důležité

Tato funkce je ve verzi Public Preview.

Pythonové UDFy v katalogu Batch Unity rozšiřují možnosti funkcí katalogu Unity tím, že umožňují psát Python kód pro operace na dávkách dat, což výrazně zvyšuje efektivitu snížením režijních nákladů spojených s řádkovými UDFy. Díky těmto optimalizacům jsou definované uživatelem Pythonu v dávce katalogu Unity ideální pro zpracování velkých objemů dat.

Požadavky

Uživatelem definované funkce Pythonu v Batch Unity Catalog vyžadují Databricks Runtime verze 16.3 a vyšší.

Vytvořit uživatelem definovanou funkci v katalogu Batch Unity v Pythonu

Vytvoření uživatelsky definované funkce Pythonu v katalogu Unity Batch se podobá vytvoření běžné uživatelsky definované funkce v katalogu Unity s následujícími doplňky:

  • PARAMETER STYLE PANDAS: Určuje, že UDF zpracovává data v dávkách pomocí iterátorů pandas.
  • HANDLER 'handler_function': Tímto je určena funkce obslužné rutiny, která je volána pro zpracování dávek.

Následující příklad ukazuje, jak vytvořit uživatelem definovanou funkci v Pythonu v Batch Unity Catalog:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

Po registraci UDF ji můžete volat použitím SQL nebo Pythonu.

SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
  SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
  SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);

Funkce obslužné rutiny pro Batch UDF

Funkce definovaná uživatelem Pythonu (UDF) v katalogu Batch Unity vyžaduje funkci obslužné rutiny, která zpracovává dávky a vytváří výsledky. Je nutné zadat název funkce obslužné rutiny při vytváření uživatelsky definované funkce pomocí HANDLER prostředku.

Funkce obslužné rutiny provede následující:

  1. Přijímá argument iterátoru, který iteruje přes jeden nebo více pandas.Series. Každý pandas.Series obsahuje vstupní parametry uživatelem definované funkce.
  2. Iteruje přes generátor a zpracovává data.
  3. Vrátí iterátor generátoru.

Uživatelem definované uživatelem Pythonu v katalogu Batch Unity musí být vrácen stejný počet řádků jako vstup. Obslužná funkce to zajišťuje tím, že pro každou dávku předává pandas.Series se stejnou délkou jako vstupní řada.

Instalace vlastních závislostí

Python UDFs v Batch Unity Catalog můžete rozšířit i mimo prostředí Databricks Runtime tím, že definujete specifické závislosti pro externí knihovny.

Viz Rozšíření UDF pomocí vlastních závislostí.

Funkce UDFs Batch mohou přijímat jeden nebo více parametrů.

Jeden parametr: Když funkce obslužné rutiny používá jeden vstupní parametr, obdrží iterátor, který iteruje přes pandas.Series pro každou dávku.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for value_batch in batch_iter:
    d = {"min": value_batch.min(), "max": value_batch.max()}
    yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;

Více parametrů: Pro více vstupních parametrů obdrží funkce obslužné rutiny iterátor, který iteruje více pandas.Series. Hodnoty v řadě jsou ve stejném pořadí jako vstupní parametry.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for p1, p2 in batch_iter: # same order as arguments above
    yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);

Optimalizace výkonu oddělením drahých operací

Výpočetně nákladné operace můžete optimalizovat oddělením těchto operací od funkce obslužné rutiny. Tím se zajistí, že se budou spouštět pouze jednou, a ne během každé iterace v dávkách dat.

Následující příklad ukazuje, jak zajistit, aby se nákladné výpočty prováděly pouze jednou:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
  # expensive computation...
  return 1

expensive_value = compute_value()
def handler_func(batch_iter):
  for batch in batch_iter:
    yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL

Hranice izolace a zabezpečení

Poznámka:

Sdílená prostředí izolace vyžadují Databricks Runtime 17.1 a vyšší. Ve starších verzích se všechny uživatelem definované funkce Python (UDF) v katalogu Batch Unity spouštěly v přísném režimu izolace.

Uživatelsky definované funkce Pythonu v Batch Unity Catalogu se stejným vlastníkem můžou ve výchozím nastavení sdílet izolační prostředí. To může zvýšit výkon a snížit využití paměti snížením počtu samostatných prostředí, která je potřeba spustit.

Striktní izolace

Pokud chcete zajistit, aby se funkce definovaná uživatelem vždy spouštěla ve vlastním, plně izolovaném prostředí, přidejte tu charakteristickou klauzuli STRICT ISOLATION.

Většina uživatelsky definovaných funkcí nepotřebuje striktní izolaci. Funkce UDF pro standardní zpracování dat využívají výchozí sdílené izolační prostředí a běží rychleji s nižší spotřebou paměti.

Přidejte charakteristickou klauzuli STRICT ISOLATION do funkcí definovaných uživatelem, které:

  • Spusťte vstup jako kód pomocí , eval(), exec() nebo podobných funkcí.
  • Zápis souborů do místního systému souborů
  • Úprava globálních proměnných nebo stavu systému
  • Přístup k proměnným prostředí nebo jejich úprava

Následující příklad ukazuje UDF, který spouští vstup jako kód a vyžaduje striktní izolaci:

CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator

def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for code_series in batch_iter:
    def eval_func(code):
      try:
        return str(eval(code))
      except Exception as e:
        return f"Error: {e}"
    yield code_series.apply(eval_func)
$$;

Přihlašovací údaje služby ve službě Batch Unity Catalog Python UDF

UDF Pythonu v katalogu Unity Batch mohou používat přihlašovací údaje služby katalogu Unity pro přístup k externím cloudovým službám. To je zvlášť užitečné pro integraci cloudových funkcí, jako jsou tokenizátory zabezpečení do pracovních postupů zpracování dat.

Poznámka:

API specifické pro uživatelsky definované funkce pro přihlašovací údaje služby:
V UDF použijte databricks.service_credentials.getServiceCredentialsProvider() pro přístup k přihlašovacím údajům služby.

To se liší od dbutils.credentials.getServiceCredentialsProvider() funkce používané v poznámkových blocích, která není k dispozici v kontextech spouštění UDF.

Pokud chcete vytvořit přihlašovací údaje služby, přečtěte si téma Vytvoření přihlašovacích údajů služby.

Zadejte přihlašovací údaje služby, které chcete použít v klauzuli v CREDENTIALS definici UDF:

CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
  `credential-name` DEFAULT,
  `complicated-credential-name` AS short_name,
  `simple-cred`,
  cred_no_quotes
)
AS $$
# Python code here
$$;

Oprávnění k přihlašovacím údajům služby

Tvůrce UDF musí mít ACCESS oprávnění služby Unity Catalog. Pro volající UDF je však dostačující udělit jim k této UDF oprávnění. Zejména volající UDF nepotřebují přístup k přihlašovacím údajům základní služby, protože UDF se provádí pomocí oprávnění k přihlašovacím údajům tvůrce UDF.

U dočasných funkcí je tvůrce vždy vyvolávač. Uživatelsky definované funkce, které běží v oboru No-PE, známé také jako vyhrazené clustery, používají místo toho oprávnění volajícího.

Výchozí přihlašovací údaje a aliasy

Do klauzule můžete zahrnout více přihlašovacích údajů CREDENTIALS , ale jeden lze označit jako DEFAULT. Pomocí klíčového AS slova můžete aliasovat jiné než výchozí přihlašovací údaje. Každé pověření musí mít jedinečný alias.

Opravené cloudové sady SDK automaticky vyberou výchozí přihlašovací údaje. Výchozí přihlašovací údaje mají přednost před všemi výchozími nastaveními zadanými v konfiguraci Sparku výpočetních prostředků a uchovávají se v definici UDF katalogu Unity.

Chcete-li použít poskytovatele azure-identity, musíte nainstalovat balíček DefaultAzureCredential. ENVIRONMENT Pomocí klauzule nainstalujte externí knihovny. Další informace o instalaci externích knihoven najdete v tématu Rozšíření UDF pomocí vlastních závislostí.

Příklad přihlašovacích údajů služby – Azure Functions

Následující příklad používá přihlašovací údaje služby k volání funkce Azure Functions z katalogu Batch Unity Python UDF:

%sql
CREATE OR REPLACE FUNCTION main.test.call_azure_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
  dependencies = '["azure-identity", "requests"]', environment_version = 'None'
)
AS $$
import json
import pandas as pd
import requests
from azure.identity import DefaultAzureCredential
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # DefaultAzureCredential automatically uses the DEFAULT credential
  credential = DefaultAzureCredential()
  token = credential.get_token("https://management.azure.com/.default")

  # Azure Function URL
  function_url = "https://your-function-app.azurewebsites.net/api/HashValuesFunction"

  # Propagate TaskContext information:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = {
      "values": vals.to_list(),
      "is_debug": bool(is_debug[0]),
      "context": user_ctx
    }

    headers = {
      "Authorization": f"Bearer {token.token}",
      "Content-Type": "application/json"
    }

    response = requests.post(function_url, json=payload, headers=headers)

    if response.status_code != 200:
      raise Exception(f"Function call failed: {response.text}")

    response_data = response.json()
    if "errorMessage" in response_data:
      raise Exception(str(response_data))

    yield pd.Series(response_data["values"])
$$;

Zavolejte uživatelem definovanou funkci po její registraci.

SELECT main.test.call_azure_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

Získejte kontext provádění úkolu

Použijte PySpark API TaskContext k získání informací o kontextu, jako je identita uživatele, značky clusteru, ID úlohy Spark a další. Viz Získání kontextu úkolu v UDF.

Nastavení DETERMINISTIC , pokud vaše funkce vytváří konzistentní výsledky

Přidejte DETERMINISTIC do definice funkce, pokud vytváří stejné výstupy pro stejné vstupy. To umožňuje optimalizaci dotazů zlepšit výkon.

Ve výchozím nastavení se funkce UDTFs katalogu Batch Unity předpokládá, že nejsou deterministioc, pokud nejsou explicitně deklarovány. Mezi příklady ne deterministických funkcí patří generování náhodných hodnot, přístup k aktuálním časům nebo kalendářním datům nebo volání externího rozhraní API.

Viz CREATE FUNCTION (SQL a Python)

Omezení

  • Funkce Pythonu musí zpracovávat NULL hodnoty nezávisle a všechna mapování typů musí následovat po mapování jazyka SQL v Azure Databricks.
  • Uživatelsky definované funkce Pythonu v katalogu Batch Unity jsou spuštěny v zabezpečeném, izolovaném prostředí a nemají přístup ke sdílenému systému souborů ani k interním službám.
  • Více volání uživatelem definované funkce v rámci fáze jsou serializována, dočasné výsledky jsou materializovány a mohou být přelity na disk.
  • Přihlašovací údaje služby jsou k dispozici pouze v uživatelsky definovaných funkcích Pythonu v katalogu Batch Unity a skalárních uživatelsky definovaných funkcích Pythonu. Nejsou podporovány ve standardních Python UDFs katalogu Unity.
  • Ve vyhrazených clusterech a u dočasných funkcí musí volající funkce mít ACCESS oprávnění k přihlašovacím údajům služby. Viz Udělení oprávnění pro přístup k externí cloudové službě pomocí přihlašovacích údajů služby.
  • Povolte funkci Public Preview Povolte sítě pro uživatelem definované uživatelem ve službě SQL Warehouse bezserverové služby SQL Warehouse na stránce Preview vašeho pracovního prostoru a povolte volání uživatelem definované uživatelem Pythonu ve službě Batch Unity Catalog do externích služeb na bezserverových výpočetních prostředcích SQL Warehouse.