Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
A pyspark.pipelines (itt más néven dp) modul az alapvető funkciók nagy részét dekorátorok használatával valósítja meg. Ezek a dekorátorok elfogadnak egy függvényt, amely stream- vagy köteglekérdezést határoz meg, és egy Apache Spark DataFrame-et ad vissza. Az alábbi szintaxis egy egyszerű példát mutat be egy folyamatadatkészlet definiálására:
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
Ez a lap áttekintést nyújt a folyamatok adatkészleteit meghatározó függvényekről és lekérdezésekről. Az elérhető dekorátorok teljes listájának megtekintéséhez lásd: Folyamatfejlesztői referencia.
Az adathalmazok definiálásához használt függvények nem tartalmazhatnak az adathalmaztól független tetszőleges Python-logikát, beleértve a külső API-k felé irányuló hívásokat is. A folyamatok többször futtatják ezeket a függvényeket a tervezés, az ellenőrzés és a frissítések során. Az tetszőleges logika beleszámítása váratlan eredményekhez vezethet.
Adatok olvasása adathalmazdefiníció elindításához
A folyamatadatkészletek definiálásához használt függvények általában spark.read vagy spark.readStream művelettel kezdődnek. Ezek az olvasási műveletek egy statikus vagy streamelt DataFrame-objektumot ad vissza, amelyet további átalakítások definiálásához használ a DataFrame visszaadása előtt. A DataFrame-et visszaadó spark-műveletekre további példák a következők: spark.tablevagy spark.range.
A függvények soha nem hivatkoznak a függvényen kívül definiált DataFrame-ekre. Ha egy másik hatókörben definiált DataFrame-ekre próbál hivatkozni, az váratlan viselkedést eredményezhet. Több tábla létrehozására szolgáló metaprogramozási mintára példa: Táblák létrehozása hurokbanfor.
Az alábbi példák az adatok kötegelt vagy streamelési logikával történő olvasásának alapszintaxisát mutatják be:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
Ha külső REST API-ból kell adatokat olvasnia, ezt a kapcsolatot pythonos egyéni adatforrás használatával valósíthatja meg. Lásd: PySpark egyéni adatforrások.
Megjegyzés:
Tetszőleges Apache Spark DataFrame-eket hozhat létre Python-adatgyűjteményekből, beleértve a pandas DataFrame-eket, a diktálásokat és a listákat. Ezek a minták hasznosak lehetnek a fejlesztés és a tesztelés során, de az éles folyamat adatkészletének legtöbb definíciója fájlból, külső rendszerből vagy egy meglévő táblából vagy nézetből való adatbetöltéssel kezdődik.
Láncolásos átalakítások
A folyamatok szinte minden Apache Spark DataFrame-átalakítást támogatnak. Tetszőleges számú átalakítást belefoglalhat az adathalmaz definíciós függvényébe, de győződjön meg arról, hogy az ön által használt metódusok mindig egy DataFrame-objektumot adnak vissza.
Ha olyan köztes átalakítással rendelkezik, amely több alsóbb rétegbeli számítási feladatot hajt le, de nem kell táblázatként létrehoznia, @dp.temporary_view() használjon ideiglenes nézetet a folyamathoz. Ezt a nézetet ezután több alsóbb rétegbeli adatkészlet-definícióval spark.read.table("temp_view_name") is hivatkozhatja. A következő szintaxis ezt a mintát mutatja be:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
Ez biztosítja, hogy a folyamat teljes mértékben tisztában legyen a folyamattervezés során megjelenő átalakításokkal, és megakadályozza az adathalmaz-definíciókon kívül futó tetszőleges Python-kódokkal kapcsolatos lehetséges problémákat.
Az ön függvényén belül láncolhatja a DataFrame-eket, hogy új DataFrame-eket készítsen anélkül, hogy az eredményeket növekményes nézetként, materializált nézetként vagy folyamatosan frissülő táblaként írna, ahogy az alábbi példában is látható.
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
Ha az összes DataFrame köteglogikával végzi el a kezdeti olvasást, a visszatérési eredmény egy statikus DataFrame lesz. Ha van olyan lekérdezése, amely streaming módszerrel történik, a visszatérési eredmény egy streamelt DataFrame lesz.
Adjon vissza egy DataFrame-et
A @dp.table használatával hozzon létre egy streamelési táblát egy streaming olvasás eredményeiből. Használja a @dp.materialized_view elemet egy materializált nézet létrehozásához a kötegelt olvasás eredményeiből. A legtöbb más dekoratőr streamelő és statikus DataFrame-en is dolgozik, míg néhányhoz streamelő DataFrame szükséges.
Az adathalmaz meghatározásához használt függvénynek Spark DataFrame-et kell visszaadnia. Soha ne használjon olyan metódusokat, amelyek a folyamatadatkészlet kódjának részeként fájlokat vagy táblákat mentenek vagy írnak.
Példák olyan Apache Spark-műveletekre, amelyeket soha nem szabad használni a folyamatkódban:
collect()count()toPandas()save()saveAsTable()start()toTable()
Megjegyzés:
A csővezetékek a Pandas a Spark platformon való használatát is támogatják az adathalmazok meghatározásához szükséges függvényekkel. Lásd Spark Pandas API-t.
SQL használata Python-folyamatban
A PySpark támogatja az spark.sql operátort, hogy DataFrame-kódot írjon az SQL használatával. Ha ezt a mintát a pipeline forráskódjában használja, az materializált nézetekre vagy stream táblákra fordítódik.
Az alábbi példakód egyenértékű az adathalmaz lekérdezési logikájának használatával spark.read.table("catalog_name.schema_name.table_name") :
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read és dlt.read_stream (örökölt)
A régebbi dlt modul tartalmaz dlt.read() és dlt.read_stream() funkciókat, amelyek a korábbi folyamat közzétételi módjának funkcióit támogatják. Ezek a metódusok támogatottak, de a Databricks az alábbiak miatt mindig a spark.read.table() és spark.readStream.table() függvények használatát javasolja.
- A
dltfüggvények korlátozottan támogatják az aktuális folyamaton kívül definiált adathalmazok olvasását. - A
sparkfüggvények támogatják az olvasási műveletekhez hasonlóskipChangeCommitsbeállítások megadását. A függvények nem támogatják a beállítások megadásátdlt. - A
dltmodult apyspark.pipelinesmodul váltotta fel. A Databricks azt javasolja, hogyfrom pyspark import pipelines as dphasználja azpyspark.pipelinesimportálását a pipelines kódjának Pythonban való írásakor.