Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
Az Azure Databricks használatával lekérdezheti a streamelési adatforrásokat a strukturált streamelés használatával. Az Azure Databricks széles körű támogatást nyújt a Python és a Scala streamelési számítási feladataihoz, és támogatja az SQL legtöbb strukturált streamelési funkcióját.
Az alábbi példák azt mutatják be, hogy a jegyzetfüzetek interaktív fejlesztése során a streamelési adatok manuális ellenőrzéséhez memória-fogadót használnak. A jegyzetfüzet felhasználói felületén lévő sorkimeneti korlátok miatt előfordulhat, hogy nem figyeli meg a streamelési lekérdezések által beolvasott összes adatot. Éles számítási feladatokban a streamelési lekérdezéseket csak egy céltáblába vagy külső rendszerbe való írással kell aktiválnia.
Feljegyzés
A streamelési adatok interaktív lekérdezéseinek SQL-támogatása a teljes körű számításon futó jegyzetfüzetekre korlátozódik. Az SQL-t akkor is használhatja, ha streamelési táblákat deklarál a Databricks SQL-ben vagy a Lakeflow Spark Deklaratív folyamatokban. Lásd a streamelési táblákat és a Lakeflow Spark deklaratív folyamatait.
Adatok lekérdezése streamelési rendszerekből
Az Azure Databricks streamelési adatolvasókat biztosít a következő streamelési rendszerekhez:
- Kafka
- Kinézis
- PubSub
- Pulzár
A lekérdezések ezen rendszereken való inicializálásakor meg kell adnia a konfiguráció részleteit, amelyek a konfigurált környezettől és az olvasni kívánt rendszertől függően változnak. Lásd: Standard összekötők a Lakeflow Connectben.
A streamelési rendszereket magában foglaló gyakori számítási feladatok közé tartozik az adatbetöltés a lakehouse-ba, és a streamfeldolgozás a külső rendszerekbe való adatbetöltéshez. A streamelési számítási feladatokról további információt a strukturált streamelési fogalmakban talál.
Az alábbi példák a Kafkából származó interaktív streamelést szemléltetik:
Python
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
Táblázat lekérdezése folyamatos olvasási módban
Az Azure Databricks alapértelmezés szerint a Delta Lake használatával hozza létre az összes táblát. Ha streamelési lekérdezést futtat egy Delta táblán, a lekérdezés automatikusan hozzáadja az új rekordokat, amikor a tábla egy verziója véglegesítésre kerül. Alapértelmezés szerint a streamelési lekérdezések arra számítanak, hogy a forrástáblák csak hozzáfűzött rekordokat tartalmaznak. Ha frissítéseket és törléseket tartalmazó streamelési adatokkal kell dolgoznia, a Databricks javasolja a Lakeflow Spark deklaratív csővezetékek és AUTO CDC ... INTO használatát. Lásd az AUTO CDC API-k: Egyszerűsítse a változáskövető adatrögzítést a csővezetékekkel.
Az alábbi példák egy táblázatból származó interaktív streamelési olvasást mutatnak be:
Python
display(spark.readStream.table("table_name"))
SQL
SELECT * FROM STREAM table_name
Adatok lekérdezése a felhőobjektum-tárolóban az Automatikus betöltővel
A felhőbeli objektumtárolóból az Azure Databricks felhőalapú adatösszekötővel, az Auto Loaderrel streamelheti az adatokat. Az összekötőt Unity Catalog-kötetekben vagy más felhőobjektum-tárolóhelyeken tárolt fájlokkal használhatja. A Databricks kötetek használatát javasolja a felhőobjektum-tárolóban lévő adatokhoz való hozzáférés kezeléséhez. Lásd: Csatlakozás adatforrásokhoz és külső szolgáltatásokhoz.
Az Azure Databricks ezt az összekötőt a népszerű strukturált, részben strukturált és strukturálatlan formátumban tárolt felhőobjektum-tárolókban tárolt adatok streamelésére optimalizálja. A Databricks azt javasolja, hogy a betöltött adatokat szinte nyers formátumban tárolja az átviteli sebesség maximalizálása és a sérült rekordok vagy sémamódosítások miatti adatvesztés minimalizálása érdekében.
Az adatok felhőbeli objektumtárolóból való betöltésére vonatkozó további javaslatokért lásd: Standard összekötők a Lakeflow Connectben.
Az alábbi példák egy kötet JSON-fájljainak könyvtárából származó interaktív streamelési olvasást mutatnak be:
Python
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SQL
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')