Delta Lake gebruiken voor streaminggegevens

Voltooid

Alle gegevens die we tot nu toe hebben verkend, zijn statische gegevens in bestanden. Veel scenario's voor gegevensanalyse omvatten echter streaminggegevens die in bijna realtime moeten worden verwerkt. U moet bijvoorbeeld mogelijk leesbewerkingen vastleggen die worden verzonden door IoT-apparaten (Internet of Things) en deze opslaan in een tabel wanneer ze optreden.

Spark Structured Streaming

Een typische oplossing voor stroomverwerking omvat het voortdurend lezen van een gegevensstroom uit een bron, optioneel verwerken om specifieke velden te selecteren, aggregatie- en groepswaarden te selecteren, of de gegevens op een andere manier te manipuleren en de resultaten naar een sink te schrijven.

Spark bevat systeemeigen ondersteuning voor streaminggegevens via Spark Structured Streaming, een API die is gebaseerd op een gebonden dataframe waarin streaminggegevens worden vastgelegd voor verwerking. Een Spark Structured Streaming-gegevensframe kan gegevens lezen uit veel verschillende soorten streamingbron, waaronder netwerkpoorten, realtime berichtenbrokerservices zoals Azure Event Hubs of Kafka, of locaties van het bestandssysteem.

Streamen met Delta Lake-tabellen

U kunt een Delta Lake-tabel gebruiken als bron of een sink voor Spark Structured Streaming. U kunt bijvoorbeeld een stroom realtimegegevens van een IoT-apparaat vastleggen en de stream rechtstreeks naar een Delta Lake-tabel schrijven als sink, zodat u een query kunt uitvoeren op de tabel om de meest recente gestreamde gegevens te zien. U kunt ook een Delta-tabel lezen als streamingbron, zodat u voortdurend nieuwe gegevens kunt rapporteren terwijl deze aan de tabel worden toegevoegd.

Een Delta Lake-tabel gebruiken als streamingbron

In het volgende PySpark-voorbeeld wordt een Delta Lake-tabel gebruikt voor het opslaan van details van internetverkooporders. Er wordt een stroom gemaakt waarmee gegevens uit de tabelmap Delta Lake worden gelezen wanneer er nieuwe gegevens worden toegevoegd.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.show()

Notitie

Wanneer u een Delta Lake-tabel als streamingbron gebruikt, kunnen alleen toevoegbewerkingen worden opgenomen in de stream. Gegevenswijzigingen veroorzaken een fout, tenzij u de ignoreChanges of ignoreDeletes optie opgeeft.

Nadat u de gegevens uit de Delta Lake-tabel hebt gelezen in een streaming-dataframe, kunt u de Structured Streaming-API van Spark gebruiken om deze te verwerken. In het bovenstaande voorbeeld wordt het dataframe gewoon weergegeven; Maar u kunt Spark Structured Streaming gebruiken om de gegevens over tijdelijke vensters te aggregeren (bijvoorbeeld om het aantal orders dat elke minuut is geplaatst) te tellen en de geaggregeerde resultaten naar een downstreamproces te verzenden voor bijna realtime visualisatie.

Een Delta Lake-tabel gebruiken als een streaming-sink

In het volgende PySpark-voorbeeld wordt een gegevensstroom gelezen uit JSON-bestanden in een map. De JSON-gegevens in elk bestand bevatten de status van een IoT-apparaat in de indeling {"device":"Dev1","status":"ok"} Nieuwe gegevens worden aan de stream toegevoegd wanneer een bestand wordt toegevoegd aan de map. De invoerstroom is een gegevensframe zonder grenzen, dat vervolgens in delta-indeling wordt geschreven naar een maplocatie voor een Delta Lake-tabel.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
streamFolder = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

Notitie

De checkpointLocation optie wordt gebruikt om een controlepuntbestand te schrijven waarmee de status van de stroomverwerking wordt bijgehouden. Met dit bestand kunt u herstellen van een fout op het punt waar de stroomverwerking is gebleven.

Nadat het streamingproces is gestart, kunt u een query uitvoeren op de Delta Lake-tabel waarnaar de streaming-uitvoer wordt geschreven om de meest recente gegevens te zien. Met de volgende code maakt u bijvoorbeeld een catalogustabel voor de delta lake-tabelmap en voert u er query's op uit:

%sql

CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';

SELECT device, status
FROM DeviceTable;

Als u wilt stoppen met de stroom gegevens die naar de Delta Lake-tabel worden geschreven, kunt u de stop methode van de streamingquery gebruiken:

delta_stream.stop()