Freigeben über


Abfragen von Streamingdaten

Sie können Azure Databricks verwenden, um Streamingdatenquellen mithilfe von strukturiertem Streaming abzufragen. Azure Databricks bietet umfassende Unterstützung für Streaming-Workloads in Python und Scala und unterstützt die meisten strukturierten Streaming-Funktionen mit SQL.

Die folgenden Beispiele veranschaulichen die Verwendung einer Speichersenke zur manuellen Überprüfung von Streamingdaten während der interaktiven Entwicklung in Notebooks. Da die Zeilenausgabe in der Notebook-Benutzeroberfläche begrenzt ist, können Sie möglicherweise nicht alle Daten beobachten, die von Streamingabfragen gelesen werden. In Produktionsworkloads sollten Sie Streamingabfragen nur auslösen, indem Sie sie in eine Zieltabelle oder ein externes System schreiben.

Hinweis

Die SQL-Unterstützung für interaktive Abfragen von Streamingdaten ist auf Notebooks beschränkt, die auf universellen Rechnern laufen. Sie können SQL auch verwenden, wenn Sie Streamingtabellen in Databricks SQL oder Delta Live Tables deklarieren. Lesen Sie Laden von Daten mithilfe von Streamingtabellen in Databricks SQL und Was ist Delta Live Tables?.

Abfragen von Daten aus Streamingsystemen

Azure Databricks bietet Streamingdatenleser für die folgenden Streamingsysteme:

  • Kafka
  • Kinesis
  • PubSub
  • Pulsar

Sie müssen Konfigurationsdetails angeben, wenn Sie Abfragen für diese Systeme initialisieren, die je nach konfigurierter Umgebung und dem System variieren, aus dem Sie lesen möchten. Weitere Informationen finden Sie unter Konfigurieren von Streamingdatenquellen.

Allgemeine Workloads, die Streamingsysteme beinhalten, umfassen die Erfassung von Daten in das Lakehouse und die Streamingverarbeitung, um Daten an externe Systeme zu senken. Weitere Informationen zu Streaming-Workloads finden Sie unter Streaming auf Azure Databricks.

Die folgenden Beispiele veranschaulichen ein interaktives Streaming von Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Abfragen einer Tabelle als Streaminglesevorgang

Azure Databricks erstellt standardmäßig alle Tabellen mit Delta Lake. Wenn Sie eine Streamingabfrage auf eine Delta-Tabelle durchführen, nimmt die Abfrage automatisch neue Datensätze auf, wenn eine Version der Tabelle übertragen wird. Standardmäßig erwarten Streamingabfragen, dass Quelltabellen nur angefügte Datensätze enthalten. Wenn Sie mit Streamingdaten arbeiten müssen, die Aktualisierungen und Löschungen enthalten, empfiehlt Databricks die Verwendung von Delta Live Tables und APPLY CHANGES INTO. Weitere Informationen finden Sie unter APPLY CHANGES-API: Vereinfachtes CDC (Change Data Capture) in Delta Live Tables.

Die folgenden Beispiele veranschaulichen das Ausführen eines interaktiven Streaming-Lesevorgangs aus einer Tabelle:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Abfragen von Daten im Cloudobjektspeicher mit automatischem Ladeprogramm

Sie können Daten aus dem Cloudobjektspeicher mithilfe von Auto Loader, dem Azure Databricks-Clouddatenconnector streamen. Sie können den Connector mit Dateien verwenden, die in Unity Catalogvolumes oder anderen Speicherorten für Cloudobjekte gespeichert sind. Databricks empfiehlt die Verwendung von Volumes zum Verwalten des Zugriffs auf Daten im Cloudobjektspeicher. Weitere Informationen finden Sie unter Herstellen von Verbindungen mit Datenquellen.

Azure Databricks optimiert diesen Connector für die Streamingaufnahme von Daten im Cloudobjektspeicher, der in gängigen strukturierten, halbstrukturierten und unstrukturierten Formaten gespeichert ist. Databricks empfiehlt, aufgenommene Daten in einem nahezu unformatierten Format zu speichern, um den Durchsatz zu maximieren und potenzielle Datenverluste aufgrund beschädigter Datensätze oder Schemaänderungen zu minimieren.

Weitere Empfehlungen zum Aufnehmen von Daten aus dem Cloudobjektspeicher finden Sie unter Erfassen von Daten in ein Databricks Lakehouse.

Die folgenden Beispiele veranschaulichen ein interaktives Streaming aus einem Verzeichnis von JSON-Dateien in einem Volume:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')