Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
Verwenden Sie die TaskContext PySpark-API, um Kontextinformationen abzurufen, während Sie einen Batch Unity Catalog, Python UDF oder PySpark UDF ausführen.
Beispielsweise können Kontextinformationen wie die Identität des Benutzers und Cluster-Tags die Identität eines Benutzers für den Zugriff auf externe Dienste überprüfen.
Anforderungen
TaskContext wird in Databricks Runtime-Versionen 16.3 und höher unterstützt.
TaskContext wird für die folgenden UDF-Typen unterstützt:
Verwenden von TaskContext zum Abrufen von Kontextinformationen
Wählen Sie eine Registerkarte aus, um TaskContext-Beispiele für PySpark-UDFs oder Python-UDFs des Batch-Unity-Katalogs anzuzeigen.
PySpark UDF
Im folgenden PySpark-UDF-Beispiel wird der Kontext des Benutzers gedruckt:
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
Batch-UDF Unity Catalog/Python
Das folgende Batch Unity Catalog-Python-UDF-Beispiel ruft die Identität des Benutzers ab, um eine AWS Lambda-Funktion mithilfe von Service-Anmeldeinformationen aufzurufen:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Rufen Sie das UDF nach der Registrierung auf:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
TaskContext-Eigenschaften
Die TaskContext.getLocalProperty() Methode verfügt über die folgenden Eigenschaftsschlüssel:
| Eigenschaftsschlüssel | Beschreibung | Beispielverwendung |
|---|---|---|
user |
Der Benutzer, der derzeit die UDF ausführt | tc.getLocalProperty("user")-> "alice" |
spark.jobGroup.id |
Die Spark-Auftragsgruppen-ID, die der aktuellen UDF zugeordnet ist | tc.getLocalProperty("spark.jobGroup.id")-> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
Cluster-Metadaten-Tags als Schlüssel-Wert-Paare, die als Zeichenfolgendarstellung eines JSON-Wörterbuchs formatiert sind | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")-> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
Die Region, in der sich der Arbeitsbereich befindet | tc.getLocalProperty("spark.databricks.clusterUsageTags.region")-> "us-west-2" |
accountId |
Databricks-Konto-ID für den ausgeführten Kontext | tc.getLocalProperty("accountId")-> "1234567890123456" |
orgId |
Arbeitsbereichs-ID (in DBSQL nicht verfügbar) | tc.getLocalProperty("orgId")-> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
Databricks Runtime-Version für den Cluster (in Nicht-DBSQL-Umgebungen) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")-> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
DBSQL-Version (in DBSQL-Umgebungen) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")-> "2024.35" |