Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Önemli
Bu özellik Databricks Runtime 14.3 LTS ve üzeri Genel Önizleme'ndedir.
Kullanıcı tanımlı tablo işlevi (UDTF), skaler değerler yerine tablo döndüren işlevleri kaydetmenize olanak tanır. Her çağrıdan tek bir sonuç değeri döndüren skaler işlevlerin aksine, her UDTF bir SQL deyiminin FROM yan tümcesinde çağrılır ve bir tablonun tamamını çıkış olarak döndürür.
Her bir UDTF çağrısı sıfır veya daha fazla bağımsız değişken kabul edebilir. Bu bağımsız değişkenler, giriş tablolarının tamamını temsil eden skaler ifadeler veya tablo bağımsız değişkenleri olabilir.
UDF'ler iki şekilde kaydedilebilir:
- Unity Kataloğu: Unity Kataloğu'nda UDTF'yi yönetilen nesne olarak kaydedin. Bkz. Unity Kataloğu'nda Python kullanıcı tanımlı tablo işlevleri (UDF' ler).
- Oturum kapsamlı: Geçerli not defterine veya işe yalıtılmış yerel
SparkSessionöğesine kaydedin.
Tip
Databricks, işlevleri kullanıcılar ve ekipler arasında güvenli bir şekilde paylaşmayı ve yeniden kullanmayı kolaylaştıran merkezi idareden yararlanmak için UDF'lerin Unity Kataloğu'na kaydedilmesini önerir.
Temel UDTF söz dizimi
Apache Spark, çıktı satırları yaymak için eval kullanan zorunlu bir yield yöntemiyle Python UDF'lerini Python sınıfları olarak uygular.
Sınıfınızı UDTF olarak kullanmak için PySpark udtf işlevini içeri aktarmanız gerekir. Databricks, bu işlevin dekoratör olarak kullanılmasını ve returnType seçeneğini kullanarak alan adlarını ve türlerini açıkça belirtmeyi önerir (sınıfı sonraki bir bölümde açıklandığı gibi bir analyze yöntemi tanımlamadığı sürece).
Aşağıdaki UDTF, iki tamsayı bağımsız değişkeninin sabit listesini kullanarak bir tablo oluşturur:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
UDTF kaydet
SQL sorgularında kullanılmak üzere oturum kapsamlı bir UDTF kaydetmek için kullanın spark.udtf.register(). SQL işlevi ve Python UDTF sınıfı için bir ad sağlayın.
spark.udtf.register("get_sum_diff", GetSumDiff)
Kayıtlı bir UDTF'yi çağır.
Kaydoldıktan sonra, %sql magic komutunu veya spark.sql() işlevini kullanarak SQL'de UDTF'yi kullanabilirsiniz:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);
Oturum kapsamlı bir UDTF'yi Unity Kataloğu'na yükseltme
Önemli
Unity Kataloğu'nda Python UDF'lerinin kaydedilmesi Genel Önizleme aşamasındadır. Unity Kataloğu UDF'leri Databricks Runtime sürüm 17.1 ve üzerini gerektirir. Bkz . Gereksinimler.
Merkezi idareden yararlanmak ve kullanıcılar ve ekipler arasında işlevleri güvenli bir şekilde paylaşmayı ve yeniden kullanmayı kolaylaştırmak için oturum kapsamlı bir UDTF'yi Unity Kataloğu'na yükseltebilirsiniz.
Oturum kapsamlı bir UDTF'yi Unity Kataloğu'na yükseltmek için deyimiyle BIRLIKTE CREATE OR REPLACE FUNCTION SQL DDL kullanın. Aşağıdaki örnekte UDTF'nin GetSumDiff oturum kapsamlı bir işlevden Unity Kataloğu işlevine nasıl dönüştürüldüğü gösterilmektedir:
CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
$$;
SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13 | 7 |
+-----+------+
Unity Kataloğu UDF'leri hakkında daha fazla bilgi için bkz. Unity Kataloğu'nda Python kullanıcı tanımlı tablo işlevleri (UDF'ler).
Apache Ok kullanma
UDTF'niz giriş olarak az miktarda veri alıyor ancak büyük bir tablo çıktısı alıyorsa Databricks Apache Arrow kullanmanızı önerir. UDTF'yi bildirirken useArrow parametresini belirterek etkinleştirebilirsiniz:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Değişken argüman listeleri - *args ve **kwargs
Python *args veya **kwargs söz dizimini kullanabilir ve belirtilmeyen sayıda giriş değerini işlemek için mantık uygulayabilirsiniz.
Aşağıdaki örnek, bağımsız değişkenler için giriş uzunluğunu ve türlerini açıkça denetlerken aynı sonucu döndürür:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
İşte aynı örnek, ancak anahtar kelime argümanları kullanılarak:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Kayıt zamanında statik şema tanımlama
UDTF, sıralı sütun adları ve türleri dizisini içeren bir çıkış şemasına sahip satırları döndürür. UDTF şemasının tüm sorgular için her zaman aynı kalması gerekiyorsa, @udtf dekoratörden sonra statik, sabit bir şema belirtebilirsiniz. Ya StructTypeolmalıdır:
StructType().add("c1", StringType())
Veya yapı türünü temsil eden bir DDL dizesi:
c1: string
İşlev çağrısı zamanında dinamik şema hesaplama
UDF'ler, giriş bağımsız değişkenlerinin değerlerine bağlı olarak her çağrı için program aracılığıyla çıkış şemasını da hesaplayabilir. Bunu yapmak için, belirli bir UDTF çağrısına sağlanan bağımsız değişkenlere karşılık gelen sıfır veya daha fazla parametre kabul eden analyze adlı statik bir yöntem tanımlayın.
analyze yönteminin her bağımsız değişkeni, aşağıdaki alanları içeren AnalyzeArgument sınıfının bir örneğidir:
AnalyzeArgument sınıf alanı |
Açıklama |
|---|---|
dataType |
Giriş bağımsız değişkeninin türü DataTypeolarak belirtilmiştir. Giriş tablosu bağımsız değişkenleri için bu, tablonun sütunlarını temsil eden bir StructType tablodur. |
value |
Giriş bağımsız değişkeninin Optional[Any]olarak değeri. Bu, sabit olmayan tablo bağımsız değişkenleri veya literal skaler bağımsız değişkenler için None. |
isTable |
Giriş bağımsız değişkeninin BooleanTypeolarak bir tablo olup olmadığını kontrol edin. |
isConstantExpression |
Giriş bağımsız değişkeninin BooleanTypeolarak sabit katlanabilir bir ifade olup olmadığı. |
analyze yöntemi, sonuç tablosunun şemasını AnalyzeResult ve isteğe bağlı bazı alanları içeren sınıfının bir StructType örneğini döndürür. UDTF bir giriş tablosu bağımsız değişkenini kabul ederse, AnalyzeResult daha sonra açıklandığı gibi birkaç UDTF çağrısında giriş tablosunun satırlarını bölümleyip sıralamak için istenen bir yol da içerebilir.
AnalyzeResult sınıf alanı |
Açıklama |
|---|---|
schema |
sonuç tablosunun StructTypeolarak şeması. |
withSinglePartition |
Tüm giriş satırlarının BooleanTypeolarak aynı UDTF sınıf örneğine gönderilip gönderilmeymeyeceği. |
partitionBy |
Boş olmayan olarak ayarlanırsa, bölümleme ifadelerinin her benzersiz değer bileşimine sahip tüm satırlar UDTF sınıfının ayrı bir örneği tarafından kullanılır. |
orderBy |
Boş olmayan olarak ayarlanırsa, her bölüm içindeki satırların sırasını belirtir. |
select |
Boş olmayan olarak ayarlanırsa, bu, UDTF'nin Catalyst'in giriş TABLE bağımsız değişkenindeki sütunlara göre değerlendirmesi için belirttiği bir ifade dizisidir. UDTF listedeki her ad için listelendikleri sırayla bir giriş özniteliği alır. |
Bu analyze örnek, girdi dizesi parametresindeki her sözcük için bir çıkış sütunu döndürür.
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
MyUDTF(lit("hello world")).columns
['word_0', 'word_1']
Durumu gelecekteki eval çağrılarına iletme
analyze yöntemi, başlatma gerçekleştirmek ve ardından sonuçları aynı UDTF çağrısı için gelecekteki eval yöntem çağrılarına iletmek için uygun bir yer olarak işlev görebilir.
Bunu yapmak için AnalyzeResult bir alt sınıfı oluşturun ve analyze yönteminden alt sınıfın bir örneğini döndürin.
Ardından, bu örneği kabul etmek için __init__ yöntemine ek bir bağımsız değişken ekleyin.
Bu analyze örnek sabit bir çıkış şeması döndürür, ancak gelecekteki __init__ yöntem çağrıları tarafından kullanılacak sonuç meta verilerine özel bilgiler ekler:
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Çıktı satırlarını oluştur
eval yöntemi, giriş tablosu bağımsız değişkeninin her satırı için bir kez (veya tablo bağımsız değişkeni sağlanmadığında yalnızca bir kez) ve ardından sonunda bir terminate yöntemi çağrılır. Her iki yöntem de sonuç şemasına uyan sıfır veya daha fazla satır elde etmek için demetler, listeler veya pyspark.sql.Row nesneleri üretir.
Bu örnek, üç öğeden oluşan bir demet sağlayarak bir satır döndürür.
def eval(self, x, y, z):
yield (x, y, z)
Ayraçları da atlayabilirsiniz:
def eval(self, x, y, z):
yield x, y, z
Yalnızca bir sütun içeren bir satır döndürmek için sondaki virgülleri ekleyin:
def eval(self, x, y, z):
yield x,
Ayrıca bir pyspark.sql.Row nesnesi de sağlayabilirsiniz.
def eval(self, x, y, z):
from pyspark.sql.types import Row
yield Row(x, y, z)
Bu örnek, Python listesi kullanarak terminate yönteminden çıkış satırları verir. Bu amaçla UDTF değerlendirmesindeki önceki adımlardan sınıfın içinde durum depolayabilirsiniz.
def terminate(self):
yield [self.x, self.y, self.z]
UDTF'ye skaler argümanları geçirme
Skaler argümanları, sabit değerler veya bunlara dayalı işlevlerden oluşan sabit ifadeler olarak bir UDTF'ye geçirebilirsiniz. Mesela:
SELECT * FROM get_sum_diff(1, y => 2)
Tablo argümanlarını bir UDTF'ye aktarma
Python UDF'leri, skaler giriş bağımsız değişkenlerine ek olarak bir giriş tablosunu bağımsız değişken olarak kabul edebilir. Tek bir UDTF, bir tablo bağımsız değişkeni ve birden çok skaler bağımsız değişkeni kabul edebilir.
Daha sonra herhangi bir SQL sorgusu, TABLE anahtar sözcüğünü ve ardından TABLE(t)gibi uygun bir tablo tanımlayıcısını çevreleyen parantezleri kullanarak bir giriş tablosu sağlayabilir. Alternatif olarak, TABLE(SELECT a, b, c FROM t) veya TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))gibi bir tablo alt sorgusu geçirebilirsiniz.
Giriş tablosu bağımsız değişkeni daha sonra pyspark.sql.Row yöntemine bir eval bağımsız değişkeni olarak temsil edilir ve giriş tablosundaki her satır için bir eval yöntemi çağrısı yapılır. Her satırdaki sütunlarla etkileşime geçmek için standart PySpark sütun alanı ek açıklamalarını kullanabilirsiniz. Aşağıdaki örnek, PySpark Row türünü açıkça içeri aktarmayı ve id alanında geçirilen tabloyu filtrelemeyi gösterir:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
İşlevi sorgulamak için TABLE SQL anahtar sözcüğünü kullanın:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
İşlev çağrılarından giriş satırlarının bölümlesini belirtin
Tablo bağımsız değişkeniyle bir UDTF çağırırken, herhangi bir SQL sorgusu giriş tablosunu bir veya daha fazla giriş tablosu sütununun değerlerine göre birkaç UDTF çağrısı arasında bölümleyebilir.
Bir bölüm belirtmek için, PARTITION BY bağımsız değişkeninden sonraki işlev çağrısında TABLE tümcesini kullanın.
Bu, bölümleme sütunlarının her benzersiz değer bileşimine sahip tüm giriş satırlarının UDTF sınıfının tam olarak bir örneği tarafından tüketileceğini garanti eder.
Basit sütun başvurularına ek olarak, PARTITION BY yan tümcesinin giriş tablosu sütunlarına göre rastgele ifadeleri de kabul ettiğini unutmayın. Örneğin, bir dizenin LENGTH belirtebilir, bir tarihten bir ay ayıklayabilir veya iki değeri birleştirebilirsiniz.
Tüm giriş satırlarının UDTF sınıfının tam olarak bir örneği tarafından tüketilmesi gereken tek bir bölüm istemek için WITH SINGLE PARTITION yerine PARTITION BY belirtmek de mümkündür.
Her bölümde, UDTF'nin eval yöntemi bunları tükettiğinden isteğe bağlı olarak giriş satırlarının gerekli sıralamasını belirtebilirsiniz. Bunu yapmak için, yukarıda açıklanan ORDER BY veya PARTITION BY yan tümcesinin ardından bir WITH SINGLE PARTITION yan tümcesi sağlayın.
Örneğin, aşağıdaki UDTF'yi göz önünde bulundurun:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Giriş tablosu üzerinden UDTF'yi çağırırken bölümleme seçeneklerini birden çok şekilde belirtebilirsiniz:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8);
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
analyze yönteminden giriş satırlarının bölümlesini belirtin
SQL sorgularında UDF'leri çağırırken giriş tablosunu bölümlemenin yukarıdaki yollarından her biri için, UDTF'nin analyze yönteminin aynı bölümleme yöntemini otomatik olarak belirtmesinin karşılık gelen bir yolu olduğunu unutmayın.
- UDTF'yi
SELECT * FROM udtf(TABLE(t) PARTITION BY a)olarak çağırmak yerineanalyzeyöntemini güncelleştirerek alanpartitionBy=[PartitioningColumn("a")]ayarlayabilir veSELECT * FROM udtf(TABLE(t))kullanarak işlevi çağırabilirsiniz. - Aynı belirteçle, SQL sorgusunda
TABLE(t) WITH SINGLE PARTITION ORDER BY bbelirtmek yerine,analyzealanlarıwithSinglePartition=trueveorderBy=[OrderingColumn("b")]ayarlayabilir ve sonraTABLE(t)geçirebilirsiniz. - SQL sorgusunda
TABLE(SELECT a FROM t)geçirmek yerineanalyzeselect=[SelectedColumn("a")]ayarlayabilir ve ardındanTABLE(t)geçirebilirsiniz.
Aşağıdaki örnekte, analyze sabit bir çıkış şeması döndürür, giriş tablosundan sütunların bir alt kümesini seçer ve giriş tablosunun date sütununun değerlerine göre birkaç UDTF çağrısında bölümlendiğini belirtir:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add("longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word)",
alias="length_word")])