Udostępnij przez


Pobieranie kontekstu zadania w funkcji zdefiniowanej przez użytkownika

Użyj interfejsu API PySpark TaskContext, aby uzyskać informacje kontekstowe podczas uruchamiania funkcji zdefiniowanej przez użytkownika języka Python wykazu usługi Batch Unity lub funkcji zdefiniowanej przez użytkownika PySpark.

Na przykład informacje kontekstowe, takie jak tożsamość użytkownika i tagi klastra, mogą weryfikować tożsamość użytkownika w celu uzyskania dostępu do usług zewnętrznych.

Wymagania

Używanie TaskContext do uzyskiwania informacji kontekstowych

Wybierz kartę, aby wyświetlić przykłady TaskContext dla funkcji zdefiniowanych przez użytkownika PySpark lub funkcji zdefiniowanych przez użytkownika języka Python w wykazie usługi Batch Unity.

PySpark UDF

Poniższy przykład funkcji zdefiniowanej przez użytkownika PySpark drukuje kontekst użytkownika:

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

Batch Unity Catalog funkcja UDF w Pythonie

Poniższy przykład funkcji zdefiniowanej przez użytkownika języka Python w wykazie aparatu Batch Unity pobiera tożsamość użytkownika w celu wywołania funkcji Lambda platformy AWS przy użyciu poświadczeń usługi:

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

Zadzwoń do UDF po jego zarejestrowaniu:

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

Właściwości TaskContext

Metoda TaskContext.getLocalProperty() ma następujące klucze właściwości:

Klucz właściwości Opis Przykładowe użycie
user Użytkownik aktualnie wykonujący funkcję zdefiniowaną przez użytkownika tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id Identyfikator grupy zadań platformy Spark skojarzony z bieżącą funkcją zdefiniowaną przez użytkownika tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags Tagi metadanych klastra jako pary klucz-wartość sformatowane jako reprezentacja ciągu słownika JSON tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region Region, w którym znajduje się obszar roboczy tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId Identyfikator konta usługi Databricks dla uruchomionego kontekstu tc.getLocalProperty("accountId")
->"1234567890123456"
orgId Identyfikator obszaru roboczego (niedostępny w DBSQL) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion Wersja środowiska uruchomieniowego usługi Databricks dla klastra (w środowiskach innych niż DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion Wersja DBSQL (w środowiskach DBSQL) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"