Aracılığıyla paylaş


Python kullanıcı tanımlı tablo işlevleri (UDF)

Önemli

Bu özellik Databricks Runtime 14.3 LTS ve üzeri Genel Önizleme'ndedir.

Kullanıcı tanımlı tablo işlevi (UDTF), skaler değerler yerine tablo döndüren işlevleri kaydetmenize olanak tanır. Her çağrıdan tek bir sonuç değeri döndüren skaler işlevlerin aksine, her UDTF bir SQL deyiminin FROM yan tümcesinde çağrılır ve bir tablonun tamamını çıkış olarak döndürür.

Her bir UDTF çağrısı sıfır veya daha fazla bağımsız değişken kabul edebilir. Bu bağımsız değişkenler, giriş tablolarının tamamını temsil eden skaler ifadeler veya tablo bağımsız değişkenleri olabilir.

UDF'ler iki şekilde kaydedilebilir:

Tip

Databricks, işlevleri kullanıcılar ve ekipler arasında güvenli bir şekilde paylaşmayı ve yeniden kullanmayı kolaylaştıran merkezi idareden yararlanmak için UDF'lerin Unity Kataloğu'na kaydedilmesini önerir.

Temel UDTF söz dizimi

Apache Spark, çıktı satırları yaymak için eval kullanan zorunlu bir yield yöntemiyle Python UDF'lerini Python sınıfları olarak uygular.

Sınıfınızı UDTF olarak kullanmak için PySpark udtf işlevini içeri aktarmanız gerekir. Databricks, bu işlevin dekoratör olarak kullanılmasını ve returnType seçeneğini kullanarak alan adlarını ve türlerini açıkça belirtmeyi önerir (sınıfı sonraki bir bölümde açıklandığı gibi bir analyze yöntemi tanımlamadığı sürece).

Aşağıdaki UDTF, iki tamsayı bağımsız değişkeninin sabit listesini kullanarak bir tablo oluşturur:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

UDTF kaydet

SQL sorgularında kullanılmak üzere oturum kapsamlı bir UDTF kaydetmek için kullanın spark.udtf.register(). SQL işlevi ve Python UDTF sınıfı için bir ad sağlayın.

spark.udtf.register("get_sum_diff", GetSumDiff)

Kayıtlı bir UDTF'yi çağır.

Kaydoldıktan sonra, %sql magic komutunu veya spark.sql() işlevini kullanarak SQL'de UDTF'yi kullanabilirsiniz:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);

Oturum kapsamlı bir UDTF'yi Unity Kataloğu'na yükseltme

Önemli

Unity Kataloğu'nda Python UDF'lerinin kaydedilmesi Genel Önizleme aşamasındadır. Unity Kataloğu UDF'leri Databricks Runtime sürüm 17.1 ve üzerini gerektirir. Bkz . Gereksinimler.

Merkezi idareden yararlanmak ve kullanıcılar ve ekipler arasında işlevleri güvenli bir şekilde paylaşmayı ve yeniden kullanmayı kolaylaştırmak için oturum kapsamlı bir UDTF'yi Unity Kataloğu'na yükseltebilirsiniz.

Oturum kapsamlı bir UDTF'yi Unity Kataloğu'na yükseltmek için deyimiyle BIRLIKTE CREATE OR REPLACE FUNCTION SQL DDL kullanın. Aşağıdaki örnekte UDTF'nin GetSumDiff oturum kapsamlı bir işlevden Unity Kataloğu işlevine nasıl dönüştürüldüğü gösterilmektedir:

CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y
$$;

SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13  | 7    |
+-----+------+

