Delen via


Taakcontext ophalen in een UDF

Gebruik de TaskContext PySpark-API om contextinformatie op te halen tijdens het uitvoeren van een Batch Unity Catalog Python UDF of PySpark UDF.

Contextinformatie zoals de identiteit en clustertags van de gebruiker kan bijvoorbeeld de identiteit van een gebruiker verifiëren voor toegang tot externe services.

Behoeften

TaskContext gebruiken om contextinformatie op te halen

Selecteer een tabblad om TaskContext-voorbeelden te zien voor PySpark UDF's of Batch Unity Catalog Python UDF's.

PySpark UDF

In het volgende PySpark UDF-voorbeeld wordt de context van de gebruiker afgedrukt:

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

Batch Unity Catalogus Python UDF

In het volgende Batch Unity Catalog Python UDF-voorbeeld wordt de identiteit van de gebruiker ophaalt om een AWS Lambda-functie aan te roepen met behulp van een servicereferentie:

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

Roep de UDF aan nadat deze is geregistreerd:

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

Eigenschappen van TaskContext

De TaskContext.getLocalProperty() methode heeft de volgende eigenschapssleutels:

Eigenschapssleutel Beschrijving voorbeeldgebruik
user De gebruiker die momenteel de UDF uitvoert tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id De Spark-taakgroep-ID die is gekoppeld aan de huidige UDF tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags Tags voor clustermetagegevens als sleutel-waardeparen die zijn opgemaakt als een tekenreeksweergave van een JSON-woordenlijst tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region De regio waarin de werkruimte zich bevindt tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId Databricks-account-id voor de actieve context tc.getLocalProperty("accountId")
->"1234567890123456"
orgId Werkruimte-id (niet beschikbaar in DBSQL) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion Databricks Runtime-versie voor het cluster (in niet-DBSQL-omgevingen) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion DBSQL-versie (in DBSQL-omgevingen) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"