Sdílet prostřednictvím


Získání kontextu úkolu v UDF

Pomocí rozhraní PySpark TaskContext API získejte kontextové informace při spuštění Python UDF nebo PySpark UDF v rámci Batch Unity Catalog.

Například kontextové informace, jako je identita uživatele a značky clusteru, můžou ověřit identitu uživatele pro přístup k externím službám.

Požadavky

Získání informací o kontextu pomocí TaskContextu

Vyberte kartu pro zobrazení ukázek TaskContextu uživatelsky definovaných funkcí PySpark nebo uživatelsky definovaných funkcí v katalogu Batch Unity v Pythonu.

PySpark UDF

Následující příklad PySpark UDF vytiskne kontext uživatele:

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

Dávkový katalog Unity pro Python UDF

Následující příklad funkce Batch Unity Catalog Python UDF získá uživatelskou identitu pro volání funkce AWS Lambda pomocí přihlašovacích údajů služby.

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

Zavolejte uživatelem definovanou funkci po její registraci.

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

Vlastnosti TaskContext

Metoda TaskContext.getLocalProperty() má následující klíče vlastností:

Klíč vlastnosti Popis příklad využití
user Uživatel, který právě spouští uživatelem definovanou funkci tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id ID skupiny úloh Sparku přidružené k aktuální uživatelsky definované funkci tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags Značky metadat clusteru jako páry klíč-hodnota formátované jako řetězcová reprezentace slovníku JSON tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region Oblast, ve které se nachází pracovní prostor tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId ID účtu Databricks pro běžící kontext tc.getLocalProperty("accountId")
->"1234567890123456"
orgId ID pracovního prostoru (není k dispozici ve službě DBSQL) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion Verze databricks Runtime pro cluster (v prostředích mimo DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion Verze DBSQL (v prostředích DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"