Unity Kataloğu UDF'leri hakkında daha fazla bilgi için bkz. Unity Kataloğu'nda Python kullanıcı tanımlı tablo işlevleri (UDF'ler).

Apache Ok kullanma

UDTF'niz giriş olarak az miktarda veri alıyor ancak büyük bir tablo çıktısı alıyorsa Databricks Apache Arrow kullanmanızı önerir. UDTF'yi bildirirken useArrow parametresini belirterek etkinleştirebilirsiniz:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Değişken argüman listeleri - *args ve **kwargs

Python *args veya **kwargs söz dizimini kullanabilir ve belirtilmeyen sayıda giriş değerini işlemek için mantık uygulayabilirsiniz.

Aşağıdaki örnek, bağımsız değişkenler için giriş uzunluğunu ve türlerini açıkça denetlerken aynı sonucu döndürür:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

İşte aynı örnek, ancak anahtar kelime argümanları kullanılarak:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Kayıt zamanında statik şema tanımlama

UDTF, sıralı sütun adları ve türleri dizisini içeren bir çıkış şemasına sahip satırları döndürür. UDTF şemasının tüm sorgular için her zaman aynı kalması gerekiyorsa, @udtf dekoratörden sonra statik, sabit bir şema belirtebilirsiniz. Ya StructTypeolmalıdır:

StructType().add("c1", StringType())

Veya yapı türünü temsil eden bir DDL dizesi:

c1: string

İşlev çağrısı zamanında dinamik şema hesaplama

UDF'ler, giriş bağımsız değişkenlerinin değerlerine bağlı olarak her çağrı için program aracılığıyla çıkış şemasını da hesaplayabilir. Bunu yapmak için, belirli bir UDTF çağrısına sağlanan bağımsız değişkenlere karşılık gelen sıfır veya daha fazla parametre kabul eden analyze adlı statik bir yöntem tanımlayın.

analyze yönteminin her bağımsız değişkeni, aşağıdaki alanları içeren AnalyzeArgument sınıfının bir örneğidir:

AnalyzeArgument sınıf alanı Açıklama
dataType Giriş bağımsız değişkeninin türü DataTypeolarak belirtilmiştir. Giriş tablosu bağımsız değişkenleri için bu, tablonun sütunlarını temsil eden bir StructType tablodur.
value Giriş bağımsız değişkeninin Optional[Any]olarak değeri. Bu, sabit olmayan tablo bağımsız değişkenleri veya literal skaler bağımsız değişkenler için None.
isTable Giriş bağımsız değişkeninin BooleanTypeolarak bir tablo olup olmadığını kontrol edin.
isConstantExpression Giriş bağımsız değişkeninin BooleanTypeolarak sabit katlanabilir bir ifade olup olmadığı.

analyze yöntemi, sonuç tablosunun şemasını AnalyzeResult ve isteğe bağlı bazı alanları içeren sınıfının bir StructType örneğini döndürür. UDTF bir giriş tablosu bağımsız değişkenini kabul ederse, AnalyzeResult daha sonra açıklandığı gibi birkaç UDTF çağrısında giriş tablosunun satırlarını bölümleyip sıralamak için istenen bir yol da içerebilir.

AnalyzeResult sınıf alanı Açıklama
schema sonuç tablosunun StructTypeolarak şeması.
withSinglePartition Tüm giriş satırlarının BooleanTypeolarak aynı UDTF sınıf örneğine gönderilip gönderilmeymeyeceği.
partitionBy Boş olmayan olarak ayarlanırsa, bölümleme ifadelerinin her benzersiz değer bileşimine sahip tüm satırlar UDTF sınıfının ayrı bir örneği tarafından kullanılır.
orderBy Boş olmayan olarak ayarlanırsa, her bölüm içindeki satırların sırasını belirtir.
select Boş olmayan olarak ayarlanırsa, bu, UDTF'nin Catalyst'in giriş TABLE bağımsız değişkenindeki sütunlara göre değerlendirmesi için belirttiği bir ifade dizisidir. UDTF listedeki her ad için listelendikleri sırayla bir giriş özniteliği alır.

Bu analyze örnek, girdi dizesi parametresindeki her sözcük için bir çıkış sütunu döndürür.

from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult


@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result

MyUDTF(lit("hello world")).columns
['word_0', 'word_1']

Durumu gelecekteki eval çağrılarına iletme

analyze yöntemi, başlatma gerçekleştirmek ve ardından sonuçları aynı UDTF çağrısı için gelecekteki eval yöntem çağrılarına iletmek için uygun bir yer olarak işlev görebilir.

Bunu yapmak için AnalyzeResult bir alt sınıfı oluşturun ve analyze yönteminden alt sınıfın bir örneğini döndürin. Ardından, bu örneği kabul etmek için __init__ yöntemine ek bir bağımsız değişken ekleyin.

Bu analyze örnek sabit bir çıkış şeması döndürür, ancak gelecekteki __init__ yöntem çağrıları tarafından kullanılacak sonuç meta verilerine özel bilgiler ekler:

from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Çıktı satırlarını oluştur

eval yöntemi, giriş tablosu bağımsız değişkeninin her satırı için bir kez (veya tablo bağımsız değişkeni sağlanmadığında yalnızca bir kez) ve ardından sonunda bir terminate yöntemi çağrılır. Her iki yöntem de sonuç şemasına uyan sıfır veya daha fazla satır elde etmek için demetler, listeler veya pyspark.sql.Row nesneleri üretir.

Bu örnek, üç öğeden oluşan bir demet sağlayarak bir satır döndürür.

def eval(self, x, y, z):
  yield (x, y, z)

Ayraçları da atlayabilirsiniz:

def eval(self, x, y, z):
  yield x, y, z

Yalnızca bir sütun içeren bir satır döndürmek için sondaki virgülleri ekleyin:

def eval(self, x, y, z):
  yield x,

Ayrıca bir pyspark.sql.Row nesnesi de sağlayabilirsiniz.

def eval(self, x, y, z):
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Bu örnek, Python listesi kullanarak terminate yönteminden çıkış satırları verir. Bu amaçla UDTF değerlendirmesindeki önceki adımlardan sınıfın içinde durum depolayabilirsiniz.

def terminate(self):
  yield [self.x, self.y, self.z]

UDTF'ye skaler argümanları geçirme

Skaler argümanları, sabit değerler veya bunlara dayalı işlevlerden oluşan sabit ifadeler olarak bir UDTF'ye geçirebilirsiniz. Mesela:

SELECT * FROM get_sum_diff(1, y => 2)

Tablo argümanlarını bir UDTF'ye aktarma

Python UDF'leri, skaler giriş bağımsız değişkenlerine ek olarak bir giriş tablosunu bağımsız değişken olarak kabul edebilir. Tek bir UDTF, bir tablo bağımsız değişkeni ve birden çok skaler bağımsız değişkeni kabul edebilir.

Daha sonra herhangi bir SQL sorgusu, TABLE anahtar sözcüğünü ve ardından TABLE(t)gibi uygun bir tablo tanımlayıcısını çevreleyen parantezleri kullanarak bir giriş tablosu sağlayabilir. Alternatif olarak, TABLE(SELECT a, b, c FROM t) veya TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))gibi bir tablo alt sorgusu geçirebilirsiniz.

