메모
이 문서에서는 Databricks Runtime 13.3 이상용 Databricks Connect에 대해 설명합니다.
Python용 Databricks Connect는 UDF(사용자 정의 함수)를 지원합니다. UDF를 포함하는 DataFrame 작업이 실행되면 UDF는 Databricks Connect에 의해 직렬화되고 요청의 일부로 서버로 전송됩니다.
Scala용 Databricks Connect의 UDF에 대한 자세한 내용은 Scala용 Databricks Connect의 사용자 정의 함수를 참조하세요.
메모
사용자 정의 함수는 직렬화되고 역직렬화되므로 클라이언트의 Python 버전은 Azure Databricks 컴퓨팅의 Python 버전과 일치해야 합니다. 지원되는 버전은 버전 지원 매트릭스참조하세요.
UDF 정의
Python용 Databricks Connect에서 UDF를 만들려면 지원되는 다음 함수 중 하나를 사용합니다.
- PySpark 사용자 정의 함수
- PySpark 스트리밍 함수
예를 들어 다음 Python은 열의 값을 제곱하는 간단한 UDF를 설정합니다.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
종속성이 있는 UDF
중요합니다
이 기능은 공개 미리 보기 에 있으며 Python 16.4 이상용 Databricks Connect 및 Databricks Runtime 16.4 이상을 실행하는 클러스터가 필요합니다. 이 기능을 사용하려면 작업 영역의 Unity 카탈로그에서 향상된 Python UDF 미리 보기를 사용하도록 설정합니다.
Databricks Connect는 UDF에 필요한 Python 종속성 지정을 지원합니다. 이러한 종속성은 UDF의 Python 환경의 일부로 Databricks 컴퓨팅에 설치됩니다.
이 기능을 사용하면 사용자가 기본 환경에 제공된 패키지 외에도 UDF에 필요한 종속성을 지정할 수 있습니다. 기본 환경에서 제공되는 것과 다른 버전의 패키지를 설치하는 데 사용할 수도 있습니다.
종속성은 다음 원본에서 설치할 수 있습니다.
- PyPI 패키지
- PyPI 패키지는 PEP 508에 따라 지정할 수 있습니다(예
dicepyjokes<1: 또는simplejson==3.19.*.).
- PyPI 패키지는 PEP 508에 따라 지정할 수 있습니다(예
- Unity 카탈로그 볼륨에 저장된 파일
- 휠 패키지(
.whl)와 gzipped tar 파일()이.tar.gz모두 지원됩니다. re:[UC] 볼륨의 파일에 대한 사용 권한을 사용자에게 부여READ_FILE해야 합니다. - Unity 카탈로그 볼륨에서 패키지를 설치할 때 UDF를 호출하려면 원본 볼륨에 대한 권한이 필요합니다
READ VOLUME. 모든 계정 사용자에게 이 권한을 부여하면 새 사용자가 이 권한을 자동으로 사용할 수 있습니다. - Unity 카탈로그 볼륨 파일은
dbfs:<path>처럼 지정해야 하며, 예를 들면dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl또는dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz입니다.
- 휠 패키지(
UDF에 사용자 지정 종속성을 포함하려면 사용자 지정 종속성을 사용하여 withDependencies환경에 지정한 다음, 해당 환경을 사용하여 Spark 세션을 만듭니다. 종속성은 Databricks 컴퓨팅에 설치되며 이 Spark 세션을 사용하는 모든 UDF에서 사용할 수 있습니다.
다음 코드는 PyPI 패키지를 dice 종속성으로 선언합니다.
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
또는 볼륨에서 휠의 종속성을 지정하려면 다음을 수행합니다.
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Databricks 노트북 및 작업에서의 동작 방식
Notebook 및 작업에서 UDF 종속성은 REPL에 직접 설치해야 합니다. Databricks Connect는 지정된 모든 종속성이 이미 설치되어 있는지 확인하고 설치되지 않은 경우 예외를 throw하여 REPL Python 환경의 유효성을 검사합니다.
Notebook 환경 유효성 검사는 PyPI 및 Unity 카탈로그 볼륨 종속성 둘 다에 대해 발생합니다. 볼륨 종속성은 휠 파일의 경우 PEP-427 이상의 표준 Python 패키징 사양에 따라 패키징하고 원본 배포 파일의 경우 PEP-241 이상을 패키지해야 합니다. Python 패키징 표준에 대한 자세한 내용은 PyPA 설명서를 참조하세요.
제한점
- 로컬 개발 컴퓨터의 Python 휠 또는 원본 배포와 같은 파일은 종속성으로 직접 지정할 수 없습니다. 먼저 Unity 카탈로그 볼륨에 업로드해야 합니다.
- UDF 종속성 지원은
pyspark.sql.streaming.DataStreamWriter.foreachPython 18.0 이상 및pyspark.sql.streaming.DataStreamWriter.foreachBatchDatabricks Runtime 18.0 이상을 사용하는 클러스터와 Databricks Connect가 필요합니다. - UDF 종속성은 창 함수를 통해 pandas 집계 UDF에 대해 지원되지 않습니다.
예시
다음 예제에서는 환경에서 PyPI 및 볼륨 종속성을 정의하고, 해당 환경으로 세션을 만든 다음, 해당 종속성을 사용하는 UDF를 정의하고 호출합니다.
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Python 기본 환경
UDF는 클라이언트가 아닌 Databricks 컴퓨팅에서 실행됩니다. UDF가 실행되는 기본 Python 환경은 Databricks 컴퓨팅에 따라 달라집니다.
클러스터의 경우 기본 Python 환경은 클러스터에서 실행되는 Databricks 런타임 버전의 Python 환경입니다. 이 기본 환경의 Python 버전 및 패키지 목록은 Databricks 런타임 릴리스 노트의 시스템 환경 및 설치된 Python 라이브러리 섹션에서 찾을 수 있습니다.
서버리스 컴퓨팅의 경우 기본 Python 환경은 다음 표에 따라 서버리스 환경 버전 에 해당합니다.
| Databricks Connect 버전 | UDF 서버리스 환경 |
|---|---|
| 17.0 ~ 17.3, Python 3.12 | 버전 4 |
| 16.4.1에서 17 미만, Python 3.12 | 버전 3 |
| 15.4.10에서 16 이하로, Python 3.12 | 버전 3 |
| 15.4.10에서 16 미만, Python 3.11 | 버전 2 |
| 15.4.0에서 15.4.9로, 16.0에서 16.3으로 | 최신 서버리스 컴퓨팅. 안정적인 Python 환경을 위해 15.4.10 LTS 이상으로 마이그레이션하거나 16.4.1 LTS 이상으로 마이그레이션하세요. |