Oharra
Orrialde honetara sartzeak baimena behar du. Saioa hasteko edo direktorioak aldatzen saia zaitezke.
Orrialde honetara sartzeak baimena behar du. Direktorioak aldatzen saia zaitezke.
Use la API de TaskContext PySpark para obtener información de contexto mientras ejecuta una UDF de Python o una UDF de PySpark de Batch Unity Catalog.
Por ejemplo, la información de contexto, como la identidad del usuario y las etiquetas de clúster, puede verificar la identidad de un usuario para acceder a servicios externos.
Requisitos
TaskContext es compatible con las versiones 16.3 y posteriores de Databricks Runtime.
TaskContext es compatible con los siguientes tipos de UDF:
Uso de TaskContext para obtener información de contexto
Seleccione una pestaña para ver ejemplos de TaskContext para las UDF de PySpark o las UDF de Python del catálogo de Unity por lotes.
PySpark UDF
En el siguiente ejemplo de UDF de PySpark se imprime el contexto del usuario:
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
UDF de Python por lotes en Unity Catalog
En el siguiente ejemplo de UDF de Python de Batch Unity Catalog se obtiene la identidad del usuario para llamar a una función de AWS Lambda mediante una credencial de servicio:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Llame a la UDF después de registrarla:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
Propiedades de TaskContext
El TaskContext.getLocalProperty() método tiene las siguientes claves de propiedad:
| Clave de propiedad | Descripción | Uso de ejemplo |
|---|---|---|
user |
El usuario que está ejecutando actualmente la UDF | tc.getLocalProperty("user")-> "alice" |
spark.jobGroup.id |
El identificador de grupo de trabajos de Spark asociado a la UDF actual | tc.getLocalProperty("spark.jobGroup.id")-> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
Etiquetas de metadatos de clúster como pares clave-valor con formato de representación de cadena de un diccionario JSON | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")-> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
La región en la que reside el área de trabajo | tc.getLocalProperty("spark.databricks.clusterUsageTags.region")-> "us-west-2" |
accountId |
Id. de cuenta de Databricks para el contexto en ejecución | tc.getLocalProperty("accountId")-> "1234567890123456" |
orgId |
Id. de área de trabajo (no disponible en DBSQL) | tc.getLocalProperty("orgId")-> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
Versión de Databricks Runtime para el clúster (en entornos que no son DBSQL) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")-> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
Versión de DBSQL (en entornos DBSQL) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")-> "2024.35" |