Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Verwenden Sie die TaskContext PySpark-API, um Kontextinformationen abzurufen, während Sie einen Batch Unity Catalog, Python UDF oder PySpark UDF ausführen.
Beispielsweise können Kontextinformationen wie die Identität des Benutzers und Cluster-Tags die Identität eines Benutzers für den Zugriff auf externe Dienste überprüfen.
Anforderungen
TaskContext wird in Databricks Runtime-Versionen 16.3 und höher unterstützt.
TaskContext wird für die folgenden UDF-Typen unterstützt:
Verwenden von TaskContext zum Abrufen von Kontextinformationen
Wählen Sie eine Registerkarte aus, um TaskContext-Beispiele für PySpark-UDFs oder Python-UDFs des Batch-Unity-Katalogs anzuzeigen.
PySpark UDF
Im folgenden PySpark-UDF-Beispiel wird der Kontext des Benutzers gedruckt:
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
Batch-UDF Unity Catalog/Python
Das folgende Batch Unity Catalog-Python-UDF-Beispiel ruft die Identität des Benutzers ab, um eine AWS Lambda-Funktion mithilfe von Service-Anmeldeinformationen aufzurufen:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Rufen Sie das UDF nach der Registrierung auf:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
TaskContext-Eigenschaften
Die TaskContext.getLocalProperty() Methode verfügt über die folgenden Eigenschaftsschlüssel:
| Eigenschaftsschlüssel | Beschreibung | Beispielverwendung |
|---|---|---|
user |
Der Benutzer, der derzeit die UDF ausführt | tc.getLocalProperty("user")-> "alice" |
spark.jobGroup.id |
Die Spark-Auftragsgruppen-ID, die der aktuellen UDF zugeordnet ist | tc.getLocalProperty("spark.jobGroup.id")-> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
Cluster-Metadaten-Tags als Schlüssel-Wert-Paare, die als Zeichenfolgendarstellung eines JSON-Wörterbuchs formatiert sind | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")-> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
Die Region, in der sich der Arbeitsbereich befindet | tc.getLocalProperty("spark.databricks.clusterUsageTags.region")-> "us-west-2" |
accountId |
Databricks-Konto-ID für den ausgeführten Kontext | tc.getLocalProperty("accountId")-> "1234567890123456" |
orgId |
Arbeitsbereichs-ID (in DBSQL nicht verfügbar) | tc.getLocalProperty("orgId")-> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
Databricks Runtime-Version für den Cluster (in Nicht-DBSQL-Umgebungen) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")-> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
DBSQL-Version (in DBSQL-Umgebungen) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")-> "2024.35" |