Dela via


Fråga efter strömmande data

Du kan använda Azure Databricks för att fråga strömmande datakällor med structured streaming. Azure Databricks har omfattande stöd för strömningsarbetsbelastningar i Python och Scala och har stöd för de flesta funktioner för strukturerad direktuppspelning med SQL.

Följande exempel visar hur du använder en minnesmottagare för manuell kontroll av strömmande data under interaktiv utveckling i notebook-filer. På grund av radutdatabegränsningar i notebook-användargränssnittet kanske du inte ser alla data som läss av strömmande frågor. I produktionsarbetsbelastningar bör du bara utlösa strömmande frågor genom att skriva dem till en måltabell eller ett externt system.

Kommentar

SQL-stöd för interaktiva frågor om strömmande data är begränsat till notebook-filer som körs på all-purpose compute. Du kan också använda SQL när du deklarerar strömmande tabeller i Databricks SQL eller Delta Live Tables. Se Läsa in data med hjälp av strömmande tabeller i Databricks SQL och Vad är Delta Live Tables?.

Fråga efter data från strömningssystem

Azure Databricks tillhandahåller strömmande dataläsare för följande strömningssystem:

  • Kafka
  • Kinesis
  • PubSub
  • Pulsar

Du måste ange konfigurationsinformation när du initierar frågor mot dessa system, som varierar beroende på din konfigurerade miljö och det system som du väljer att läsa från. Se Konfigurera strömmande datakällor.

Vanliga arbetsbelastningar som omfattar strömningssystem är datainmatning till lakehouse och dataströmbearbetning för att sänka data till externa system. Mer information om strömmande arbetsbelastningar finns i Direktuppspelning på Azure Databricks.

Följande exempel visar en interaktiv strömningsläsning från Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Fråga en tabell som en direktuppspelningsläsning

Azure Databricks skapar alla tabeller med Delta Lake som standard. När du utför en direktuppspelningsfråga mot en Delta-tabell hämtar frågan automatiskt nya poster när en version av tabellen har checkats in. Som standard förväntar sig strömmande frågor att källtabeller endast innehåller bifogade poster. Om du behöver arbeta med strömmande data som innehåller uppdateringar och borttagningar rekommenderar Databricks att du använder Delta Live Tables och APPLY CHANGES INTO. Se API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables.

Följande exempel visar hur du utför en interaktiv strömningsläsning från en tabell:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Fråga efter data i molnobjektlagring med automatisk inläsning

Du kan strömma data från molnobjektlagring med hjälp av Automatisk inläsning, Azure Databricks-molndataanslutningen. Du kan använda anslutningsappen med filer som lagras i Unity Catalog-volymer eller andra lagringsplatser för molnobjekt. Databricks rekommenderar att du använder volymer för att hantera åtkomst till data i molnobjektlagring. Se Ansluta till datakällor.

Azure Databricks optimerar den här anslutningsappen för strömmande inmatning av data i molnobjektlagring som lagras i populära strukturerade, halvstrukturerade och ostrukturerade format. Databricks rekommenderar att du lagrar inmatade data i ett nästan rådataformat för att maximera dataflödet och minimera potentiell dataförlust på grund av skadade poster eller schemaändringar.

Fler rekommendationer om hur du matar in data från molnobjektlagring finns i Mata in data i ett Databricks lakehouse.

Följande exempel visar en interaktiv strömningsläsning från en katalog med JSON-filer i en volym:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')