Hämta uppgiftskontext i en UDF

Använd TaskContext PySpark-API:et för att hämta kontextinformation när du kör en Batch Unity-katalog, Python UDF eller PySpark UDF.

Kontextinformation som användarens identitet och klustertaggar kan till exempel verifiera en användares identitet för att få åtkomst till externa tjänster.

Kravspecifikation

Använda TaskContext för att hämta kontextinformation

Välj en flik för att se TaskContext-exempel för PySpark UDF:er eller Batch Unity Catalog Python UDF:er .

PySpark UDF

I följande PySpark UDF-exempel skrivs användarens kontext ut:

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

Batch Unity Katalog Python UDF

Följande Batch Unity Catalog Python UDF-exempel hämtar användarens identitet för att anropa en AWS Lambda-funktion med hjälp av en tjänstautentiseringsuppgift:

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

Ring UDF efter att den har registrerats:

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

TaskContext-egenskaper

Metoden TaskContext.getLocalProperty() har följande egenskapsnycklar:

Nyckel för egenskap Beskrivning Exempel på användning
user Den användare som för närvarande kör UDF tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id Det Spark-jobbgrupps-ID som är associerat med den aktuella UDF:en tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags Klustermetadatataggar som nyckel/värde-par formaterade som en strängrepresentation av en JSON-ordlista tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region Den region där arbetsytan finns tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId Databricks-konto-ID för den kontext som körs tc.getLocalProperty("accountId")
->"1234567890123456"
orgId Arbetsyte-ID (inte tillgängligt i DBSQL) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion Databricks Runtime-version för klustret (i icke-DBSQL-miljöer) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion DBSQL-version (i DBSQL-miljöer) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"