Freigeben über


Benutzerdefinierte Batch-Python-Funktionen (UDFs) im Unity-Katalog

Von großer Bedeutung

Dieses Feature befindet sich in der Public Preview.

Batch Unity Catalog Python UDFs erweitern die Funktionen von Unity Catalog UDFs, indem Sie Python-Code schreiben können, um auf Batches von Daten zu arbeiten, wodurch die Effizienz erheblich verbessert wird, indem der Mit Zeilen-nach-Zeilen-UDFs verbundene Aufwand reduziert wird. Diese Optimierungen machen Unity Catalog Batch Python UDFs ideal für die Verarbeitung großer Datenmengen.

Anforderungen

Batch Unity Catalog Python UDFs erfordern Databricks Runtime-Versionen 16.3 und höher.

Erstellen eines Batch-Unity-Katalogs python UDF

Das Erstellen eines Batch-Unity-Katalog-Python-UDF ähnelt dem Erstellen eines regulären Unity-Katalog-UDF mit den folgenden Ergänzungen:

  • PARAMETER STYLE PANDAS: Gibt an, dass die UDF Daten in Batches mithilfe von Pandas Iteratoren verarbeitet.
  • HANDLER 'handler_function': Gibt die Handlerfunktion an, die aufgerufen wird, um die Batches zu verarbeiten.

Das folgende Beispiel zeigt, wie Sie einen Batch Unity Catalog Python UDF erstellen:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

Nach der Registrierung der UDF können Sie sie mit SQL oder Python aufrufen:

SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
  SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
  SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);

Batch-UDF-Handlerfunktion

Batch Unity Catalog Python UDFs erfordern eine Handlerfunktion, die Batches verarbeitet und Ergebnisse liefert. Sie müssen den Namen der Handlerfunktion angeben, wenn Sie die UDF mithilfe des HANDLER Schlüssels erstellen.

Die Handlerfunktion führt folgende Aktionen aus:

  1. Akzeptiert ein Iterator-Argument, das über eines oder mehrere pandas.Series iteriert. Jede pandas.Series enthält die Eingabeparameter der UDF.
  2. Iteriert über den Generator und verarbeitet die Daten.
  3. Gibt einen Generator-Iterator zurück.

Batch Unity Catalog Python UDFs müssen dieselbe Anzahl von Zeilen wie die Eingabe zurückgeben. Die Handlerfunktion stellt dies sicher, indem für jeden Batch eine pandas.Series mit der gleichen Länge wie die Eingabereihen erzielt wird.

Installieren benutzerdefinierter Abhängigkeiten

Sie können die Funktionalität von Batch Unity Catalog Python UDFs über die Databricks-Runtime-Umgebung hinaus erweitern, indem Sie benutzerdefinierte Abhängigkeiten für externe Bibliotheken definieren.

Siehe Erweitern von UDFs mithilfe von benutzerdefinierten Abhängigkeiten.

Batch-UDFs können einzelne oder mehrere Parameter akzeptieren

Einzelner Parameter: Wenn die Handlerfunktion einen einzelnen Eingabeparameter verwendet, empfängt sie einen Iterator, der einen pandas.Series für jeden Batch durchläuft.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for value_batch in batch_iter:
    d = {"min": value_batch.min(), "max": value_batch.max()}
    yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;

Mehrere Parameter: Bei mehreren Eingabeparametern empfängt die Handlerfunktion einen Iterator, der mehrere pandas.Seriesdurchläuft. Die Werte in der Datenreihe sind in der gleichen Reihenfolge wie die Eingabeparameter.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for p1, p2 in batch_iter: # same order as arguments above
    yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);

Optimieren der Leistung durch Trennen von kostspieligen Vorgängen

Sie können rechenintensive Vorgänge optimieren, indem Sie diese Vorgänge von der Handlerfunktion trennen. Dadurch wird sichergestellt, dass sie nur einmal und nicht während jeder Iteration über Datenbatches ausgeführt werden.

Das folgende Beispiel zeigt, wie Sie sicherstellen können, dass eine teure Berechnung nur einmal ausgeführt wird:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
  # expensive computation...
  return 1

expensive_value = compute_value()
def handler_func(batch_iter):
  for batch in batch_iter:
    yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL

Umgebungsisolation

Hinweis

