Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Der Echtzeitmodus ermöglicht das Ultra-Low-Latenz-Streaming mit End-to-End-Latenz so wenig wie fünf Millisekunden, wodurch es ideal für betriebsfähige Workloads wie Betrugserkennung und Echtzeitpersonalisierung ist. Dieses Lernprogramm führt Sie durch das Einrichten Ihrer ersten Echtzeitstreamingabfrage mithilfe eines einfachen Beispiels.
Konzeptionelle Informationen zum Echtzeitmodus, wann sie verwendet werden sollen, und unterstützte Features finden Sie im Echtzeitmodus im strukturierten Streaming. Konfigurationsanforderungen finden Sie unter Einrichten des Echtzeitmodus.
Anforderungen
Bevor Sie beginnen, stellen Sie sicher, dass Sie über Berechtigungen zum Erstellen eines klassischen Computeclusters verfügen, der die im Echtzeitmodus angegebene Konfiguration verwendet. Wenden Sie sich alternativ an Ihren Arbeitsbereichsadministrator, um einen Echtzeitmoduscluster für Sie zu erstellen.
Schritt 1: Erstellen eines Notizbuchs
Notizbücher bieten eine interaktive Umgebung zum Entwickeln und Testen von Streamingabfragen. Sie verwenden dieses Notizbuch, um Ihre Echtzeitabfrage zu schreiben und die kontinuierliche Aktualisierung der Ergebnisse zu beobachten.
So erstellen Sie ein Notizbuch:
- Klicken Sie in der Randleiste auf "Neu ", und klicken Sie dann auf das
Notizbuch.
- Wählen Sie im Dropdownmenü "Berechnen" Ihren Cluster für den Echtzeitmodus aus.
- Wählen Sie Python oder Scala als Standardsprache aus.
Schritt 2: Ausführen einer Echtzeitmodusabfrage
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine Notizbuchzelle ein, und führen Sie ihn aus. In diesem Beispiel wird eine Zinsquelle verwendet, die Zeilen mit einer bestimmten Rate generiert und die Ergebnisse in Echtzeit anzeigt.
Hinweis
Die display Funktion mit realTime Trigger ist in Databricks Runtime 17.1 und höher verfügbar.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
Nach dem Ausführen des Codes wird eine Tabelle angezeigt, die in Echtzeit aktualisiert wird, wenn neue Zeilen generiert werden. Die Tabelle zeigt eine timestamp Spalte und eine value Spalte an, die mit jeder Zeile erhöht wird.
Grundlegendes zum Code
Der obige Code veranschaulicht die wesentlichen Komponenten einer Echtzeitstreamingabfrage. In den folgenden Tabellen werden die wichtigsten Parameter und deren Steuerung erläutert:
Python
| Parameter | Beschreibung |
|---|---|
format("rate") |
Verwendet die Ratequelle, eine integrierte Quelle, die Zeilen mit einer konfigurierbaren Rate generiert. Dies ist nützlich für Tests ohne externe Abhängigkeiten. |
numPartitions |
Legt die Anzahl der Partitionen für die generierten Daten fest. |
rowsPerSecond |
Steuert, wie viele Zeilen pro Sekunde generiert werden. |
realTime="5 minutes" |
Aktiviert den Echtzeitmodus. Das Intervall gibt an, wie oft der Fortschritt der Abfrage-Checkpoints nachverfolgt wird. Längere Intervalle bedeuten weniger häufige Prüfpunkte, aber potenziell längere Wiederherstellungszeiten nach Fehlern. |
outputMode="update" |
Der Echtzeitmodus erfordert den Updateausgabemodus. |
Scala
| Parameter | Beschreibung |
|---|---|
format("rate") |
Verwendet die Ratequelle, eine integrierte Quelle, die Zeilen mit einer konfigurierbaren Rate generiert. Dies ist nützlich für Tests ohne externe Abhängigkeiten. |
numPartitions |
Legt die Anzahl der Partitionen für die generierten Daten fest. |
rowsPerSecond |
Steuert, wie viele Zeilen pro Sekunde generiert werden. |
Trigger.RealTime() |
Aktiviert den Echtzeitmodus mit dem Standardprüfpunktintervall. Sie können auch ein Intervall angeben, z. B Trigger.RealTime("5 minutes"). . |
OutputMode.Update() |
Der Echtzeitmodus erfordert den Updateausgabemodus. |
Schritt 3: Überprüfen der Ergebnisse
Wenn Sie die Abfrage ausführen, erstellt die display Funktion eine Tabelle, die in Echtzeit aktualisiert wird, während die Zinsquelle neue Zeilen generiert. Jede Zeile enthält:
- Ein Zeitstempel für den Zeitpunkt, zu dem die Zeile von der Zinsquelle generiert wurde.
- Ein monotonisch ansteigender Zähler, der mit jeder neuen Zeile inkrementiert wird.
Die Tabelle wird kontinuierlich mit minimaler Latenz aktualisiert und veranschaulicht, wie Echtzeitmodus Daten verarbeitet, sobald sie verfügbar ist. Dies ist der Kernvorteil des Echtzeitmodus – die Möglichkeit, Daten sofort zu sehen und zu reagieren, anstatt auf die Batchverarbeitung zu warten.
Weitere Ressourcen
Nachdem Sie ihre erste Echtzeitabfrage ausgeführt haben, erkunden Sie diese Ressourcen zum Erstellen von Produktionsstreaminganwendungen mit Kafka, Kinesis und anderen unterstützten Quellen: