Not
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Använd TaskContext PySpark-API:et för att hämta kontextinformation när du kör en Batch Unity-katalog, Python UDF eller PySpark UDF.
Kontextinformation som användarens identitet och klustertaggar kan till exempel verifiera en användares identitet för att få åtkomst till externa tjänster.
Kravspecifikation
TaskContext stöds i Databricks Runtime version 16.3 och senare.
TaskContext stöds på följande UDF-typer:
Använda TaskContext för att hämta kontextinformation
Välj en flik för att se TaskContext-exempel för PySpark UDF:er eller Batch Unity Catalog Python UDF:er .
PySpark UDF
I följande PySpark UDF-exempel skrivs användarens kontext ut:
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
Batch Unity Katalog Python UDF
Följande Batch Unity Catalog Python UDF-exempel hämtar användarens identitet för att anropa en AWS Lambda-funktion med hjälp av en tjänstautentiseringsuppgift:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Ring UDF efter att den har registrerats:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
TaskContext-egenskaper
Metoden TaskContext.getLocalProperty() har följande egenskapsnycklar:
| Nyckel för egenskap | Beskrivning | Exempel på användning |
|---|---|---|
user |
Den användare som för närvarande kör UDF | tc.getLocalProperty("user")-> "alice" |
spark.jobGroup.id |
Det Spark-jobbgrupps-ID som är associerat med den aktuella UDF:en | tc.getLocalProperty("spark.jobGroup.id")-> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
Klustermetadatataggar som nyckel/värde-par formaterade som en strängrepresentation av en JSON-ordlista | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")-> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
Den region där arbetsytan finns | tc.getLocalProperty("spark.databricks.clusterUsageTags.region")-> "us-west-2" |
accountId |
Databricks-konto-ID för den kontext som körs | tc.getLocalProperty("accountId")-> "1234567890123456" |
orgId |
Arbetsyte-ID (inte tillgängligt i DBSQL) | tc.getLocalProperty("orgId")-> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
Databricks Runtime-version för klustret (i icke-DBSQL-miljöer) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")-> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
DBSQL-version (i DBSQL-miljöer) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")-> "2024.35" |