Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Gunakan TASKContext PySpark API untuk mendapatkan informasi konteks saat menjalankan Batch Unity Catalog Python UDF atau PySpark UDF.
Misalnya, informasi konteks seperti identitas pengguna dan tag kluster dapat memverifikasi identitas pengguna untuk mengakses layanan eksternal.
Persyaratan
TaskContext didukung pada Databricks Runtime versi 16.3 ke atas.
TaskContext didukung pada jenis UDF berikut:
Menggunakan TaskContext untuk mendapatkan informasi konteks
Pilih tab untuk melihat contoh TaskContext untuk UDF PySpark atau Batch Unity Catalog Python UDFs .
PySpark UDF
Contoh UDF PySpark berikut mencetak konteks pengguna:
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
Batch Katalog Unity Python UDF
Contoh Batch Unity Catalog Python UDF berikut mendapatkan identitas pengguna untuk memanggil fungsi AWS Lambda menggunakan kredensial layanan:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Hubungi UDF setelah terdaftar:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
Properti dari TaskContext
Metode TaskContext.getLocalProperty() ini memiliki kunci properti berikut:
| Kunci Properti | Deskripsi | Penggunaan Contoh |
|---|---|---|
user |
Pengguna saat ini menjalankan UDF | tc.getLocalProperty("user")-> "alice" |
spark.jobGroup.id |
ID grup pekerjaan Spark yang terkait dengan UDF saat ini | tc.getLocalProperty("spark.jobGroup.id")-> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
Tag metadata kluster sebagai pasangan kunci-nilai yang diformat sebagai representasi string kamus JSON | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")-> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
Wilayah tempat ruang kerja berada | tc.getLocalProperty("spark.databricks.clusterUsageTags.region")-> "us-west-2" |
accountId |
ID akun Databricks untuk konteks yang sedang berjalan | tc.getLocalProperty("accountId")-> "1234567890123456" |
orgId |
ID Ruang Kerja (tidak tersedia di DBSQL) | tc.getLocalProperty("orgId")-> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
Versi Databricks Runtime untuk kluster (pada lingkungan non-DBSQL) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")-> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
Versi DBSQL (di lingkungan DBSQL) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")-> "2024.35" |