Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
Eine Streamingtabelle ist eine Delta-Tabelle mit zusätzlicher Unterstützung für Streaming oder inkrementelle Datenverarbeitung. Eine Streamingtabelle kann von einem oder mehreren Datenströmen in einer Pipeline adressiert werden.
Streamingtabellen sind aus folgenden Gründen eine gute Wahl für die Datenaufnahme:
- Jede Eingabezeile wird nur einmal behandelt, wodurch die überwiegende Mehrheit der Erfassungsworkloads modelliert wird (d. a. indem Zeilen an eine Tabelle angefügt oder heraufgestrigt werden).
- Sie können große Mengen von Daten verarbeiten, die nur hinzugefügt werden können.
Streamingtabellen sind aus folgenden Gründen auch eine gute Wahl für Streamingtransformationen mit geringer Latenz:
- Grund für Zeilen und Zeitfenster
- Große Datenmengen verarbeiten
- Geringe Latenz
Das folgende Diagramm veranschaulicht, wie Streamingtabellen funktionieren.
Bei jeder Aktualisierung lesen die Flüsse, die einer Streamingtabelle zugeordnet sind, die geänderten Informationen in einer Streamingquelle und fügen neue Informationen an diese Tabelle an.
Streamingtabellen werden von einer einzelnen Pipeline definiert und aktualisiert. Sie definieren Streamingtabellen explizit im Quellcode der Pipeline. Tabellen, die von einer Pipeline definiert sind, können von keiner anderen Pipeline geändert oder aktualisiert werden. Sie können mehrere Flüsse definieren, die an eine einzelne Streamingtabelle angefügt werden sollen.
Hinweis
Wenn Sie eine Streamingtabelle außerhalb einer Pipeline erstellen, erstellt Azure Databricks mithilfe von Databricks SQL eine Pipeline, die zum Aktualisieren der Tabelle verwendet wird. Sie können die Pipeline anzeigen, indem Sie Aufträge und Pipelines aus dem linken Navigationsbereich in Ihrem Arbeitsbereich auswählen. Sie können der Ansicht die Spalte " Pipelinetyp" hinzufügen. Streamingtabellen, die in einer Pipeline definiert sind, haben den Typ ETL. Streamingtabellen, die in Databricks SQL erstellt werden, haben den Typ MV/ST.
Weitere Informationen zu Flows finden Sie unter Daten inkrementell laden und verarbeiten mit Lakeflow Spark deklarativen Pipeline-Flows.
Streamingtabellen für die Aufnahme
Streamingtabellen sind nur für Anfügedatenquellen vorgesehen und verarbeiten Eingaben nur einmal.
Das folgende Beispiel zeigt, wie Sie eine Streamingtabelle verwenden, um neue Dateien aus dem Cloudspeicher aufzunehmen.
Python
from pyspark import pipelines as dp
# create a streaming table
@dp.table
def customers_bronze():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/path/to/files")
)
Zum Erstellen einer Streamingtabelle muss die Datasetdefinition ein Datenstromtyp sein. Wenn Sie die spark.readStream Funktion in einer Datasetdefinition verwenden, wird ein Streaming-Dataset zurückgegeben.
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
"/volumes/path/to/files",
format => "json"
);
Streamingtabellen erfordern Streaming-Datensätze. Das STREAM Schlüsselwort vor read_files der Abfrage weist die Abfrage an, das Dataset als Datenstrom zu behandeln.
Weitere Informationen zum Laden von Daten in die Streamingtabelle finden Sie unter Laden von Daten in Pipelines.
Das folgende Diagramm veranschaulicht, wie Nur-Anfüge-Streamingtabellen funktionieren.
Eine Zeile, die bereits an eine Streamingtabelle angefügt wurde, wird nicht mit späteren Aktualisierungen der Pipeline erneut abgefragt. Wenn Sie die Abfrage ändern (z. B. von SELECT LOWER (name) zu SELECT UPPER (name)), werden vorhandene Zeilen nicht in Großbuchstaben aktualisiert, aber neue Zeilen sind Großbuchstaben. Sie können eine vollständige Aktualisierung auslösen, um alle vorherigen Daten aus der Quelltabelle neu abzufragen. Dadurch werden alle Zeilen in der Streamingtabelle aktualisiert.
Streamingtabellen und Streaming mit geringer Latenz
Streamingtabellen sind für Streaming mit geringer Latenz über den gebundenen Zustand ausgelegt. Streamingtabellen verwenden die Prüfpunktverwaltung, wodurch sie gut für Streaming mit geringer Latenz geeignet sind. Sie erwarten jedoch Ströme, die natürlich gebunden oder mit einem Wasserzeichen begrenzt sind.
Ein natürlich gebundener Datenstrom wird von einer Streamingdatenquelle erzeugt, die einen klar definierten Start- und End-Vorgang aufweist. Ein Beispiel für einen natürlich gebundenen Datenstrom ist das Lesen von Daten aus einem Verzeichnis mit Dateien, in denen keine neuen Dateien hinzugefügt werden, nachdem ein anfänglicher Batch von Dateien platziert wurde. Der Datenstrom wird als gebunden betrachtet, da die Anzahl der Dateien endlich ist und dann der Datenstrom endet, nachdem alle Dateien verarbeitet wurden.
Sie können auch ein Wasserzeichen verwenden, um einen Strom zu binden. Ein Wasserzeichen in Spark Structured Streaming ist ein Mechanismus, mit dem späte Daten verarbeitet werden können, indem angegeben wird, wie lange das System auf verzögerte Ereignisse warten soll, bevor das Zeitfenster als abgeschlossen betrachtet wird. Ein ungebundener Datenstrom ohne Wasserzeichen kann dazu führen, dass eine Pipeline aufgrund von Speicherdruck fehlschlägt.
Weitere Informationen zur zustandsbehafteten Datenstromverarbeitung finden Sie unter Optimieren der zustandsbehafteten Verarbeitung mit Wasserzeichen.
Stream-Snapshot-Verknüpfungen
Stream-Snapshot-Verknüpfungen sind Verknüpfungen zwischen einem Datenstrom und einer Dimension, von der beim Start von Datenströmen eine Momentaufnahme erstellt wird. Diese Verknüpfungen werden nicht neu kompensiert, wenn sich die Dimension nach dem Start des Datenstroms ändert, da die Dimensionstabelle als Momentaufnahme behandelt wird und Änderungen an der Dimensionstabelle nach dem Start des Datenstroms nicht wiedergegeben werden, es sei denn, Sie laden die Dimensionstabelle neu oder aktualisieren sie. Dies ist ein vernünftiges Verhalten, wenn Sie kleine Abweichungen in einer Verknüpfung akzeptieren können. Beispielsweise ist eine ungefähre Verknüpfung akzeptabel, wenn die Anzahl der Transaktionen viele Größenordnungen größer als die Anzahl der Kunden ist.
Im folgenden Codebeispiel verknüpfen wir eine Dimensionstabelle, Kunden mit zwei Zeilen mit einem immer größer werdenden Dataset, Transaktionen. Wir materialisieren eine Verknüpfung zwischen diesen beiden Datasets in einer Tabelle mit dem Namen .We materialize a join between these two datasets in a table called sales_report. Beachten Sie, dass, wenn ein externer Prozess die Kundentabelle aktualisiert, indem eine neue Zeile (customer_id=3, name=Zoya) hinzugefügt wird, diese neue Zeile NICHT in der Verknüpfung vorhanden ist, weil die statische Dimensionstabelle zu dem Zeitpunkt aufgenommen wurde, als die Datenströme gestartet wurden.
from pyspark import pipelines as dp
@dp.temporary_view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
return spark.readStream.table("transactions")
# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dp.temporary_view
def v_customers():
return spark.read.table("customers")
@dp.table
def sales_report():
facts = spark.readStream.table("v_transactions")
dims = spark.read.table("v_customers")
return facts.join(dims, on="customer_id", how="inner")
Einschränkungen der Streamingtabelle
Streamingtabellen haben die folgenden Einschränkungen:
-
Begrenzte Entwicklung: Sie können die Abfrage ändern, ohne das gesamte Dataset neu zu komputieren. Ohne vollständige Aktualisierung sieht eine Streamingtabelle nur einmal jede Zeile, sodass unterschiedliche Abfragen unterschiedliche Zeilen verarbeitet haben. Wenn Sie beispielsweise zu einem Feld in der Abfrage hinzufügen
UPPER(), werden nur Zeilen nach der Änderung in Großbuchstaben verarbeitet. Dies bedeutet, dass Sie alle vorherigen Versionen der Abfrage kennen müssen, die in Ihrem Dataset ausgeführt werden. Um vorhandene Zeilen erneut zu verarbeiten, die vor der Änderung verarbeitet wurden, ist eine vollständige Aktualisierung erforderlich. - Zustandsverwaltung: Streamingtabellen haben eine geringe Latenz, sodass Sie sicherstellen müssen, dass die Datenströme, die sie betreiben, natürlich gebunden oder mit Wasserzeichen gebunden sind. Weitere Informationen finden Sie unter Optimierung der zustandsbehafteten Verarbeitung mit Wasserzeichen.
- Verknüpfungen werden nicht neu berechnet: Verknüpfungen in Streamingtabellen werden nicht neu berechnet, wenn sich Dimensionen ändern. Diese Eigenschaft kann für "schnelle, aber falsche" Szenarien gut sein. Wenn Ihre Ansicht immer korrekt sein soll, können Sie eine materialisierte Ansicht verwenden. Materialisierte Ansichten sind immer richtig, da sie Verknüpfungen automatisch neu kompensieren, wenn sich Dimensionen ändern. Weitere Informationen finden Sie unter Materialisierte Ansichten.