Aracılığıyla paylaş


Unity Kataloğu'nda Batch Python Kullanıcı tanımlı işlevleri (UDF)

Önemli

Bu özellik Genel Önizlemededir.

Batch Unity Kataloğu Python UDF'leri, veri toplu işlemleri üzerinde çalışacak Python kodu yazmanıza olanak tanıyarak Unity Kataloğu UDF'lerinin özelliklerini genişletir ve satır satır UDF'lerle ilişkili ek yükü azaltarak verimliliği önemli ölçüde artırır. Bu iyileştirmeler Unity Kataloğu toplu python UDF'lerini büyük ölçekli veri işleme için ideal hale getirir.

Gereksinimler

Batch Unity Kataloğu Python UDF'leri Databricks Runtime 16.3 ve üzeri sürümleri gerektirir.

Python UDF ile Toplu Unity Kataloğu Oluşturma

Batch Unity Kataloğu Python UDF oluşturmak, aşağıdaki eklemelerle normal bir Unity Kataloğu UDF oluşturmaya benzer:

  • PARAMETER STYLE PANDAS: Bu, UDF'nin pandas yineleyicilerini kullanarak verileri toplu olarak işlediğini belirtir.
  • HANDLER 'handler_function': Bu, toplu işlemleri işlemek için çağrılan işleyici işlevini belirtir.

Aşağıdaki örnekte Batch Unity Kataloğu Python UDF'sinin nasıl oluşturulacağı gösterilmektedir:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

UDF'yi kaydettikten sonra SQL veya Python kullanarak çağırabilirsiniz:

SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
  SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
  SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);

Toplu UDF işleyici işlevi

Batch Unity Kataloğu Python UDF'leri toplu işlemleri işleyen ve sonuçları veren bir işleyici işlevi gerektirir. anahtarı kullanarak UDF'yi oluştururken işleyici işlevinin HANDLER adını belirtmeniz gerekir.

İşleyici işlevi aşağıdakileri yapar:

  1. Bir veya daha fazla pandas.Series üzerinde yineleme yapan bir yineleyici bağımsız değişkenini kabul eder. Her pandas.Series biri UDF'nin giriş parametrelerini içerir.
  2. Yaratıcı üzerinde yineleme yapar ve verileri işler.
  3. Bir oluşturucu yineleyicisi döndürür.

Batch Unity Kataloğu Python UDF'leri girişle aynı sayıda satır döndürmelidir. İşleyici işlevi, her bir toplu işlemde, giriş serisiyle aynı uzunlukta bir pandas.Series üreterek bunu sağlar.

Özel bağımlılıkları yükleme

Dış kitaplıklar için özel bağımlılıklar tanımlayarak Batch Unity Kataloğu Python UDF'lerinin işlevselliğini Databricks Runtime ortamının ötesine genişletebilirsiniz.

Bkz. Özel bağımlılıkları kullanarak UDF'leri genişletme.

Batch UDF'leri tek veya birden çok parametreyi kabul edebilir

Tek parametre: İşleyici işlevi tek bir giriş parametresi kullandığında, bu parametre her toplu işlem için bir pandas.Series üzerinde yineleme yapan bir yineleyici alır.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for value_batch in batch_iter:
    d = {"min": value_batch.min(), "max": value_batch.max()}
    yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;

Birden çok parametre: Birden çok giriş parametresi için işleyici işlevi birden çok pandas.Seriesüzerinde yineleyen bir yineleyici alır. Serideki değerler giriş parametreleriyle aynı sıradadır.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for p1, p2 in batch_iter: # same order as arguments above
    yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);

Pahalı işlemleri ayırarak performansı iyileştirme

Bu işlemleri işleyici işlevinden ayırarak hesaplama açısından pahalı işlemleri iyileştirebilirsiniz. Bu, veri toplu işlemleri üzerinde yapılan her yineleme yerine yalnızca bir kez yürütülmesini sağlar.

Aşağıdaki örnek, pahalı bir hesaplamanın yalnızca bir kez gerçekleştirilmesinin nasıl sağlandığını gösterir:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
  # expensive computation...
  return 1

expensive_value = compute_value()
def handler_func(batch_iter):
  for batch in batch_iter:
    yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL

Ortam yalıtımı

Uyarı

