중요합니다
이 기능은 공개 미리보기 단계에 있습니다.
Batch Unity Catalog Python UDF는 데이터를 일괄 처리할 수 있는 Python 코드를 작성할 수 있게 하여 Unity Catalog UDF의 기능을 확장합니다. 이는 행별 UDF와 관련된 오버헤드를 줄임으로써 효율성을 크게 향상시킵니다. 이러한 최적화는 Unity Catalog 배치 Python UDF를 대규모 데이터 처리에 이상적이게 만듭니다.
요구 사항
Batch Unity Catalog Python UDF는 Databricks 런타임 버전 16.3 이상이 필요합니다.
배치 Unity Catalog Python UDF 생성하기
배치 Unity 카탈로그 Python UDF를 작성하는 것은 정규 Unity 카탈로그 UDF를 작성하는 것과 유사하며 다음 추가 사항이 있습니다:
-
PARAMETER STYLE PANDAS: 이것은 UDF가 판다스 반복자를 사용하여 데이터를 일괄 처리하는 것을 명시합니다. -
HANDLER 'handler_function': 이는 배치 처리를 위해 호출되는 핸들러 함수를 지정합니다.
다음 예제에서는 배치 유니티 카탈로그 Python UDF를 만드는 방법을 보여줍니다.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
UDF를 등록한 후에는 SQL 또는 Python을 사용하여 호출할 수 있습니다.
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
배치 UDF 핸들러 함수
Batch Unity Catalog Python UDF는 배치를 처리하고 결과를 산출하는 핸들러 함수가 필요합니다. UDF를 생성할 때 HANDLER 키를 사용하여 핸들러 함수의 이름을 지정해야 합니다.
제어기 함수는 다음을 수행합니다:
- 하나 이상의
pandas.Series를 반복하는 이터레이터 인수를 수용합니다. 각pandas.Series에는 UDF의 입력 매개변수가 포함되어 있습니다. - 제너레이터를 반복하면서 데이터를 처리합니다.
- generator iterator를 반환합니다.
배치 Unity Catalog Python UDFs는 입력과 동일한 수의 행을 반환해야 합니다. 처리기 함수는 각 배치에 대해 입력 시리즈와 동일한 길이의 pandas.Series을 생성하여 이를 보장합니다.
사용자 지정 종속성 설치
Databricks 런타임 환경을 넘어 Batch Unity Catalog Python UDF의 기능을 확장하기 위해 외부 라이브러리에 대한 사용자 정의 종속성을 정의할 수 있습니다.
사용자 지정 종속성 사용하여 UDF 확장참조하세요.
Batch UDF는 단일 또는 복수의 매개변수를 수용할 수 있습니다.
단일 매개변수: 처리기 함수가 단일 입력 매개변수를 사용할 때, 각 배치에 대해 pandas.Series를 순회하는 반복자를 받습니다.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for value_batch in batch_iter:
d = {"min": value_batch.min(), "max": value_batch.max()}
yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;
여러 매개변수: 여러 입력 매개변수에 대해, 핸들러 함수는 여러 pandas.Series를 순회하는 반복자를 받습니다. 시리즈의 값은 입력 매개변수와 동일한 순서로 배열되어 있습니다.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for p1, p2 in batch_iter: # same order as arguments above
yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);
성능을 최적화하려면 비용이 많이 드는 작업을 분리하십시오.
컴퓨팅 비용이 많이 드는 작업은 이러한 작업을 핸들러 함수에서 분리하여 최적화할 수 있습니다. 이러한 방식으로 데이터 배치마다 반복되는 대신, 한 번만 실행되도록 보장합니다.
다음 예제는 비용이 많이 드는 계산이 한 번만 수행되도록 보장하는 방법을 보여줍니다.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
# expensive computation...
return 1
expensive_value = compute_value()
def handler_func(batch_iter):
for batch in batch_iter:
yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL
격리 경계 및 보안
비고
공유 격리 환경에는 Databricks Runtime 17.1 이상이 필요합니다. 이전 버전에서는 모든 Batch Unity 카탈로그 Python UDF가 엄격한 격리 모드에서 실행됩니다.
동일한 소유자와 Batch Unity 카탈로그 Python UDF는 기본적으로 격리 환경을 공유할 수 있습니다. 이렇게 하면 시작해야 하는 개별 환경의 수를 줄여 성능을 향상시키고 메모리 사용량을 줄일 수 있습니다.
엄격한 격리
UDF가 항상 완전히 격리된 자체 환경에서 실행되도록 하려면 특성 절을 STRICT ISOLATION 추가합니다.
대부분의 UDF는 엄격한 격리가 필요하지 않습니다. 표준 데이터 처리 UDF는 기본 공유 격리 환경의 이점을 활용하고 메모리 사용량을 줄이면 더 빠르게 실행됩니다.
UDF에 STRICT ISOLATION 다음과 같은 특성 절을 추가합니다.
- 또는 유사한 함수를 사용하여
eval()exec()코드로 입력 실행 - 로컬 파일 시스템에 파일 쓰기
- 전역 변수 또는 시스템 상태 수정
- 환경 변수 액세스 또는 수정
다음 예제에서는 입력을 코드로 실행하고 엄격한 격리가 필요한 UDF를 보여 줍니다.
CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for code_series in batch_iter:
def eval_func(code):
try:
return str(eval(code))
except Exception as e:
return f"Error: {e}"
yield code_series.apply(eval_func)
$$;
Batch Unity 카탈로그 Python UDF의 서비스 자격 증명
배치 Unity Catalog Python UDF는 외부 클라우드 서비스를 액세스하기 위해 Unity Catalog 서비스 자격 증명을 사용할 수 있습니다. 데이터 처리 워크플로에 보안 토큰화 도구와 같은 클라우드 기능을 통합하는 데 특히 유용합니다.
비고
서비스 자격 증명에 대한 UDF별 API:
UDF에서 서비스 자격 증명에 액세스하기 위해 databricks.service_credentials.getServiceCredentialsProvider()를 사용합니다.
이는 Notebook에서 사용되는 dbutils.credentials.getServiceCredentialsProvider() 함수와 다르며, 이는 UDF 실행 컨텍스트에서는 사용할 수 없습니다.
서비스 자격 증명을 만들려면 서비스 자격 증명 만들기를 참조하세요.
사용하려는 서비스 자격 증명을 UDF 정의의 CREDENTIALS 절에 지정하십시오.
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
`credential-name` DEFAULT,
`complicated-credential-name` AS short_name,
`simple-cred`,
cred_no_quotes
)
AS $$
# Python code here
$$;
서비스 자격 증명 권한
UDF 생성자는 Unity Catalog 서비스 자격 증명에 대한 ACCESS 권한을 가져야 합니다. 하지만 UDF 호출자에게는 UDF에 대한 EXECUTE 권한을 부여하는 것으로 충분합니다. 특히, UDF 호출자는 UDF 생성자의 인증 권한을 사용하여 UDF가 실행되므로, 기본 서비스 자격 증명에 대한 접근이 필요하지 않습니다.
임시 함수의 경우, 생성자가 항상 호출자입니다. 전용 클러스터라고도 알려진 No-PE 스코프에서 실행되는 UDF는 호출자의 권한을 대신 사용합니다.
기본 자격 증명 및 별칭
여러 자격 증명을 CREDENTIALS 절에 포함할 수 있지만, DEFAULT로 표시할 수 있는 것은 하나뿐입니다. 기본 설정이 아닌 자격 증명에는 AS 키워드를 사용하여 별칭을 만들 수 있습니다. 각 자격 증명은 고유한 별명을 가져야 합니다.
수정된 클라우드 SDK는 기본 자격 증명을 자동으로 가져옵니다. 기본 자격 증명은 컴퓨트의 Spark 구성에서 지정된 모든 기본값보다 우선하며, Unity Catalog UDF 정의에 지속적으로 유지됩니다.
azure-identity 패키지를 설치하여 DefaultAzureCredential 제공자를 사용하십시오.
ENVIRONMENT 절을 사용하여 외부 라이브러리를 설치하십시오. 외부 라이브러리를 설치하는 방법에 대해 자세히 알아보려면 사용자 정의 종속성을 사용하여 UDF 확장을 참조하십시오.
서비스 자격 증명 예제 - Azure Functions
다음 예제에서는 서비스 자격 증명을 사용하여 Batch Unity 카탈로그 Python UDF에서 Azure Function을 호출합니다.
%sql
CREATE OR REPLACE FUNCTION main.test.call_azure_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
dependencies = '["azure-identity", "requests"]', environment_version = 'None'
)
AS $$
import json
import pandas as pd
import requests
from azure.identity import DefaultAzureCredential
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# DefaultAzureCredential automatically uses the DEFAULT credential
credential = DefaultAzureCredential()
token = credential.get_token("https://management.azure.com/.default")
# Azure Function URL
function_url = "https://your-function-app.azurewebsites.net/api/HashValuesFunction"
# Propagate TaskContext information:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = {
"values": vals.to_list(),
"is_debug": bool(is_debug[0]),
"context": user_ctx
}
headers = {
"Authorization": f"Bearer {token.token}",
"Content-Type": "application/json"
}
response = requests.post(function_url, json=payload, headers=headers)
if response.status_code != 200:
raise Exception(f"Function call failed: {response.text}")
response_data = response.json()
if "errorMessage" in response_data:
raise Exception(str(response_data))
yield pd.Series(response_data["values"])
$$;
UDF가 등록된 후 호출하십시오.
SELECT main.test.call_azure_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
작업 실행 컨텍스트 가져오기
TaskContext PySpark API를 사용하여 사용자 정보, 클러스터 태그, 스파크 작업 ID 등과 같은 컨텍스트 정보를 얻으세요. UDF에서 작업 컨텍스트 가져오기를 참조하십시오.
DETERMINISTIC 함수가 일관된 결과를 생성하는지 설정
동일한 입력에 대해 동일한 출력을 생성하는 경우 함수 정의에 추가 DETERMINISTIC 합니다. 이렇게 하면 쿼리 최적화를 통해 성능을 향상시킬 수 있습니다.
기본적으로 Batch Unity 카탈로그 Python UDF는 명시적으로 선언되지 않는 한 비결정적인 것으로 간주됩니다. 비결정적 함수의 예로는 임의 값 생성, 현재 시간 또는 날짜 액세스 또는 외부 API 호출 등이 있습니다.
참조 CREATE FUNCTION (SQL 및 Python)
제한점
- Python 함수는
NULL값을 독립적으로 처리해야 하며, 모든 유형 매핑은 Azure Databricks SQL 언어 매핑을 따라야 합니다. - Batch Unity Catalog의 Python UDF는 안전하고 격리된 환경에서 실행되며 공유 파일 시스템이나 내부 서비스에 접근할 수 없습니다.
- 단계 내에서 여러 UDF 호출은 직렬화되며 중간 결과는 저장될 수 있으며 디스크로 넘쳐날 수 있습니다.
- 서비스 자격 증명은 Batch Unity 카탈로그 Python UDF 및 스칼라 Python UDF에서만 사용할 수 있습니다. 표준 Unity 카탈로그 Python UDF에서는 지원되지 않습니다.
- 전용 클러스터와 임시 기능의 경우, 함수 호출자는 서비스 자격 증명에 대한
ACCESS권한이 있어야 합니다. 외부 클라우드 서비스에 접근하기 위한 서비스 자격 증명 사용 권한 부여에 대해 알아보기. - 공개 미리 보기 기능을 사용하도록 설정 작업 영역의 미리 보기 페이지에서서버리스 SQL Warehouse의 UDF에 대한 네트워킹을 사용하도록 설정하여 서버리스 SQL 웨어하우스 컴퓨팅의 외부 서비스에 대한 Batch Unity 카탈로그 Python UDF 호출을 수행합니다.
- 서버리스 Notebook 또는 작업 컴퓨팅에서 Batch Unity 카탈로그 Python UDF를 호출하려면 서버리스 출구 제어를 구성해야 합니다.