Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Użyj interfejsu API PySpark TaskContext, aby uzyskać informacje kontekstowe podczas uruchamiania funkcji zdefiniowanej przez użytkownika języka Python wykazu usługi Batch Unity lub funkcji zdefiniowanej przez użytkownika PySpark.
Na przykład informacje kontekstowe, takie jak tożsamość użytkownika i tagi klastra, mogą weryfikować tożsamość użytkownika w celu uzyskania dostępu do usług zewnętrznych.
Wymagania
TaskContext jest obsługiwana w środowisku Databricks Runtime w wersji 16.3 lub nowszej.
TaskContext jest obsługiwana w następujących typach funkcji zdefiniowanych przez użytkownika:
Używanie TaskContext do uzyskiwania informacji kontekstowych
Wybierz kartę, aby wyświetlić przykłady TaskContext dla funkcji zdefiniowanych przez użytkownika PySpark lub funkcji zdefiniowanych przez użytkownika języka Python w wykazie usługi Batch Unity.
PySpark UDF
Poniższy przykład funkcji zdefiniowanej przez użytkownika PySpark drukuje kontekst użytkownika:
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
Batch Unity Catalog funkcja UDF w Pythonie
Poniższy przykład funkcji zdefiniowanej przez użytkownika języka Python w wykazie aparatu Batch Unity pobiera tożsamość użytkownika w celu wywołania funkcji Lambda platformy AWS przy użyciu poświadczeń usługi:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Zadzwoń do UDF po jego zarejestrowaniu:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
Właściwości TaskContext
Metoda TaskContext.getLocalProperty() ma następujące klucze właściwości:
| Klucz właściwości | Opis | Przykładowe użycie |
|---|---|---|
user |
Użytkownik aktualnie wykonujący funkcję zdefiniowaną przez użytkownika | tc.getLocalProperty("user")-> "alice" |
spark.jobGroup.id |
Identyfikator grupy zadań platformy Spark skojarzony z bieżącą funkcją zdefiniowaną przez użytkownika | tc.getLocalProperty("spark.jobGroup.id")-> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
Tagi metadanych klastra jako pary klucz-wartość sformatowane jako reprezentacja ciągu słownika JSON | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")-> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
Region, w którym znajduje się obszar roboczy | tc.getLocalProperty("spark.databricks.clusterUsageTags.region")-> "us-west-2" |
accountId |
Identyfikator konta usługi Databricks dla uruchomionego kontekstu | tc.getLocalProperty("accountId")-> "1234567890123456" |
orgId |
Identyfikator obszaru roboczego (niedostępny w DBSQL) | tc.getLocalProperty("orgId")-> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
Wersja środowiska uruchomieniowego usługi Databricks dla klastra (w środowiskach innych niż DBSQL) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")-> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
Wersja DBSQL (w środowiskach DBSQL) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")-> "2024.35" |