Paylaşılan yalıtım ortamları Databricks Runtime 17.1 ve üzerini gerektirir. Önceki sürümlerde, tüm Batch Unity Kataloğu Python UDF'leri katı yalıtım modunda çalışır.

Aynı sahip ve oturuma sahip Batch Unity Kataloğu Python UDF'leri varsayılan olarak bir yalıtım ortamını paylaşabilir. Bu, başlatılması gereken ayrı ortamların sayısını azaltarak performansı artırabilir ve bellek kullanımını azaltabilir.

Katı yalıtım

UDF'nin her zaman kendi, tamamen yalıtılmış ortamında çalıştığından emin olmak için karakteristik yan tümcesini STRICT ISOLATION ekleyin.

Çoğu UDF'nin katı yalıtıma ihtiyacı yoktur. Standart veri işleme UDF'leri varsayılan paylaşılan yalıtım ortamından yararlanır ve daha düşük bellek tüketimiyle daha hızlı çalışır.

Şu özelliklere sahip STRICT ISOLATION UDF'lere karakteristik yan tümcesini ekleyin:

  • eval(), exec() veya benzer işlevleri kullanarak girişi kod olarak çalıştırın.
  • Dosyaları yerel dosya sistemine yazma
  • Genel değişkenleri veya sistem durumunu değiştirme
  • Ortam değişkenlerini değiştirme

Aşağıdaki örnekte, girişi kod olarak yürüten ve katı yalıtım gerektiren bir UDF gösterilmektedir:

CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator

def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for code_series in batch_iter:
    def eval_func(code):
      try:
        return str(eval(code))
      except Exception as e:
        return f"Error: {e}"
    yield code_series.apply(eval_func)
$$;

Batch Unity Kataloğu Python UDF'lerinde hizmet kimlik bilgileri

Batch Unity Kataloğu Python UDF'leri, dış bulut hizmetlerine erişmek için Unity Katalog Hizmeti Kimlik Bilgilerini kullanabilir. Bu özellikle güvenlik belirteci gibi bulut işlevlerini veri işleme iş akışlarıyla tümleştirmek için kullanışlıdır.

Uyarı

Hizmet kimlik bilgileri için UDF'ye özgü API:
UDF'lerde, hizmet kimlik bilgilerine erişmek için kullanın databricks.service_credentials.getServiceCredentialsProvider() .

Bu, UDF yürütme bağlamlarında bulunmayan not defterlerinde kullanılan işlevden dbutils.credentials.getServiceCredentialsProvider() farklıdır.

Hizmet kimlik bilgileri oluşturmak için bkz. Hizmet kimlik bilgileri oluşturma.

UDF tanımındaki yan tümcesinde CREDENTIALS kullanmak istediğiniz hizmet kimlik bilgilerini belirtin:

CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
  `credential-name` DEFAULT,
  `complicated-credential-name` AS short_name,
  `simple-cred`,
  cred_no_quotes
)
AS $$
# Python code here
$$;

Hizmet kimlik bilgileri izinleri

UDF oluşturucusunun Unity Kataloğu hizmeti kimlik bilgileri üzerinde ACCESS izni olmalıdır. Ancak UDF çağırıcıları için UDF üzerinde onlara EXECUTE izni vermek yeterlidir. Özellikle, UDF çağıranların temel hizmet kimlik bilgilerine erişmesine gerek yoktur, çünkü UDF, UDF oluşturucusunun kimlik bilgisi izinlerini kullanarak çalışır.

Geçici işlevler için oluşturucu her zaman çağırandır. Ayrılmış kümeler olarak da bilinen No-PE kapsamında çalışan UDF'ler bunun yerine çağıranın izinlerini kullanır.

Varsayılan kimlik bilgileri ve diğer adlar

Yan tümcesine CREDENTIALS birden çok kimlik bilgisi ekleyebilirsiniz, ancak yalnızca biri DEFAULT işaretlenebilir. Varsayılan olmayan kimlik bilgilerine AS anahtar sözcüğü kullanarak diğer ad verebilirsiniz. Her kimlik bilgisi için benzersiz bir takma ad olmalıdır.

Yama uygulanmış bulut SDK'ları varsayılan kimlik bilgilerini otomatik olarak algılar. Varsayılan kimlik bilgileri, işlemin Spark yapılandırmasında belirtilen varsayılan kimlik bilgilerinden önceliklidir ve Unity Kataloğu UDF tanımında kalır.

