Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
Fontos
Ez a funkció nyilvános előzetes verzióban van elérhető a Databricks Runtime 14.3 LTS és újabb verziókban.
A felhasználó által definiált táblafüggvény (UDTF) lehetővé teszi olyan függvények regisztrálását, amelyek skaláris értékek helyett táblákat ad vissza. Az egyes hívásokból egyetlen eredményértéket visszaadó skaláris függvényekkel ellentétben a rendszer minden UDTF-et meghív egy SQL-utasítás záradékában FROM , és kimenetként egy teljes táblát ad vissza.
Minden UDTF-hívás elfogadhat nulla vagy több argumentumot. Ezek az argumentumok lehetnek skaláris kifejezések vagy táblaargumentumok, amelyek teljes beviteli táblákat jelölnek.
Az UDF-ek kétféleképpen regisztrálhatók:
- Unity Catalog: Regisztrálja az UDTF-t szabályozott objektumként a Unity Katalógusban. Lásd a Python felhasználó által definiált táblafüggvényeit (UDTF-eket) a Unity Catalogban.
- Munkamenet-hatókörű: Regisztráljon a helyi, az aktuális jegyzetfüzetre vagy feladatra elkülönített helyi
SparkSessionpéldányra.
Jótanács
A Databricks azt javasolja, hogy regisztrálja az UDTF-eket a Unity Katalógusban a központosított irányítás előnyeinek kihasználásához, amely megkönnyíti a funkciók biztonságos megosztását és újrafelhasználását a felhasználók és csapatok között.
Alapszintű UDTF-szintaxis
Az Apache Spark Python-osztályokként implementálja a Python UDTF-eket egy kötelező eval metódussal, amely yield használ kimeneti sorok kibocsátására.
Az osztály UDTF-ként való használatához importálnia kell a PySpark udtf függvényt. A Databricks azt javasolja, hogy ezt a függvényt dekorátorként használja, és explicit módon adja meg a mezőneveket és -típusokat a returnType beállítással (kivéve, ha az osztály egy analyze metódust határoz meg egy későbbi szakaszban leírtak szerint).
A következő UDTF két egész argumentum rögzített listájával hoz létre egy táblát:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
UDTF regisztrálása
Ha egy munkamenet-hatókörű UDTF-t szeretne regisztrálni az SQL-lekérdezésekben való használatra, használja a következőt spark.udtf.register(): . Adjon nevet az SQL-függvénynek és a Python UDTF-osztálynak.
spark.udtf.register("get_sum_diff", GetSumDiff)
A regisztrált UDTF meghívása
Regisztráció után az UDTF az SQL-ben vagy a %sql varázsparanccsal, vagy a spark.sql() függvénnyel használható.
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);
Munkamenet-hatókörű UDTF frissítése Unity-katalógusra
Fontos
A Python UDTF-ek regisztrálása a Unity Katalógusban nyilvános előzetes verzióban érhető el. A Unity Catalog UDTF-jeihez a Databricks Runtime 17.1-es vagy újabb verziója szükséges. Lásd: Követelmények.
A munkamenet-hatókörű UDTF-et unitykatalógusra frissítheti, hogy kihasználhassa a központosított irányítás előnyeit, és megkönnyítse a funkciók biztonságos megosztását és újrafelhasználását a felhasználók és csapatok között.
A munkamenet-hatókörű UDTF Unity Catalogra való frissítéséhez használja az SQL DDL-t az CREATE OR REPLACE FUNCTION utasítással. Az alábbi példa bemutatja, hogyan konvertálhatja az GetSumDiff UDTF-t munkamenet-hatókörű függvényből Unity Catalog-függvénysé:
CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
$$;
SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13 | 7 |
+-----+------+
További információ a Unity Catalog UDTF-jeiről: Python felhasználó által definiált táblafüggvények (UDTF-ek) a Unity Catalogban.
Az Apache Arrow használata
Ha az UDTF kis mennyiségű adatot fogad bemenetként, de nagy táblát ad ki, a Databricks az Apache Arrow használatát javasolja. Az UDTF deklarálásakor megadhatja a useArrow paramétert:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Változó argumentumlistái – *args és **kwargs
A Python *args vagy **kwargs szintaxisával és logikájának implementálásával meghatározatlan számú bemeneti értéket kezelhet.
Az alábbi példa ugyanazt az eredményt adja vissza, miközben explicit módon ellenőrzi az argumentumok bemeneti hosszát és típusait:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Ugyanez a példa, de kulcsszóargumentumok használatával:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Statikus séma definiálása a regisztrációkor
Az UDTF olyan sorokat ad vissza, amelyek kimeneti sémája oszlopnevek és -típusok rendezett sorozatát tartalmazza. Ha az UDTF-séma mindig ugyanaz marad az összes lekérdezés esetében, a @udtf dekorátor után statikus, rögzített sémát is megadhat. Vagy StructType-nak kell lennie:
StructType().add("c1", StringType())
Vagy egy strukturális típust képviselő DDL-sztring:
c1: string
Dinamikus séma kiszámítása függvényhívási időpontban
Az UDTF-ek programozott módon is kiszámíthatják a kimeneti sémát az egyes hívásokhoz a bemeneti argumentumok értékeitől függően. Ehhez definiáljon egy analyze nevű statikus metódust, amely az adott UDTF-híváshoz megadott argumentumoknak megfelelő nulla vagy több paramétert fogad el.
A analyze metódus minden argumentuma a AnalyzeArgument osztály egyik példánya, amely a következő mezőket tartalmazza:
AnalyzeArgument osztálymező |
Leírás |
|---|---|
dataType |
A bemeneti argumentum típusa DataType. A bemeneti tábla argumentumainál ez StructType a tábla oszlopait jelöli. |
value |
A bemeneti argumentum értéke Optional[Any]. Ez None olyan táblaargumentumokhoz vagy konstans skaláris argumentumokhoz, amelyek nem állandók. |
isTable |
Azt jelzi, hogy a bemeneti argumentum tábla-e BooleanType. |
isConstantExpression |
Azt jelzi, hogy a bemeneti argumentum egy konstans-összecsukható kifejezés, mint például BooleanType. |
A analyze metódus az AnalyzeResult osztály egy példányát adja vissza, amely tartalmazza az eredménytábla sémáját StructType-ként, valamint néhány választható mezőt. Ha az UDTF elfogad egy bemeneti táblaargumentumot, akkor a AnalyzeResult tartalmazhatja a bemeneti tábla sorainak particionálásának és sorrendbe állításának egy kérelmezett módját több UDTF hívás során, ahogyan az később ismertetésre kerül.
AnalyzeResult osztálymező |
Leírás |
|---|---|
schema |
Az eredménytábla sémája StructType. |
withSinglePartition |
Az összes bemeneti sor küldése ugyanarra az UDTF-osztálypéldányra, mint egy BooleanType. |
partitionBy |
Ha nem üresre van állítva, a particionálási kifejezések minden egyedi értékkombinációját tartalmazó sorokat az UDTF osztály egy külön példánya használja fel. |
orderBy |
Ha nem üresre van állítva, ez az egyes partíciók sorainak sorrendjét adja meg. |
select |
Ha nem üres értékre van állítva, ez a kifejezéssorozat, amelyet az UDTF ad meg a Katalizátor számára, hogy kiértékelje a bemeneti TABLE argumentum oszlopait. Az UDTF egy bemeneti attribútumot kap a listában szereplő nevekhez a listában szereplő sorrendben. |
Ez a analyze példa egy kimeneti oszlopot ad vissza a bemeneti sztring argumentum minden egyes szava számára.
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
MyUDTF(lit("hello world")).columns
['word_0', 'word_1']
A jövőbeli eval hívások állapotának továbbítása
A analyze metódus kényelmes hely lehet az inicializálás végrehajtásához, majd továbbíthatja az eredményeket ugyanazon UDTF-hívás jövőbeli eval metódushívásaihoz.
Ehhez hozzon létre egy AnalyzeResult alosztályt, és adja vissza az alosztály egy példányát a analyze metódusból.
Ezután adjon hozzá egy további argumentumot a __init__ metódushoz a példány elfogadásához.
Ez a analyze példa egy állandó kimeneti sémát ad vissza, de egyéni adatokat ad hozzá az eredmény metaadataihoz, amelyeket a jövőbeli __init__ metódushívások használnak fel:
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Hozamkimeneti sorok
A eval metódus a bemeneti tábla argumentumának minden sorához egyszer fut (vagy csak egyszer, ha nincs megadva táblaargumentum), amelyet a terminate metódus egy meghívása követ a végén. Bármelyik metódus nulla vagy több olyan sort ad ki, amely megfelel az eredménysémának, a függvények, listák vagy pyspark.sql.Row objektumok használatával.
Ez a példa egy sort ad vissza három elemből álló tuppel megadásával:
def eval(self, x, y, z):
yield (x, y, z)
Kihagyhatja a zárójeleket is:
def eval(self, x, y, z):
yield x, y, z
Adjon hozzá egy záró vesszőt egy olyan sor visszaadásához, amely csak egy oszlopot tartalmaz:
def eval(self, x, y, z):
yield x,
Egy pyspark.sql.Row objektumot is megadhat.
def eval(self, x, y, z):
from pyspark.sql.types import Row
yield Row(x, y, z)
Ez a példa a terminate metódus kimeneti sorait egy Python-lista használatával adja eredményként. Az állapotot az osztályon belül tárolhatja az UDTF-értékelés korábbi lépéseiből erre a célra.
def terminate(self):
yield [self.x, self.y, self.z]
Skaláris argumentumok átadása UDTF-nek
Skaláris argumentumokat adhat az UDTF-nek konstans kifejezésekként, amelyek literális értékekből vagy azokra épülő függvényekből állnak. Például:
SELECT * FROM get_sum_diff(1, y => 2)
Táblaargumentumok átadása UDTF-nek
A Python UDTF-jei a skaláris bemeneti argumentumok mellett argumentumként is elfogadhatnak egy bemeneti táblát. Egyetlen UDTF egy táblaargumentumot és több skaláris argumentumot is elfogadhat.
Ezután bármely SQL-lekérdezés megadhat egy bemeneti táblát a TABLE kulcsszóval, majd zárójelek követik a megfelelő táblaazonosítót, például TABLE(t). Alternatív megoldásként átadhat egy táblázat lekérdezést, például TABLE(SELECT a, b, c FROM t) vagy TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).
A bemeneti tábla argumentuma ezután pyspark.sql.Row argumentumként jelenik meg a eval metódushoz, a bemeneti tábla minden sorához egy hívással a eval metódushoz. Standard PySpark-oszlopmező-széljegyzetekkel kezelheti az egyes sorok oszlopait. Az alábbi példa a PySpark Row típus explicit importálását, majd az átadott tábla szűrését mutatja be a id mezőben:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
A függvény lekérdezéséhez használja a TABLE SQL-kulcsszót:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
A bemeneti sorok particionálásának megadása függvényhívásokból
Ha egy UDTF-t táblaargumentummal hív meg, bármely SQL-lekérdezés particionálhatja a bemeneti táblát több UDTF-hívás között egy vagy több bemeneti táblaoszlop értékei alapján.
Partíció megadásához használja a PARTITION BY záradékot a függvényhívásban a TABLE argumentum után.
Ez garantálja, hogy a particionálási oszlopok értékeinek egyedi kombinációjával rendelkező összes bemeneti sort az UDTF-osztály pontosan egy példánya használja fel.
Vegye figyelembe, hogy az egyszerű oszlophivatkozások mellett a PARTITION BY záradék tetszőleges kifejezéseket is elfogad a bemeneti tábla oszlopai alapján. Megadhatja például egy karakterlánc LENGTH, kinyerhet egy hónapot egy dátumból, vagy összefűzhet két értéket.
A WITH SINGLE PARTITION helyett PARTITION BY is megadható, ha csak egy partíciót kér le, amelyben az összes bemeneti sort pontosan az UDTF osztály egy példányának kell felhasználnia.
Az egyes partíciókon belül megadhatja a bemeneti sorok kötelező sorrendjét, mivel az UDTF metódusa felhasználja eval őket. Ehhez adjon meg egy ORDER BY záradékot a fent leírt PARTITION BY vagy WITH SINGLE PARTITION záradék után.
Vegyük például a következő UDTF-t:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
A particionálási beállításokat több módon is megadhatja, amikor meghívja az UDTF-t a bemeneti táblán keresztül:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8);
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Adja meg a bemeneti sorok particionálását a analyze metódusból
Vegye figyelembe, hogy az SQL-lekérdezések UDTF-jeinek meghívásakor a bemeneti tábla particionálásának fenti módjai mindegyikénél létezik egy megfelelő módszer arra, hogy az UDTF metódusa analyze automatikusan ugyanazt a particionálási módszert adja meg.
- Az UDTF
SELECT * FROM udtf(TABLE(t) PARTITION BY a)hívása helyett frissítheti aanalyzemetódust a mezőpartitionBy=[PartitioningColumn("a")]beállításához, és egyszerűen meghívhatja a függvényt aSELECT * FROM udtf(TABLE(t))használatával. - Ezzel az elvvel az SQL-lekérdezésben
TABLE(t) WITH SINGLE PARTITION ORDER BY bmegadása helyettanalyzebeállíthatja a mezőketwithSinglePartition=trueésorderBy=[OrderingColumn("b")], majd csak átadjaTABLE(t). - Ahelyett, hogy
TABLE(SELECT a FROM t)-t adna át az SQL-lekérdezésben, beállíthatja a(z)analyze-et úgy, hogy azselect=[SelectedColumn("a")]legyen, majd egyszerűen átadhatja a(z)TABLE(t)-at.
A következő példában analyze egy állandó kimeneti sémát ad vissza, kiválasztja a bemeneti tábla oszlopainak egy részét, és megadja, hogy a bemeneti tábla több UDTF-hívásban particionálva legyen a date oszlop értékei alapján:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add("longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word)",
alias="length_word")])