Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Use a API TaskContext do PySpark para obter informações de contexto ao executar uma UDF Python do Catálogo do Lote do Unity ou UDF do PySpark.
Por exemplo, informações de contexto, como a identidade do usuário e as marcas de cluster, podem verificar a identidade de um usuário para acessar serviços externos.
Requisitos
Há suporte para TaskContext nas versões 16.3 e superiores do Databricks Runtime.
TaskContext é suportado nos seguintes tipos de UDF:
Usar TaskContext para obter informações de contexto
Selecione uma guia para ver exemplos de TaskContext para UDFs do PySpark ou UDFs do Python do Catálogo do Unity em Lote.
PySpark UDF
O exemplo de UDF do PySpark a seguir imprime o contexto do usuário:
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
UDF em lote do Python do Catálogo do Unity
O exemplo de UDF Python do Catálogo do Lote do Unity a seguir obtém a identidade do usuário para chamar uma função do AWS Lambda usando uma credencial de serviço:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Chame a UDF depois que ela for registrada:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
Propriedades TaskContext
O TaskContext.getLocalProperty() método tem as seguintes chaves de propriedade:
| Chave de propriedade | Descrição | Utilização de Exemplo |
|---|---|---|
user |
O usuário que executa a UDF no momento | tc.getLocalProperty("user")-> "alice" |
spark.jobGroup.id |
A ID do grupo de trabalhos do Spark associada à UDF atual | tc.getLocalProperty("spark.jobGroup.id")-> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
Agrupar tags de metadados como pares de chave-valor formatados como uma representação de cadeia de caracteres de um dicionário JSON | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")-> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
A região em que o espaço de trabalho reside | tc.getLocalProperty("spark.databricks.clusterUsageTags.region")-> "us-west-2" |
accountId |
ID da conta do Databricks para o contexto de execução | tc.getLocalProperty("accountId")-> "1234567890123456" |
orgId |
ID do espaço de trabalho (não disponível no DBSQL) | tc.getLocalProperty("orgId")-> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
Versão do Databricks Runtime para o cluster (em ambientes não DBSQL) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")-> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
Versão do DBSQL (em ambientes DBSQL) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")-> "2024.35" |