Sağlayıcıyı kullanmak için azure-identity paketini yüklemeniz gerekir. Dış kitaplıkları yüklemek için yan tümcesini ENVIRONMENT kullanın. Dış kitaplıkları yükleme hakkında daha fazla bilgi edinmek için bkz. Özel bağımlılıkları kullanarak UDF'leri genişletme.

Hizmet kimlik bilgisi örneği - Azure Blob Depolama

Aşağıdaki örnek, Batch Unity Kataloğu Python UDF'den Azure Blob Depolama'ya erişmek için bir hizmet kimlik bilgisi kullanır:

%sql
CREATE OR REPLACE FUNCTION main.test.read_azure_blob(blob_name STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
  dependencies = '["azure-identity", "azure-storage-blob"]', environment_version = 'None'
)
AS $$
import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient


def batchhandler(it):
  # DefaultAzureCredential automatically uses the DEFAULT service credential
  credential = DefaultAzureCredential()
  blob_service = BlobServiceClient(
    account_url="https://your-storage-account.blob.core.windows.net",
    credential=credential
  )
  container = blob_service.get_container_client("your-container")

  for blob_names in it:
    results = []
    for name in blob_names:
      blob_client = container.get_blob_client(name)
      try:
        content = blob_client.download_blob().readall().decode("utf-8")
        results.append(content)
      except Exception as e:
        results.append(f"Error: {e}")
    yield pd.Series(results)
$$;

Kaydedildikten sonra UDF'yi çağırın:

SELECT main.test.read_azure_blob(blob_name)
FROM VALUES
('config/settings.json'),
('data/input.txt')
AS t(blob_name)

Görev yürütme bağlamını al

Kullanıcının kimliği, küme etiketleri, spark görev kimliği ve daha fazlası gibi bağlam bilgilerini almak için TaskContext PySpark API'sini kullanın. See Bir UDF'de görev bağlamını al.

İşlevinizin tutarlı sonuçlar üretip üretmediğini ayarlama DETERMINISTIC

Aynı girişler için aynı çıkışları oluşturuyorsa işlev tanımınıza ekleyin DETERMINISTIC . Bu, sorgu iyileştirmelerinin performansı geliştirmesine olanak tanır.

Varsayılan olarak, Batch Unity Kataloğu Python UDF'lerinin açıkça bildirilmediği sürece deterministioc olmadığı varsayılır. Belirlenemeyen işlevlere örnek olarak şunlar verilebilir: rastgele değerler oluşturma, geçerli saatlere veya tarihlere erişme veya dış API çağrıları yapma.

Bkz.CREATE FUNCTION (SQL ve Python)

Sınırlamalar

  • Python işlevlerinin değerleri bağımsız olarak işlemesi NULL ve tüm tür eşlemelerinin Azure Databricks SQL dil eşlemelerini izlemesi gerekir.
  • Batch Unity Kataloğu Python UDF'leri güvenli, yalıtılmış bir ortamda çalışır ve paylaşılan dosya sistemine veya iç hizmetlere erişimi yoktur.
  • Bir aşamadaki birden çok UDF çağrısı serileştirilir ve ara sonuçlar elde edilir ve diske taşabilir.
  • Hizmet kimlik bilgileri yalnızca Batch Unity Kataloğu Python UDF'lerinde ve Skaler Python UDF'lerinde kullanılabilir. Standart Unity Kataloğu Python UDF'lerinde desteklenmez.
  • Ayrılmış kümelerde ve geçici işlevler için, işlev çağıranın hizmet kimlik bilgileri üzerinde ACCESS izinleri olmalıdır. Bkz Hizmet kimlik bilgilerini kullanarak bir dış bulut hizmetine erişim izni verme.
  • Genel Önizleme özelliğini etkinleştirme Sunucusuz SQL ambarı işlemindeki dış hizmetlere Batch Unity Kataloğu Python UDF çağrıları yapmak için çalışma alanınızın Önizlemeler sayfasındaki Sunucusuz SQL Ambarlarında UDF'ler için ağ oluşturmayı etkinleştirin.
  • Sunucusuz bir not defterinde veya iş işleminde Batch Unity Kataloğu Python UDF çağrıları yapmak için sunucusuz çıkış denetimini yapılandırmanız gerekir