중요하다
이 기능은 Databricks Runtime 14.3 LTS 이상에서 공개 미리보기 상태입니다.
UDTF(사용자 정의 테이블 함수)를 사용하면 스칼라 값 대신 테이블을 반환하는 함수를 등록할 수 있습니다. 각 호출에서 단일 결과 값을 반환하는 스칼라 함수와 달리 각 UDTF는 SQL 문의 FROM 절에서 호출되고 전체 테이블을 출력으로 반환합니다.
각 UDTF 호출은 0개 이상의 인수를 수락할 수 있습니다. 이러한 인수는 전체 입력 테이블을 나타내는 스칼라 식 또는 테이블 인수일 수 있습니다.
UDF는 다음 두 가지 방법으로 등록할 수 있습니다.
- Unity 카탈로그: Unity 카탈로그에서 UDTF를 관리되는 개체로 등록합니다. Unity 카탈로그에서 Python UDF(사용자 정의 테이블 함수)를 참조하세요.
- 세션 범위: 현재 Notebook 또는 작업에 격리된 로컬
SparkSession에 등록합니다.
팁 (조언)
Databricks는 사용자와 팀 간에 함수를 보다 쉽게 공유하고 재사용할 수 있도록 중앙 집중식 거버넌스를 활용하기 위해 Unity 카탈로그에 UDF를 등록하는 것이 좋습니다.
기본 UDTF 구문
Apache Spark는 Python UDTF를 Python 클래스 및 필수 eval 메서드로 구현하며, 이 메서드는 yield을 사용하여 출력 행을 내보냅니다.
클래스를 UDTF로 사용하려면 PySpark udtf 함수를 가져와야 합니다. Databricks는 이 함수를 데코레이터로 사용하고 returnType 옵션을 사용하여 필드 이름과 형식을 명시적으로 지정하는 것이 좋습니다(클래스가 이후 섹션에서 설명한 대로 analyze 메서드를 정의하지 않는 한).
다음 UDTF는 두 정수 인수의 고정 목록을 사용하여 테이블을 만듭니다.
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
UDTF를 등록하기
SQL 쿼리 spark.udtf.register()에서 사용할 세션 범위 UDTF를 등록하려면 . SQL 함수 및 Python UDTF 클래스의 이름을 제공합니다.
spark.udtf.register("get_sum_diff", GetSumDiff)
등록된 UDTF 호출
등록되면 %sql 매직 명령 또는 spark.sql() 함수를 사용하여 SQL에서 UDTF를 사용할 수 있습니다.
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);
세션 범위 UDTF를 Unity 카탈로그로 업그레이드
중요하다
Unity 카탈로그에 Python UDF 등록은 공개 미리 보기로 제공됩니다. Unity 카탈로그 UDF에는 Databricks 런타임 버전 17.1 이상이 필요합니다. 요구 사항을 참조하세요.
세션 범위 UDTF를 Unity 카탈로그로 업그레이드하여 중앙 집중식 거버넌스를 활용하고 사용자 및 팀 간에 함수를 보다 쉽게 공유하고 재사용할 수 있습니다.
세션 범위 UDTF를 Unity 카탈로그로 업그레이드하려면 문과 함께 SQL DDL을 CREATE OR REPLACE FUNCTION 사용합니다. 다음 예제에서는 세션 범위 함수에서 Unity 카탈로그 함수로 UDTF를 변환 GetSumDiff 하는 방법을 보여줍니다.
CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
$$;
SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13 | 7 |
+-----+------+
Unity 카탈로그 UDF에 대한 자세한 내용은 Unity 카탈로그의 Python UDF(사용자 정의 테이블 함수)를 참조하세요.
Apache 화살표 사용
UDTF가 소량의 데이터를 입력으로 수신하지만 큰 테이블을 출력하는 경우 Databricks는 Apache Arrow를 사용하는 것이 좋습니다. UDTF를 선언할 때 useArrow 매개 변수를 지정하여 사용하도록 설정할 수 있습니다.
@udtf(returnType="c1: int, c2: int", useArrow=True)
변수 인수 목록 - *args 및 **kwargs
Python *args 또는 **kwargs 구문을 사용하고 지정되지 않은 수의 입력 값을 처리하는 논리를 구현할 수 있습니다.
다음 예제에서는 인수의 입력 길이 및 형식을 명시적으로 확인하면서 동일한 결과를 반환합니다.
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
다음은 동일한 예제이지만 키워드 인수를 사용하는 예제입니다.
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
등록 시 정적 스키마 정의
UDTF는 순서가 지정된 열 이름 및 형식 시퀀스로 구성된 출력 스키마가 있는 행을 반환합니다. UDTF 스키마가 모든 쿼리에 대해 항상 동일하게 유지되어야 하는 경우 @udtf 데코레이터 뒤의 고정된 고정 스키마를 지정할 수 있습니다.
StructType중 하나여야 합니다.
StructType().add("c1", StringType())
또는 구조체 형식을 나타내는 DDL 문자열입니다.
c1: string
함수 호출 시 동적 스키마 컴퓨팅
UDF는 입력 인수의 값에 따라 각 호출에 대해 프로그래밍 방식으로 출력 스키마를 계산할 수도 있습니다. 이렇게 하려면 특정 UDTF 호출에 제공된 인수에 해당하는 매개 변수를 0개 이상 허용하는 analyze 정적 메서드를 정의합니다.
analyze 메서드의 각 인수는 다음 필드를 포함하는 AnalyzeArgument 클래스의 인스턴스입니다.
AnalyzeArgument 클래스 필드 |
설명 |
|---|---|
dataType |
입력 인수의 형식은 DataType입니다. 입력 테이블 인수의 경우 테이블 StructType 의 열을 나타내는 인수입니다. |
value |
입력 인수의 값은 Optional[Any]입니다. 이는 상수가 아닌 테이블 인수나 리터럴 스칼라 인수의 경우에 None을 의미합니다. |
isTable |
입력 인수가 BooleanType테이블 중 하나인지 확인합니다. |
isConstantExpression |
입력 인수가 BooleanType와 같이 상수로 접힐 수 있는 표현식인지 여부입니다. |
메서드는 analyze 결과 테이블의 AnalyzeResult 스키마와 일부 선택적 필드가 포함된 클래스의 인스턴스를 StructType 반환합니다. UDTF가 입력 테이블 인수를 수락하는 경우 AnalyzeResult 나중에 설명한 대로 여러 UDTF 호출에서 입력 테이블의 행을 분할하고 정렬하는 요청된 방법을 포함할 수도 있습니다.
AnalyzeResult 클래스 필드 |
설명 |
|---|---|
schema |
StructType결과 테이블의 스키마입니다. |
withSinglePartition |
입력된 모든 행을 같은 BooleanTypeUDTF 클래스 인스턴스로 보낼지 여부입니다. |
partitionBy |
비어있지 않은 것으로 설정하면 분할 식의 각 고유한 값 조합이 있는 모든 행이 UDTF 클래스의 별도 인스턴스에서 사용됩니다. |
orderBy |
비어있지 않은 행으로 설정하면 각 파티션 내의 행 순서가 지정됩니다. |
select |
비어 있지 않은 값으로 설정된 경우, 이는 UDTF가 지정한 식의 연속으로, Catalyst가 입력 TABLE 인수의 열에 대해 평가하도록 하는 것입니다. UDTF는 목록의 각 이름에 대해 나열된 순서대로 하나의 입력 특성을 받습니다. |
이 analyze 예제에서는 입력 문자열 인수의 각 단어에 대해 하나의 출력 열을 반환합니다.
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
MyUDTF(lit("hello world")).columns
['word_0', 'word_1']
상태를 향후 eval 호출로 전달
analyze 메서드는 초기화를 수행한 다음 결과를 동일한 UDTF 호출에 대한 향후 eval 메서드 호출로 전달하는 편리한 위치로 사용될 수 있습니다.
이렇게 하려면 AnalyzeResult 서브클래스를 만들고 analyze 메서드에서 서브클래스의 인스턴스를 반환합니다.
그런 다음 __init__ 메서드에 인수를 추가하여 해당 인스턴스를 수락합니다.
이 analyze 예제에서는 상수 출력 스키마를 반환하지만 이후 __init__ 메서드 호출에서 사용할 결과 메타데이터에 사용자 지정 정보를 추가합니다.
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
출력 행 생성
eval 메서드는 입력 테이블 인수의 각 행에 대해 한 번 실행되거나 테이블 인수가 제공되지 않은 경우 한 번만 실행한 다음 끝에 terminate 메서드를 한 번 호출합니다. 두 메서드는 튜플, 목록 또는 pyspark.sql.Row 개체를 생성하여 결과 스키마를 준수하는 0개 이상의 행을 출력합니다.
이 예제에서는 다음 세 가지 요소의 튜플을 제공하여 행을 반환합니다.
def eval(self, x, y, z):
yield (x, y, z)
괄호를 생략할 수도 있습니다.
def eval(self, x, y, z):
yield x, y, z
열이 하나만 있는 행을 반환하려면 후행 쉼표를 추가하세요.
def eval(self, x, y, z):
yield x,
pyspark.sql.Row 개체를 생성할 수도 있습니다.
def eval(self, x, y, z):
from pyspark.sql.types import Row
yield Row(x, y, z)
이 예제에서는 Python 목록을 사용하여 terminate 메서드에서 출력 행을 생성합니다. 이 목적을 위해 UDTF 평가의 이전 단계에서 클래스 내에 상태를 저장할 수 있습니다.
def terminate(self):
yield [self.x, self.y, self.z]
UDTF에 스칼라 인수 전달
스칼라 인수를 UDTF에 리터럴 값 또는 함수를 기반으로 하는 상수 식으로 전달할 수 있습니다. 예를 들어:
SELECT * FROM get_sum_diff(1, y => 2)
UDTF에 테이블 인수 전달
Python UDF는 스칼라 입력 인수 외에도 입력 테이블을 인수로 수락할 수 있습니다. 단일 UDTF는 테이블 인수와 여러 스칼라 인수를 수락할 수도 있습니다.
그런 다음 모든 SQL 쿼리는 TABLE 키워드를 사용한 다음 TABLE(t)같은 적절한 테이블 식별자를 둘러싼 괄호를 사용하여 입력 테이블을 제공할 수 있습니다. 또는 TABLE(SELECT a, b, c FROM t) 또는 TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))같은 테이블 하위 쿼리를 전달할 수 있습니다.
그런 다음 입력 테이블 인수는 입력 테이블의 각 행에 대해 pyspark.sql.Row 메서드를 한 번 호출하여 eval 메서드에 대한 eval 인수로 표시됩니다. 표준 PySpark 열 필드 주석을 사용하여 각 행의 열과 상호 작용할 수 있습니다. 다음 예제에서는 PySpark Row 형식을 명시적으로 가져온 다음 id 필드에서 전달된 테이블을 필터링하는 방법을 보여 줍니다.
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
함수를 쿼리하려면 TABLE SQL 키워드를 사용합니다.
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
함수 호출에서 입력 행 분할 지정
테이블 인수를 사용하여 UDTF를 호출할 때 SQL 쿼리는 하나 이상의 입력 테이블 열 값에 따라 여러 UDTF 호출에서 입력 테이블을 분할할 수 있습니다.
파티션을 지정하려면 PARTITION BY 인수 뒤의 함수 호출에서 TABLE 절을 사용합니다.
이렇게 하면 분할 열의 각 고유한 값 조합이 있는 모든 입력 행이 정확히 하나의 UDTF 클래스 인스턴스에서 사용됩니다.
간단한 열 참조 외에도 PARTITION BY 절은 입력 테이블 열에 따라 임의의 식을 허용합니다. 예를 들어 문자열의 LENGTH 지정하거나, 날짜에서 한 달을 추출하거나, 두 값을 연결할 수 있습니다.
WITH SINGLE PARTITION 대신 PARTITION BY 지정하여 모든 입력 행을 정확히 하나의 UDTF 클래스 인스턴스에서 사용해야 하는 파티션 하나만 요청할 수도 있습니다.
각 파티션 내에서 UDTF의 eval 메서드가 입력 행을 처리하는 과정에서, 입력 행에 대해 필요한 순서를 선택적으로 지정할 수 있습니다. 이렇게 하려면 위에서 설명한 ORDER BY 또는 PARTITION BY 절 다음에 WITH SINGLE PARTITION 절을 제공합니다.
예를 들어 다음 UDTF를 고려합니다.
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
입력 테이블을 통해 UDTF를 호출할 때 다음과 같은 여러 가지 방법으로 분할 옵션을 지정할 수 있습니다.
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8);
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
analyze 메서드에서 입력 행 분할 지정
SQL 쿼리에서 UDTF를 호출할 때 입력 테이블을 분할하는 위의 각 방법에 대해 UDTF의 analyze 메서드가 동일한 분할 방법을 자동으로 지정하는 해당 방법이 있습니다.
- UDTF를
SELECT * FROM udtf(TABLE(t) PARTITION BY a)호출하는 대신analyze메서드를 업데이트하여 필드partitionBy=[PartitioningColumn("a")]설정하고SELECT * FROM udtf(TABLE(t))사용하여 함수를 호출할 수 있습니다. - 마찬가지로 SQL 쿼리에서
TABLE(t) WITH SINGLE PARTITION ORDER BY b을(를) 지정하는 대신,analyze을(를) 설정하여 필드withSinglePartition=true및orderBy=[OrderingColumn("b")]을(를) 만들고, 그런 다음TABLE(t)을(를) 전달할 수 있습니다. - SQL 쿼리에서
TABLE(SELECT a FROM t)전달하는 대신analyzeselect=[SelectedColumn("a")]설정한 다음TABLE(t)전달할 수 있습니다.
다음 예제에서 analyze 상수 출력 스키마를 반환하고, 입력 테이블에서 열의 하위 집합을 선택하고, 입력 테이블이 date 열의 값에 따라 여러 UDTF 호출에서 분할되도록 지정합니다.
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add("longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word)",
alias="length_word")])