Aracılığıyla paylaş


Görev bağlamını UDF'de alma

Batch Unity Kataloğu Python UDF veya PySpark UDF çalıştırırken bağlam bilgilerini almak için TaskContext PySpark API'sini kullanın.

Örneğin, kullanıcının kimliği ve küme etiketleri gibi bağlam bilgileri dış hizmetlere erişmek için kullanıcının kimliğini doğrulayabilir.

Gereksinimler

Bağlam bilgilerini almak için TaskContext kullanma

PySpark UDF'leri veya Batch Unity Kataloğu Python UDF'leri için TaskContext örneklerini görmek için bir sekme seçin.

PySpark UDF

Aşağıdaki PySpark UDF örneği kullanıcının bağlamını yazdırır:

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

Batch Unity Kataloğu Python UDF

Aşağıdaki Batch Unity Kataloğu Python UDF örneği, hizmet kimlik bilgilerini kullanarak bir AWS Lambda işlevini çağırmak için kullanıcının kimliğini alır:

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

Kaydedildikten sonra UDF'yi çağırın:

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

TaskContext özellikleri

TaskContext.getLocalProperty() yönteminde aşağıdaki özellik anahtarları vardır:

Özellik Anahtarı Açıklama Örnek Kullanım
user Şu anda UDF'yi yürüten kullanıcı tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id Geçerli UDF ile bağlantılı Spark görev grubu kimliği tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags Anahtar-değer çiftleri şeklinde küme meta veri etiketlerini JSON sözlüğü string gösterimi olarak biçimlendirin. tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region Çalışma alanının bulunduğu bölge tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId Çalışan bağlam için Databricks hesap kimliği tc.getLocalProperty("accountId")
->"1234567890123456"
orgId Çalışma Alanı Kimliği (DBSQL'de kullanılamaz) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion Küme için Databricks Runtime sürümü (DBSQL olmayan ortamlarda) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion DBSQL sürümü (DBSQL ortamlarında) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"