Freigegebene Isolationsumgebungen erfordern Databricks Runtime 17.1 und höher. In früheren Versionen werden alle Batch Unity Catalog Python UDFs im strikten Isolationsmodus ausgeführt.

Batch Unity Catalog Python UDFs mit demselben Besitzer und derselben Sitzung können standardmäßig eine Isolationsumgebung gemeinsam nutzen. Dies kann die Leistung verbessern und die Speicherauslastung verringern, indem die Anzahl separater Umgebungen reduziert wird, die gestartet werden müssen.

Strenge Isolierung

Um sicherzustellen, dass eine UDF immer in einer eigenen, vollständig isolierten Umgebung ausgeführt wird, fügen Sie die STRICT ISOLATION Merkmalsklausel hinzu.

Die meisten UDFs benötigen keine strenge Isolierung. Standarddatenverarbeitungs-UDFs profitieren von der standardmäßigen gemeinsamen Isolationsumgebung und können schneller bei geringerer Speichernutzung ausgeführt werden.

Fügen Sie die STRICT ISOLATION Merkmalsklausel zu UDFs hinzu, die:

  • Ausführen von Eingaben als Code mithilfe von eval(), exec()oder ähnlichen Funktionen
  • Schreiben von Dateien in das lokale Dateisystem
  • Ändern globaler Variablen oder Systemstatus
  • Ändern von Umgebungsvariablen

Das folgende Beispiel zeigt eine UDF, die Eingaben als Code ausführt und eine strenge Isolierung erfordert:

CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator

def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for code_series in batch_iter:
    def eval_func(code):
      try:
        return str(eval(code))
      except Exception as e:
        return f"Error: {e}"
    yield code_series.apply(eval_func)
$$;

Dienstanmeldeinformationen in Batch Unity Catalog Python UDFs

Batch Unity Catalog Python UDFs können Unity Catalog Service Credentials verwenden, um auf externe Clouddienste zuzugreifen. Dies ist besonders hilfreich für die Integration von Cloudfunktionen wie Sicherheitstokenizern in Datenverarbeitungsworkflows.

Hinweis

UDF-spezifische API für Dienstanmeldeinformationen:
Verwenden Sie databricks.service_credentials.getServiceCredentialsProvider() in UDFs den Zugriff auf Dienstanmeldeinformationen.

Dies unterscheidet sich von der dbutils.credentials.getServiceCredentialsProvider() in Notizbüchern verwendeten Funktion, die in UDF-Ausführungskontexten nicht verfügbar ist.

Informationen zum Erstellen einer Dienstanmeldeinformationen finden Sie unter Erstellen von Dienstanmeldeinformationen.

Geben Sie die Dienstanmeldeinformationen an, die Sie in der CREDENTIALS-Klausel der UDF-Definition verwenden möchten.

CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
  `credential-name` DEFAULT,
  `complicated-credential-name` AS short_name,
  `simple-cred`,
  cred_no_quotes
)
AS $$
# Python code here
$$;

Berechtigungen für Dienstanmeldeinformationen

Die UDF-erstellende Person muss über die ACCESS-Berechtigung für die Unity Catalog-Dienstanmeldeinformationen verfügen. Für UDF-aufrufende Personen reicht es jedoch aus, ihnen die EXECUTE-Berechtigung für das UDF zu erteilen. Insbesondere benötigen UDF-Aufrufer keinen Zugriff auf die zugrunde liegenden Dienstanmeldedaten, da die UDF mithilfe der Berechtigungen des UDF-Erstellers ausgeführt wird.

Bei temporären Funktionen ist der Ersteller immer der Aufrufer. UDFs, die in No-PE Bereich ausgeführt werden, auch als dedizierte Cluster bezeichnet, verwenden stattdessen die Berechtigungen des Aufrufers.

Standardanmeldeinformationen und Aliase

Sie können mehrere Anmeldeinformationen in die CREDENTIALS Klausel einschließen, aber nur eine kann als DEFAULTgekennzeichnet werden. Sie können nicht standardmäßige Anmeldeinformationen mithilfe des AS Schlüsselworts aliasen. Jede Anmeldeinformation muss über einen eindeutigen Alias verfügen.

Gepatchte Cloud-SDKs greifen automatisch Standardanmeldeinformationen auf. Die Standardanmeldeinformationen haben Vorrang vor jedem in der Spark-Konfiguration des Computes angegebenen Standard und bleiben in der UDF-Definition des Unity Catalogs erhalten.

