Sdílet prostřednictvím


Dotazování streamovaných dat

Azure Databricks můžete použít k dotazování streamovaných zdrojů dat pomocí strukturovaného streamování. Azure Databricks poskytuje rozsáhlou podporu pro úlohy streamování v Pythonu a Scalě a podporuje většinu funkcí strukturovaného streamování pomocí SQL.

Následující příklady ukazují použití jímky paměti pro ruční kontrolu streamovaných dat během interaktivního vývoje v poznámkových blocích. Vzhledem k omezením výstupu řádků v uživatelském rozhraní poznámkového bloku nemusíte sledovat všechna data přečtená streamovanými dotazy. V produkčních úlohách byste měli aktivovat pouze dotazy streamování tím, že je zapíšete do cílové tabulky nebo externího systému.

Poznámka:

Podpora SQL pro interaktivní dotazy na streamovaná data je omezená na poznámkové bloky běžící na výpočetních prostředcích pro všechny účely. SQL můžete použít také při deklarování streamovaných tabulek v Databricks SQL nebo Delta Live Tables. Viz Načtení dat pomocí streamovaných tabulek v Databricks SQL a Co je Delta Live Tables?

Dotazování dat ze streamovaných systémů

Azure Databricks poskytuje čtečky streamovaných dat pro následující systémy streamování:

  • Kafka
  • Kineze
  • PubSub
  • Pulsar

Při inicializaci dotazů na tyto systémy musíte zadat podrobnosti konfigurace, které se liší v závislosti na nakonfigurovaném prostředí a systému, ze kterého se rozhodnete číst. Viz Konfigurace streamovaných zdrojů dat.

Mezi běžné úlohy, které zahrnují systémy streamování, patří příjem dat do jezera a zpracování datových proudů za účelem jímky dat do externích systémů. Další informace o úlohách streamování najdete v tématu Streamování v Azure Databricks.

Následující příklady ukazují interaktivní čtení streamování ze systému Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Dotazování tabulky jako čtení streamování

Azure Databricks ve výchozím nastavení vytvoří všechny tabulky pomocí Delta Lake. Když provedete streamovací dotaz na tabulku Delta, dotaz automaticky převezme nové záznamy při potvrzení verze tabulky. Ve výchozím nastavení streamované dotazy očekávají, že zdrojové tabulky budou obsahovat pouze připojené záznamy. Pokud potřebujete pracovat se streamovanými daty, která obsahují aktualizace a odstranění, doporučuje Databricks používat rozdílové živé tabulky a APPLY CHANGES INTO. Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek.

Následující příklady ukazují provádění interaktivního streamování načteného z tabulky:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Dotazování dat v cloudovém úložišti objektů pomocí automatického zavaděče

Data z cloudového úložiště objektů můžete streamovat pomocí automatického zavaděče, cloudového datového konektoru Azure Databricks. Konektor můžete použít se soubory uloženými ve svazcích katalogu Unity nebo v jiných umístěních cloudového úložiště objektů. Databricks doporučuje používat svazky ke správě přístupu k datům v cloudovém úložišti objektů. Viz Připojení ke zdrojům dat.

Azure Databricks optimalizuje tento konektor pro příjem streamovaných dat v cloudovém úložišti objektů, které jsou uložené v oblíbených strukturovaných, částečně strukturovaných a nestrukturovaných formátech. Databricks doporučuje ukládat ingestovaná data v téměř nezpracované podobě, aby se maximalizovala propustnost a minimalizovala potenciální ztráta dat kvůli poškozeným záznamům nebo změnám schématu.

Další doporučení týkající se ingestování dat z cloudového úložiště objektů najdete v tématu Ingestování dat do datového jezera Databricks Lakehouse.

Následující příklady ukazují interaktivní streamování načtené z adresáře souborů JSON ve svazku:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')