Az első strukturált streamelési számítási feladat futtatása
Ez a cikk az első strukturált streamelési lekérdezések Azure Databricksen való futtatásához szükséges alapfogalmak kód példáit és magyarázatát tartalmazza. A strukturált streamelést közel valós idejű és növekményes feldolgozási számítási feladatokhoz használhatja.
A strukturált streamelés az egyik olyan technológia, amely streamelési táblákat üzemeltet a Delta Live Tablesben. A Databricks a Delta Live Tables használatát javasolja az összes új ETL-, betöltési és strukturált streamelési számítási feladathoz. Lásd : Mi az a Delta Live Tables?.
Feljegyzés
Míg a Delta Live Tables kissé módosított szintaxist biztosít a streamelési táblák deklarálásához, a streamelési olvasások és átalakítások konfigurálásának általános szintaxisa az Azure Databricks összes streamelési használati esetére vonatkozik. A Delta Live Tables az állapotinformációk, metaadatok és számos konfiguráció kezelésével is leegyszerűsíti a streamelést.
Streamelési adatok olvasása az objektumtárolóból az Automatikus betöltő használatával
Az alábbi példa a JSON-adatok automatikus betöltővel való betöltését mutatja be, amely cloudFiles
formátumot és beállításokat jelöl. Ez a schemaLocation
beállítás lehetővé teszi a séma következtetését és fejlődését. Illessze be a következő kódot egy Databricks-jegyzetfüzetcellába, és futtassa a cellát egy streamelési adatkeret raw_df
létrehozásához:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Az Azure Databricks más olvasási műveleteihez hasonlóan a streamelt olvasás konfigurálása nem tölt be adatokat. A stream megkezdése előtt aktiválnia kell egy műveletet az adatokon.
Feljegyzés
A streamelt DataFrame hívása display()
elindít egy streamelési feladatot. A legtöbb strukturált streamelési használati esetben a streamet aktiváló műveletnek adatokat kell írnia egy fogadóba. Lásd a strukturált streamelés éles szempontjait.
Stream-átalakítás végrehajtása
A strukturált streamelés az Azure Databricksben és a Spark SQL-ben elérhető legtöbb átalakítást támogatja. Az MLflow-modelleket akárUDF-ként is betöltheti, és transzformációként streamelési előrejelzéseket készíthet.
Az alábbi példakód egy egyszerű átalakítást hajt végre a betöltött JSON-adatok további információval való gazdagításához a Spark SQL-függvények használatával:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
Az eredmény lekérdezési transformed_df
utasításokat tartalmaz az adatforrásba érkező egyes rekordok betöltésére és átalakítására.
Feljegyzés
A strukturált streamelés kötetlen vagy végtelen adathalmazként kezeli az adatforrásokat. Ezért egyes átalakítások nem támogatottak a strukturált streamelési számítási feladatokban, mert végtelen számú elem rendezésére lenne szükség.
A legtöbb összesítéshez és számos illesztéshez vízjelekkel, ablakokkal és kimeneti móddal kell kezelni az állapotinformációkat. Lásd: Vízjelek alkalmazása az adatfeldolgozási küszöbértékek szabályozásához.
Növekményes kötegírás végrehajtása a Delta Lake-be
Az alábbi példa egy megadott fájlútvonallal és ellenőrzőponttal ír a Delta Lake-be.
Fontos
Mindig adjon meg egy egyedi ellenőrzőpont-helyet minden konfigurált streamíróhoz. Az ellenőrzőpont egyedi identitást biztosít a streamhez, nyomon követve a streamlekérdezéshez tartozó összes feldolgozott rekordot és állapotinformációt.
Az availableNow
eseményindító beállítása arra utasítja a strukturált streamelést, hogy dolgozza fel a forrásadatkészlet összes korábban feldolgozatlan rekordját, majd állítsa le azt, hogy biztonságosan végrehajthassa a következő kódot anélkül, hogy a stream futtatásával kellene foglalkoznia:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
Ebben a példában nem érkeznek új rekordok az adatforrásba, ezért a kód ismételt végrehajtása nem fogad be új rekordokat.
Figyelmeztetés
A strukturált streamelés végrehajtása megakadályozhatja, hogy az automatikus leállítás leállítsa a számítási erőforrásokat. A váratlan költségek elkerülése érdekében mindenképpen állítsa le a streamelési lekérdezéseket.
Adatok olvasása a Delta Lake-ből, átalakítás és írás a Delta Lake-be
A Delta Lake széles körben támogatja a strukturált streamelést forrásként és fogadóként is. Lásd a Delta-tábla streamelési olvasásait és írásait.
Az alábbi példa szintaxisa azt mutatja be, hogy egy Delta-tábla összes új rekordja növekményesen betölthető, összekapcsolható egy másik Delta-tábla pillanatképével, és egy Delta-táblába írható:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
A forrástáblák olvasásához és a céltáblákba és a megadott ellenőrzőpont-helyre való íráshoz megfelelő engedélyekkel kell rendelkeznie. Töltse ki a szögletes zárójelekkel (<>
) jelölt összes paramétert az adatforrások és fogadók vonatkozó értékeinek használatával.
Feljegyzés
A Delta Live Tables teljes deklaratív szintaxist biztosít a Delta Lake-folyamatok létrehozásához, és automatikusan kezeli az olyan tulajdonságokat, mint az eseményindítók és az ellenőrzőpontok. Lásd : Mi az a Delta Live Tables?.
Adatok olvasása a Kafkából, átalakítás és írás a Kafkába
Az Apache Kafka és más üzenetkezelési buszok biztosítják a nagy adathalmazok számára elérhető legkisebb késést. Az Azure Databricks használatával átalakításokat alkalmazhat a Kafkából betöltött adatokra, majd adatokat írhat vissza a Kafkába.
Feljegyzés
Az adatok felhőobjektum-tárolóba történő írása további késési többletterhelést eredményez. Ha adatokat szeretne tárolni egy üzenetkezelő buszból a Delta Lake-ben, de a lehető legkisebb késést szeretné megkövetelni a streamelési számítási feladatokhoz, a Databricks azt javasolja, hogy külön streamelési feladatokat konfiguráljon az adatok tóházba való betöltéséhez, és közel valós idejű átalakításokat alkalmazzon az alsóbb rétegbeli üzenetkezelő buszok fogadói számára.
Az alábbi példakód egy egyszerű mintát mutat be a Kafkából származó adatok bővítéséhez egy Delta-táblában lévő adatokkal való összekapcsolással, majd a Kafkába való visszaírással:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
A Kafka szolgáltatáshoz való hozzáféréshez megfelelő engedélyekkel kell rendelkeznie. Töltse ki a szögletes zárójelekkel (<>
) jelölt összes paramétert az adatforrások és fogadók vonatkozó értékeinek használatával. Tekintse meg az Apache Kafka és az Azure Databricks streamfeldolgozását.