Aracılığıyla paylaş


Veri kümelerini tanımlama işlevleri

pyspark.pipelines modülü, burada dp adıyla bilinen, dekoratörleri kullanarak temel işlevselliğinin büyük bir kısmını uygular. Bu dekoratörler bir akış veya toplu iş sorgusu tanımlayan ve Apache Spark DataFrame döndüren bir işlevi kabul eder. Aşağıdaki söz diziminde, işlem hattı veri kümesinin definine yönelik basit bir örnek gösterilmektedir:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

Bu sayfa, işlem hatlarında veri kümelerini tanımlayan işlevlere ve sorgulara genel bir bakış sağlar. Kullanılabilir dekoratörlerin tam listesi için bkz. Pipeline geliştirici referansı.

Veri kümelerini tanımlamak için kullandığınız işlevler, üçüncü taraf API'lere yapılan çağrılar da dahil olmak üzere veri kümesiyle ilgili olmayan rastgele Python mantığını içermemelidir. İşlem hatları planlama, doğrulama ve güncelleştirmeler sırasında bu işlevleri birden çok kez çalıştırır. Rastgele mantık eklemek beklenmeyen sonuçlara yol açabilir.

Veri kümesi tanımını başlatmak için verileri okuma

İşlem hattı veri kümelerini tanımlamak için kullanılan işlevler genellikle bir spark.read veya spark.readStream işlemiyle başlar. Bu okuma işlemleri, DataFrame'i döndürmeden önce ek dönüştürmeleri tanımlamak için kullandığınız statik veya akış dataframe nesnesini döndürür. DataFrame döndüren spark işlemlerine örnek olarak spark.table, veya spark.rangeverilebilir.

İşlevler hiçbir zaman işlevin dışında tanımlanan DataFrame'lere başvurmamalıdır. Farklı bir kapsamda tanımlanan DataFrame'lere başvurmaya çalışmak beklenmeyen davranışlara neden olabilir. Birden çok tablo oluşturmaya yönelik meta programlama düzeni örneği için bkz. Döngüde for tablo oluşturma.

Aşağıdaki örneklerde toplu iş veya akış mantığı kullanarak verileri okumak için temel söz dizimi gösterilmektedir:

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

Dış REST API'den veri okumanız gerekiyorsa, python özel veri kaynağı kullanarak bu bağlantıyı uygulayın. Bkz. PySpark özel veri kaynakları.

Uyarı

Python veri koleksiyonlarından pandas DataFrame'ler, dikteler ve listeler gibi rastgele Apache Spark Veri Çerçeveleri oluşturmak mümkündür. Bu desenler geliştirme ve test sırasında yararlı olabilir, ancak çoğu üretim işlem hattı veri kümesi tanımı dosyalardan, dış sistemden veya mevcut bir tablodan veya görünümden veri yükleyerek başlamalıdır.

Zincirleme Dönüştürmeler

İşlem hatları neredeyse tüm Apache Spark DataFrame dönüşümlerini destekler. Veri kümesi tanımı işlevinize istediğiniz sayıda dönüşüm ekleyebilirsiniz, ancak kullandığınız yöntemlerin her zaman bir DataFrame nesnesi döndürdiğinden emin olmalısınız.

Birkaç aşağı akış iş yükünü yönlendiren bir aracı dönüştürmeniz varsa ancak bunu tablo olarak gerçekleştirmeniz gerekmiyorsa, işlem hattınıza geçici bir görünüm eklemek için kullanın @dp.temporary_view() . Ardından birden çok alt veri kümesi tanımında bu görünüme spark.read.table("temp_view_name") ile referans verebilirsiniz. Aşağıdaki söz diziminde bu desen gösterilmektedir:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

Bu işlem hattı, işlem hattı planlama sırasında görünümünüzdeki dönüşümler hakkında tam bilgi edinmesini sağlar ve veri kümesi tanımları dışında çalışan rastgele Python koduyla ilgili olası sorunları önler.

İşlevinizde, aşağıdaki örnekte olduğu gibi artımlı sonuçları görünümler, gerçekleştirilmiş görünümler veya akış tabloları olarak yazmadan yeni DataFrame'ler oluşturmak için DataFrame'leri birbirine zincirleyebilirsiniz:

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

Tüm DataFrame'leriniz toplu iş mantığını kullanarak ilk okumalarını gerçekleştiriyorsa, dönüş sonucunuz statik bir DataFrame olur. Akışta olan sorgularınız varsa, dönüş sonucunuz bir akış DataFrame'idir.

DataFrame döndürme

@dp.table kullanarak akış okumasının sonuçlarından bir akış tablosu oluşturun. Toplu okumanın sonuçlarından bir maddi görünüm oluşturmak için @dp.materialized_view kullanın. Diğer dekoratörlerin çoğu hem akış hem de statik DataFrame'ler üzerinde çalışırken, birkaçı ise yalnızca akış verisi gerektirir.

Veri kümesini tanımlamak için kullanılan işlevin Spark DataFrame döndürmesi gerekir. İşlem hattı veri kümesi kodunuzun bir parçası olarak dosyaları veya tabloları kaydeden veya bu tablolara yazan yöntemleri asla kullanmayın.

İşlem hattı kodunda hiçbir zaman kullanılmaması gereken Apache Spark işlemleri örnekleri:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

Uyarı

İşlem hatları ayrıca veri kümesi tanımı işlevleri için Spark üzerinde Pandas'ın kullanılmasını da destekler. Bkz. Spark üzerinde Pandas API'si.

Python işlem hattında SQL kullanma

PySpark, SQL kullanarak DataFrame kodu yazma işlecini destekler spark.sql . İşlem hattı kaynak kodunda bu deseni kullandığınızda, materyalize edilmiş görünümler veya akış tabloları için derlenir.

Aşağıdaki kod örneği, veri kümesi sorgu mantığı için kullanmaya spark.read.table("catalog_name.schema_name.table_name") eşdeğerdir:

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read ve dlt.read_stream (eski)

Eski dlt modül, eski işlem hattı yayımlama modunda işlevselliği desteklemek için sunulan dlt.read() ve dlt.read_stream() işlevlerini içerir. Bu yöntemler desteklenir, ancak Databricks aşağıdaki sebeplerden dolayı her zaman spark.read.table() ve spark.readStream.table() işlevlerinin kullanılmasını önerir:

  • İşlevler dlt , geçerli işlem hattı dışında tanımlanan veri kümelerini okumak için sınırlı desteğe sahiptir.
  • İşlevler, spark gibi seçenekleri, örneğin skipChangeCommits, okuma işlemleri için belirtmeyi destekler. dlt işlevleri tarafından seçenek belirtmek desteklenmez.
  • dlt modülü, pyspark.pipelines modülü ile değiştirilmiştir. Databricks, from pyspark import pipelines as dp kullanarak Python'da işlem hattı kodu yazarken pyspark.pipelines içe aktarmayı önermektedir.