Giriş tablosu bağımsız değişkeni daha sonra pyspark.sql.Row yöntemine bir eval bağımsız değişkeni olarak temsil edilir ve giriş tablosundaki her satır için bir eval yöntemi çağrısı yapılır. Her satırdaki sütunlarla etkileşime geçmek için standart PySpark sütun alanı ek açıklamalarını kullanabilirsiniz. Aşağıdaki örnek, PySpark Row türünü açıkça içeri aktarmayı ve id alanında geçirilen tabloyu filtrelemeyi gösterir:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

İşlevi sorgulamak için TABLE SQL anahtar sözcüğünü kullanın:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

İşlev çağrılarından giriş satırlarının bölümlesini belirtin

Tablo bağımsız değişkeniyle bir UDTF çağırırken, herhangi bir SQL sorgusu giriş tablosunu bir veya daha fazla giriş tablosu sütununun değerlerine göre birkaç UDTF çağrısı arasında bölümleyebilir.

Bir bölüm belirtmek için, PARTITION BY bağımsız değişkeninden sonraki işlev çağrısında TABLE tümcesini kullanın. Bu, bölümleme sütunlarının her benzersiz değer bileşimine sahip tüm giriş satırlarının UDTF sınıfının tam olarak bir örneği tarafından tüketileceğini garanti eder.

Basit sütun başvurularına ek olarak, PARTITION BY yan tümcesinin giriş tablosu sütunlarına göre rastgele ifadeleri de kabul ettiğini unutmayın. Örneğin, bir dizenin LENGTH belirtebilir, bir tarihten bir ay ayıklayabilir veya iki değeri birleştirebilirsiniz.

Tüm giriş satırlarının UDTF sınıfının tam olarak bir örneği tarafından tüketilmesi gereken tek bir bölüm istemek için WITH SINGLE PARTITION yerine PARTITION BY belirtmek de mümkündür.

Her bölümde, UDTF'nin eval yöntemi bunları tükettiğinden isteğe bağlı olarak giriş satırlarının gerekli sıralamasını belirtebilirsiniz. Bunu yapmak için, yukarıda açıklanan ORDER BY veya PARTITION BY yan tümcesinin ardından bir WITH SINGLE PARTITION yan tümcesi sağlayın.

Örneğin, aşağıdaki UDTF'yi göz önünde bulundurun:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Giriş tablosu üzerinden UDTF'yi çağırırken bölümleme seçeneklerini birden çok şekilde belirtebilirsiniz:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8);
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

analyze yönteminden giriş satırlarının bölümlesini belirtin

SQL sorgularında UDF'leri çağırırken giriş tablosunu bölümlemenin yukarıdaki yollarından her biri için, UDTF'nin analyze yönteminin aynı bölümleme yöntemini otomatik olarak belirtmesinin karşılık gelen bir yolu olduğunu unutmayın.

  • UDTF'yi SELECT * FROM udtf(TABLE(t) PARTITION BY a)olarak çağırmak yerine analyze yöntemini güncelleştirerek alan partitionBy=[PartitioningColumn("a")] ayarlayabilir ve SELECT * FROM udtf(TABLE(t))kullanarak işlevi çağırabilirsiniz.
  • Aynı belirteçle, SQL sorgusunda TABLE(t) WITH SINGLE PARTITION ORDER BY b belirtmek yerine, analyze alanları withSinglePartition=true ve orderBy=[OrderingColumn("b")] ayarlayabilir ve sonra TABLE(t)geçirebilirsiniz.
  • SQL sorgusunda TABLE(SELECT a FROM t) geçirmek yerine analyzeselect=[SelectedColumn("a")] ayarlayabilir ve ardından TABLE(t)geçirebilirsiniz.

Aşağıdaki örnekte, analyze sabit bir çıkış şeması döndürür, giriş tablosundan sütunların bir alt kümesini seçer ve giriş tablosunun date sütununun değerlerine göre birkaç UDTF çağrısında bölümlendiğini belirtir:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add("longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word)",
        alias="length_word")])