Sie müssen das azure-identity Paket installieren, um den DefaultAzureCredential Anbieter zu verwenden. Verwenden Sie die ENVIRONMENT Klausel, um externe Bibliotheken zu installieren. Weitere Informationen zum Installieren externer Bibliotheken finden Sie unter Erweitern von UDFs mithilfe von benutzerdefinierten Abhängigkeiten.

Beispiel für Dienstzugangsdaten – Azure Blob Storage

Im folgenden Beispiel wird eine Dienstanmeldeinformation für den Zugriff auf Azure Blob Storage aus einem Batch Unity Catalog Python UDF verwendet:

%sql
CREATE OR REPLACE FUNCTION main.test.read_azure_blob(blob_name STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
  dependencies = '["azure-identity", "azure-storage-blob"]', environment_version = 'None'
)
AS $$
import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient


def batchhandler(it):
  # DefaultAzureCredential automatically uses the DEFAULT service credential
  credential = DefaultAzureCredential()
  blob_service = BlobServiceClient(
    account_url="https://your-storage-account.blob.core.windows.net",
    credential=credential
  )
  container = blob_service.get_container_client("your-container")

  for blob_names in it:
    results = []
    for name in blob_names:
      blob_client = container.get_blob_client(name)
      try:
        content = blob_client.download_blob().readall().decode("utf-8")
        results.append(content)
      except Exception as e:
        results.append(f"Error: {e}")
    yield pd.Series(results)
$$;

Rufen Sie das UDF nach der Registrierung auf:

SELECT main.test.read_azure_blob(blob_name)
FROM VALUES
('config/settings.json'),
('data/input.txt')
AS t(blob_name)

Aufgabenausführungskontext abrufen

Verwenden Sie die TaskContext PySpark-API, um Kontextinformationen wie Die Identität des Benutzers, Clustertags, Sparkauftrags-ID und vieles mehr abzurufen. Weitere Informationen finden Sie unter Abrufen des Aufgabenkontexts in einer UDF.

Festlegen DETERMINISTIC , ob Ihre Funktion konsistente Ergebnisse erzeugt

Fügen Sie DETERMINISTIC ihrer Funktionsdefinition hinzu, wenn sie dieselben Ausgaben für die gleichen Eingaben erzeugt. Dadurch können Abfrageoptimierungen die Leistung verbessern.

Standardmäßig wird davon ausgegangen, dass Batch Unity Catalog Python UDTFs nicht deterministioc ist, es sei denn, es wurde explizit deklariert. Beispiele für nicht deterministische Funktionen sind: Generieren von Zufallswerten, Zugreifen auf aktuelle Uhrzeiten oder Datumsangaben oder Durchführen externer API-Aufrufe.

Siehe CREATE FUNCTION (SQL und Python)

Einschränkungen

  • Python-Funktionen müssen Werte unabhängig verarbeiten NULL , und alle Typzuordnungen müssen Azure Databricks SQL-Sprachzuordnungen folgen.
  • Batch Unity Catalog Python UDFs werden in einer sicheren, isolierten Umgebung ausgeführt und haben keinen Zugriff auf ein freigegebenes Dateisystem oder interne Dienste.
  • Mehrere UDF-Aufrufe innerhalb einer Phase werden serialisiert, und Zwischenergebnisse werden materialisiert und können auf den Datenträger überlaufen.
  • Dienstanmeldeinformationen sind nur in Batch Unity Catalog Python UDFs und Scalar Python UDFs verfügbar. Sie werden in den Standard-Python-UDFs des Unity-Katalogs nicht unterstützt.
  • Auf dedizierten Clustern und für temporäre Funktionen muss die funktionsaufrufende Person über ACCESS-Berechtigungen für die Dienstanmeldeinformationen verfügen. Siehe Erteilen von Berechtigungen zum Verwenden von Dienstanmeldeinformationen für den Zugriff auf einen externen Clouddienst.
  • Aktivieren Sie das Feature "Public Preview " Aktivieren Sie das Netzwerk für UDFs in Serverless SQL Warehouses auf der Vorschauseite Ihres Arbeitsbereichs , um Batch Unity Catalog Python UDF-Aufrufe an externe Dienste auf serverlosen SQL-Warehouse-Compute durchzuführen.