Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Gebruik de TaskContext PySpark-API om contextinformatie op te halen tijdens het uitvoeren van een Batch Unity Catalog Python UDF of PySpark UDF.
Contextinformatie zoals de identiteit en clustertags van de gebruiker kan bijvoorbeeld de identiteit van een gebruiker verifiëren voor toegang tot externe services.
Behoeften
TaskContext wordt ondersteund in Databricks Runtime-versies 16.3 en hoger.
TaskContext wordt ondersteund op de volgende UDF-typen:
TaskContext gebruiken om contextinformatie op te halen
Selecteer een tabblad om TaskContext-voorbeelden te zien voor PySpark UDF's of Batch Unity Catalog Python UDF's.
PySpark UDF
In het volgende PySpark UDF-voorbeeld wordt de context van de gebruiker afgedrukt:
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
Batch Unity Catalogus Python UDF
In het volgende Batch Unity Catalog Python UDF-voorbeeld wordt de identiteit van de gebruiker ophaalt om een AWS Lambda-functie aan te roepen met behulp van een servicereferentie:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Roep de UDF aan nadat deze is geregistreerd:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
Eigenschappen van TaskContext
De TaskContext.getLocalProperty() methode heeft de volgende eigenschapssleutels:
| Eigenschapssleutel | Beschrijving | voorbeeldgebruik |
|---|---|---|
user |
De gebruiker die momenteel de UDF uitvoert | tc.getLocalProperty("user")-> "alice" |
spark.jobGroup.id |
De Spark-taakgroep-ID die is gekoppeld aan de huidige UDF | tc.getLocalProperty("spark.jobGroup.id")-> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
Tags voor clustermetagegevens als sleutel-waardeparen die zijn opgemaakt als een tekenreeksweergave van een JSON-woordenlijst | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")-> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
De regio waarin de werkruimte zich bevindt | tc.getLocalProperty("spark.databricks.clusterUsageTags.region")-> "us-west-2" |
accountId |
Databricks-account-id voor de actieve context | tc.getLocalProperty("accountId")-> "1234567890123456" |
orgId |
Werkruimte-id (niet beschikbaar in DBSQL) | tc.getLocalProperty("orgId")-> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
Databricks Runtime-versie voor het cluster (in niet-DBSQL-omgevingen) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")-> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
DBSQL-versie (in DBSQL-omgevingen) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")-> "2024.35" |