Megosztás:


Streamelési adatok lekérdezése

Az Azure Databricks használatával lekérdezheti a streamelési adatforrásokat a strukturált streamelés használatával. Az Azure Databricks széles körű támogatást nyújt a Python és a Scala streamelési számítási feladataihoz, és támogatja az SQL legtöbb strukturált streamelési funkcióját.

Az alábbi példák azt mutatják be, hogy a jegyzetfüzetek interaktív fejlesztése során a streamelési adatok manuális ellenőrzéséhez memória-fogadót használnak. A jegyzetfüzet felhasználói felületén lévő sorkimeneti korlátok miatt előfordulhat, hogy nem figyeli meg a streamelési lekérdezések által beolvasott összes adatot. Éles számítási feladatokban a streamelési lekérdezéseket csak egy céltáblába vagy külső rendszerbe való írással kell aktiválnia.

Feljegyzés

A streamelési adatok interaktív lekérdezéseinek SQL-támogatása a teljes körű számításon futó jegyzetfüzetekre korlátozódik. Az SQL-t akkor is használhatja, ha streamelési táblákat deklarál a Databricks SQL-ben vagy a Lakeflow Spark Deklaratív folyamatokban. Lásd a streamelési táblákat és a Lakeflow Spark deklaratív folyamatait.

Adatok lekérdezése streamelési rendszerekből

Az Azure Databricks streamelési adatolvasókat biztosít a következő streamelési rendszerekhez:

  • Kafka
  • Kinézis
  • PubSub
  • Pulzár

A lekérdezések ezen rendszereken való inicializálásakor meg kell adnia a konfiguráció részleteit, amelyek a konfigurált környezettől és az olvasni kívánt rendszertől függően változnak. Lásd: Standard összekötők a Lakeflow Connectben.

A streamelési rendszereket magában foglaló gyakori számítási feladatok közé tartozik az adatbetöltés a lakehouse-ba, és a streamfeldolgozás a külső rendszerekbe való adatbetöltéshez. A streamelési számítási feladatokról további információt a strukturált streamelési fogalmakban talál.

Az alábbi példák a Kafkából származó interaktív streamelést szemléltetik:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Táblázat lekérdezése folyamatos olvasási módban

Az Azure Databricks alapértelmezés szerint a Delta Lake használatával hozza létre az összes táblát. Ha streamelési lekérdezést futtat egy Delta táblán, a lekérdezés automatikusan hozzáadja az új rekordokat, amikor a tábla egy verziója véglegesítésre kerül. Alapértelmezés szerint a streamelési lekérdezések arra számítanak, hogy a forrástáblák csak hozzáfűzött rekordokat tartalmaznak. Ha frissítéseket és törléseket tartalmazó streamelési adatokkal kell dolgoznia, a Databricks javasolja a Lakeflow Spark deklaratív csővezetékek és AUTO CDC ... INTO használatát. Lásd az AUTO CDC API-k: Egyszerűsítse a változáskövető adatrögzítést a csővezetékekkel.

Az alábbi példák egy táblázatból származó interaktív streamelési olvasást mutatnak be:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Adatok lekérdezése a felhőobjektum-tárolóban az Automatikus betöltővel

A felhőbeli objektumtárolóból az Azure Databricks felhőalapú adatösszekötővel, az Auto Loaderrel streamelheti az adatokat. Az összekötőt Unity Catalog-kötetekben vagy más felhőobjektum-tárolóhelyeken tárolt fájlokkal használhatja. A Databricks kötetek használatát javasolja a felhőobjektum-tárolóban lévő adatokhoz való hozzáférés kezeléséhez. Lásd: Csatlakozás adatforrásokhoz és külső szolgáltatásokhoz.

Az Azure Databricks ezt az összekötőt a népszerű strukturált, részben strukturált és strukturálatlan formátumban tárolt felhőobjektum-tárolókban tárolt adatok streamelésére optimalizálja. A Databricks azt javasolja, hogy a betöltött adatokat szinte nyers formátumban tárolja az átviteli sebesség maximalizálása és a sérült rekordok vagy sémamódosítások miatti adatvesztés minimalizálása érdekében.

Az adatok felhőbeli objektumtárolóból való betöltésére vonatkozó további javaslatokért lásd: Standard összekötők a Lakeflow Connectben.

Az alábbi példák egy kötet JSON-fájljainak könyvtárából származó interaktív streamelési olvasást mutatnak be:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')