Použití Delta Lake pro streamování dat

Dokončeno

Všechna data, která jsme v tomto okamžiku prozkoumali, jsou statická data v souborech. Mnoho scénářů analýzy dat ale zahrnuje streamovaná data, která se musí zpracovávat téměř v reálném čase. Můžete například potřebovat zachytit čtení vygenerovaná zařízeními IoT (Internet-of-things) a uložit je do tabulky, když k nim dojde.

Strukturované streamování Sparku

Typickým řešením pro zpracování datových proudů je neustálé čtení datového proudu ze zdroje, případně jeho zpracování za účelem výběru konkrétních polí, agregace a seskupení nebo jiné manipulace s daty a zápis výsledků do jímky.

Spark zahrnuje nativní podporu streamování dat prostřednictvím strukturovaného streamování Sparku, rozhraní API založené na neomezeném datovém rámci, ve kterém se streamovaná data zaznamenávají ke zpracování. Datový rámec strukturovaného streamování Sparku může číst data z mnoha různých druhů zdroje streamování, včetně síťových portů, služeb zprostředkování zpráv v reálném čase, jako jsou Azure Event Hubs nebo Kafka, nebo umístění systému souborů.

Tip

Další informace o strukturovaném streamování Sparku najdete v průvodci programováním strukturovaného streamování v dokumentaci sparku.

Streamování s tabulkami Delta Lake

Tabulku Delta Lake můžete použít jako zdroj nebo jímku pro strukturované streamování Sparku. Můžete například zachytit datový proud dat v reálném čase ze zařízení IoT a zapsat stream přímo do tabulky Delta Lake jako jímku – umožníte dotazování na tabulku, abyste viděli nejnovější streamovaná data. Nebo můžete číst tabulku Delta jako zdroj streamování, která vám umožní neustále hlásit nová data při jejich přidání do tabulky.

Použití tabulky Delta Lake jako zdroje streamování

V následujícím příkladu PySpark se k ukládání podrobností o internetových prodejních objednávkách používá tabulka Delta Lake. Vytvoří se datový proud, který čte data ze složky tabulky Delta Lake, protože se přidají nová data.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.show()

Poznámka:

Při použití tabulky Delta Lake jako zdroje streamování je možné do streamu zahrnout pouze operace připojení . Úpravy dat způsobí chybu, pokud nezadáte ignoreChanges nebo ignoreDeletes možnost.

Po načtení dat z tabulky Delta Lake do streamovaného datového rámce můžete ke zpracování použít rozhraní API strukturovaného streamování Sparku. V předchozím příkladu se datový rámec jednoduše zobrazí; Strukturované streamování Sparku ale můžete použít k agregaci dat v časových oknech (například ke spočítání počtu objednávek zadaných každou minutu) a odeslání agregovaných výsledků do podřízeného procesu pro vizualizaci téměř v reálném čase.

Použití tabulky Delta Lake jako jímky streamování

V následujícím příkladu PySpark se stream dat načítá ze souborů JSON ve složce. Data JSON v každém souboru obsahují stav pro zařízení IoT ve formátu {"device":"Dev1","status":"ok"} Nová data se při každém přidání souboru do složky přidají do datového proudu. Vstupní datový proud je bezhraničí datový rámec, který se pak zapisuje v rozdílovém formátu do umístění složky pro tabulku Delta Lake.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
streamFolder = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

Poznámka:

Tato checkpointLocation možnost se používá k zápisu souboru kontrolního bodu, který sleduje stav zpracování datových proudů. Tento soubor vám umožní zotavit se z selhání v okamžiku, kdy zpracování datového proudu skončilo.

Po spuštění procesu streamování můžete dotazovat tabulku Delta Lake, do které se zapisuje výstup streamování, abyste viděli nejnovější data. Například následující kód vytvoří tabulku katalogu pro složku tabulky Delta Lake a dotazuje se na ni:

%sql

CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';

SELECT device, status
FROM DeviceTable;

Pokud chcete zastavit zápis streamu dat do tabulky Delta Lake, můžete použít stop metodu streamovacího dotazu:

delta_stream.stop()

Tip

Další informace o používání tabulek Delta Lake pro streamovaná data najdete v tématu Čtení a zápisy streamování tabulek v dokumentaci k Delta Lake.