Partilhar via


Obter contexto de tarefa em uma UDF

Use a API TaskContext PySpark para obter informações de contexto durante a execução de um Batch Unity Catalog Python UDF ou PySpark UDF.

Por exemplo, informações de contexto, como a identidade do usuário e as tags de cluster, podem verificar a identidade de um usuário para acessar serviços externos.

Requerimentos

Use TaskContext para obter informações de contexto

Selecione uma guia para ver exemplos de TaskContext para UDFs PySpark ou UDFs Python do Batch Unity Catalog .

PySpark UDF

O seguinte exemplo de UDF do PySpark imprime o contexto do usuário:

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

Batch Unity Catálogo Python UDF

O seguinte exemplo de UDF Python do Batch Unity Catalog obtém a identidade do usuário para chamar uma função do AWS Lambda usando uma credencial de serviço:

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

Chame a UDF depois de ser registada:

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

Propriedades TaskContext

O TaskContext.getLocalProperty() método tem as seguintes chaves de propriedade:

Chave do imóvel Descrição Utilização de Exemplo
user O usuário atualmente executando o UDF tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id O ID do grupo de trabalho do Spark associado ao UDF atual tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags Tags de metadados de cluster como pares chave-valor formatados como uma representação de cadeia de caracteres de um dicionário JSON tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region A região onde o espaço de trabalho reside tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId ID da conta Databricks para o contexto em execução tc.getLocalProperty("accountId")
->"1234567890123456"
orgId ID do espaço de trabalho (não disponível no DBSQL) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion Versão do Databricks Runtime para o cluster (em ambientes não-DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion Versão do DBSQL (em ambientes DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"