Bagikan melalui


Mendapatkan konteks tugas dalam UDF

Gunakan TASKContext PySpark API untuk mendapatkan informasi konteks saat menjalankan Batch Unity Catalog Python UDF atau PySpark UDF.

Misalnya, informasi konteks seperti identitas pengguna dan tag kluster dapat memverifikasi identitas pengguna untuk mengakses layanan eksternal.

Persyaratan

Menggunakan TaskContext untuk mendapatkan informasi konteks

Pilih tab untuk melihat contoh TaskContext untuk UDF PySpark atau Batch Unity Catalog Python UDFs .

PySpark UDF

Contoh UDF PySpark berikut mencetak konteks pengguna:

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

Batch Katalog Unity Python UDF

Contoh Batch Unity Catalog Python UDF berikut mendapatkan identitas pengguna untuk memanggil fungsi AWS Lambda menggunakan kredensial layanan:

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

Hubungi UDF setelah terdaftar:

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

Properti dari TaskContext

Metode TaskContext.getLocalProperty() ini memiliki kunci properti berikut:

Kunci Properti Deskripsi Penggunaan Contoh
user Pengguna saat ini menjalankan UDF tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id ID grup pekerjaan Spark yang terkait dengan UDF saat ini tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags Tag metadata kluster sebagai pasangan kunci-nilai yang diformat sebagai representasi string kamus JSON tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region Wilayah tempat ruang kerja berada tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId ID akun Databricks untuk konteks yang sedang berjalan tc.getLocalProperty("accountId")
->"1234567890123456"
orgId ID Ruang Kerja (tidak tersedia di DBSQL) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion Versi Databricks Runtime untuk kluster (pada lingkungan non-DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion Versi DBSQL (di lingkungan DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"