Megosztás a következőn keresztül:


Adathalmazok definiálására vonatkozó függvények

A pyspark.pipelines (itt más néven dp) modul az alapvető funkciók nagy részét dekorátorok használatával valósítja meg. Ezek a dekorátorok elfogadnak egy függvényt, amely stream- vagy köteglekérdezést határoz meg, és egy Apache Spark DataFrame-et ad vissza. Az alábbi szintaxis egy egyszerű példát mutat be egy folyamatadatkészlet definiálására:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

Ez a lap áttekintést nyújt a folyamatok adatkészleteit meghatározó függvényekről és lekérdezésekről. Az elérhető dekorátorok teljes listájának megtekintéséhez lásd: Folyamatfejlesztői referencia.

Az adathalmazok definiálásához használt függvények nem tartalmazhatnak az adathalmaztól független tetszőleges Python-logikát, beleértve a külső API-k felé irányuló hívásokat is. A folyamatok többször futtatják ezeket a függvényeket a tervezés, az ellenőrzés és a frissítések során. Az tetszőleges logika beleszámítása váratlan eredményekhez vezethet.

Adatok olvasása adathalmazdefiníció elindításához

A folyamatadatkészletek definiálásához használt függvények általában spark.read vagy spark.readStream művelettel kezdődnek. Ezek az olvasási műveletek egy statikus vagy streamelt DataFrame-objektumot ad vissza, amelyet további átalakítások definiálásához használ a DataFrame visszaadása előtt. A DataFrame-et visszaadó spark-műveletekre további példák a következők: spark.tablevagy spark.range.

A függvények soha nem hivatkoznak a függvényen kívül definiált DataFrame-ekre. Ha egy másik hatókörben definiált DataFrame-ekre próbál hivatkozni, az váratlan viselkedést eredményezhet. Több tábla létrehozására szolgáló metaprogramozási mintára példa: Táblák létrehozása hurokbanfor.

Az alábbi példák az adatok kötegelt vagy streamelési logikával történő olvasásának alapszintaxisát mutatják be:

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.read
    .format("cloudFiles")
    .option("cloudFile.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

Ha külső REST API-ból kell adatokat olvasnia, ezt a kapcsolatot pythonos egyéni adatforrás használatával valósíthatja meg. Lásd: PySpark egyéni adatforrások.

Megjegyzés:

Tetszőleges Apache Spark DataFrame-eket hozhat létre Python-adatgyűjteményekből, beleértve a pandas DataFrame-eket, a diktálásokat és a listákat. Ezek a minták hasznosak lehetnek a fejlesztés és a tesztelés során, de az éles folyamat adatkészletének legtöbb definíciója fájlból, külső rendszerből vagy egy meglévő táblából vagy nézetből való adatbetöltéssel kezdődik.

Láncolásos átalakítások

A folyamatok szinte minden Apache Spark DataFrame-átalakítást támogatnak. Tetszőleges számú átalakítást belefoglalhat az adathalmaz definíciós függvényébe, de győződjön meg arról, hogy az ön által használt metódusok mindig egy DataFrame-objektumot adnak vissza.

Ha olyan köztes átalakítással rendelkezik, amely több alsóbb rétegbeli számítási feladatot hajt le, de nem kell táblázatként létrehoznia, @dp.temporary_view() használjon ideiglenes nézetet a folyamathoz. Ezt a nézetet ezután több alsóbb rétegbeli adatkészlet-definícióval spark.read.table("temp_view_name") is hivatkozhatja. A következő szintaxis ezt a mintát mutatja be:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

Ez biztosítja, hogy a folyamat teljes mértékben tisztában legyen a folyamattervezés során megjelenő átalakításokkal, és megakadályozza az adathalmaz-definíciókon kívül futó tetszőleges Python-kódokkal kapcsolatos lehetséges problémákat.

Az ön függvényén belül láncolhatja a DataFrame-eket, hogy új DataFrame-eket készítsen anélkül, hogy az eredményeket növekményes nézetként, materializált nézetként vagy folyamatosan frissülő táblaként írna, ahogy az alábbi példában is látható.

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

Ha az összes DataFrame köteglogikával végzi el a kezdeti olvasást, a visszatérési eredmény egy statikus DataFrame lesz. Ha van olyan lekérdezése, amely streaming módszerrel történik, a visszatérési eredmény egy streamelt DataFrame lesz.

Adjon vissza egy DataFrame-et

A @dp.table használatával hozzon létre egy streamelési táblát egy streaming olvasás eredményeiből. Használja a @dp.materialized_view elemet egy materializált nézet létrehozásához a kötegelt olvasás eredményeiből. A legtöbb más dekoratőr streamelő és statikus DataFrame-en is dolgozik, míg néhányhoz streamelő DataFrame szükséges.

Az adathalmaz meghatározásához használt függvénynek Spark DataFrame-et kell visszaadnia. Soha ne használjon olyan metódusokat, amelyek a folyamatadatkészlet kódjának részeként fájlokat vagy táblákat mentenek vagy írnak.

Példák olyan Apache Spark-műveletekre, amelyeket soha nem szabad használni a folyamatkódban:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

Megjegyzés:

A csővezetékek a Pandas a Spark platformon való használatát is támogatják az adathalmazok meghatározásához szükséges függvényekkel. Lásd Spark Pandas API-t.

SQL használata Python-folyamatban

A PySpark támogatja az spark.sql operátort, hogy DataFrame-kódot írjon az SQL használatával. Ha ezt a mintát a pipeline forráskódjában használja, az materializált nézetekre vagy stream táblákra fordítódik.

Az alábbi példakód egyenértékű az adathalmaz lekérdezési logikájának használatával spark.read.table("catalog_name.schema_name.table_name") :

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read és dlt.read_stream (örökölt)

A régebbi dlt modul tartalmaz dlt.read() és dlt.read_stream() funkciókat, amelyek a korábbi folyamat közzétételi módjának funkcióit támogatják. Ezek a metódusok támogatottak, de a Databricks az alábbiak miatt mindig a spark.read.table() és spark.readStream.table() függvények használatát javasolja.

  • A dlt függvények korlátozottan támogatják az aktuális folyamaton kívül definiált adathalmazok olvasását.
  • A spark függvények támogatják az olvasási műveletekhez hasonló skipChangeCommitsbeállítások megadását. A függvények nem támogatják a beállítások megadását dlt .
  • A dlt modult a pyspark.pipelines modul váltotta fel. A Databricks azt javasolja, hogy from pyspark import pipelines as dp használja az pyspark.pipelines importálását a pipelines kódjának Pythonban való írásakor.