Hämta strömmande data till Lakehouse med Spark-strukturerad strömning

Structured Streaming är en skalbar och feltolerant strömbearbetningsmotor som bygger på Spark. Spark tar hand om att köra strömningsåtgärden stegvis och kontinuerligt när data fortsätter att tas emot.

Strukturerad strömning blev tillgänglig i Spark 2.2. Sedan dess har det varit den rekommenderade metoden för dataströmning. Den grundläggande principen bakom strukturerad dataström är att behandla en livedataström som en tabell där nya data alltid läggs till kontinuerligt, som en ny rad i en tabell. Det finns några definierade inbyggda strömmande filkällor som CSV, JSON, ORC, Parquet och inbyggt stöd för meddelandetjänster som Kafka och Event Hubs.

Den här artikeln ger insikter om hur du optimerar bearbetning och inmatning av händelser via Spark-strukturströmning i produktionsmiljöer med högt dataflöde. De föreslagna metoderna omfattar:

  • Dataströmningsflödesoptimering
  • Optimera skrivåtgärder i deltatabellen och
  • Batchbearbetning av händelser

Spark-jobbdefinitioner och Spark-notebook-filer

Spark Notebooks är ett utmärkt verktyg för att validera idéer och utföra experiment för att få insikter från dina data eller din kod. Notebook-filer används ofta i dataförberedelser, visualisering, maskininlärning och andra stordatascenarier. Spark-jobbdefinitioner är icke-interaktiva kodorienterade uppgifter som körs i ett Spark-kluster under långa perioder. Spark-jobbdefinitioner ger robusthet och tillgänglighet.

Spark-notebook-filer är utmärkta källor för att testa kodens logik och uppfylla alla affärskrav. Men för att hålla den igång i ett produktionsscenario är Spark-jobbdefinitioner med återförsöksprincip aktiverade den bästa lösningen.

Återförsöksprincip för Spark-jobbdefinitioner

I Microsoft Fabric kan användaren ange en återförsöksprincip för Spark-jobbdefinitionsjobb. Även om skriptet i jobbet kan vara oändligt kan infrastrukturen som kör skriptet medföra ett problem som kräver att jobbet stoppas. Eller så kan jobbet elimineras på grund av underliggande infrastrukturkorrigeringsbehov. Med återförsöksprincipen kan användaren ange regler för att automatiskt starta om jobbet om det stoppas på grund av underliggande problem. Parametrarna anger hur ofta jobbet ska startas om, upp till oändliga återförsök och ange tid mellan återförsök. På så sätt kan användarna se till att deras Spark-jobbdefinitionsjobb fortsätter att köras oändligt tills användaren bestämmer sig för att stoppa dem.

Strömmande källor

För att konfigurera direktuppspelning med Event Hubs krävs grundläggande konfiguration, som inkluderar Namnområdesnamn för Event Hubs, hubbnamn, nyckelnamn för delad åtkomst och konsumentgruppen. En konsumentgrupp är en vy över en hel händelsehubb. Det gör det möjligt för flera förbrukande program att ha en separat vy över händelseströmmen och att läsa strömmen oberoende av varandra i sin egen takt och med sina förskjutningar.

Partitioner är en viktig del av att kunna hantera en stor mängd data. En enskild processor har en begränsad kapacitet för att hantera händelser per sekund, medan flera processorer kan göra ett bättre jobb när de körs parallellt. Med partitioner kan du bearbeta stora mängder händelser parallellt.

Om för många partitioner används med låg inmatningshastighet hanterar partitionsläsare en liten del av dessa data, vilket orsakar icke-optimal bearbetning. Det ideala antalet partitioner beror direkt på önskad bearbetningshastighet. Om du vill skala händelsebearbetningen kan du överväga att lägga till fler partitioner. Det finns ingen specifik dataflödesgräns för en partition. Det aggregerade dataflödet i namnområdet begränsas dock av antalet dataflödesenheter. När du ökar antalet dataflödesenheter i namnområdet kanske du vill ha extra partitioner så att samtidiga läsare kan uppnå sitt maximala dataflöde.

Rekommendationen är att undersöka och testa det bästa antalet partitioner för ditt dataflödesscenario. Men det är vanligt att se scenarier med högt dataflöde med 32 eller fler partitioner.

Azure Event Hubs Anslut or för Apache Spark (azure-event-hubs-spark) rekommenderas för att ansluta Spark-programmet till Azure Event Hubs.

Lakehouse som strömmande mottagare

Delta Lake är ett lagringslager med öppen källkod som tillhandahåller ACID-transaktioner (atomicitet, konsekvens, isolering och hållbarhet) ovanpå datasjölagringslösningar. Delta Lake stöder även skalbar metadatahantering, schemautveckling, tidsresor (dataversionshantering), öppet format och andra funktioner.

I Fabric Datateknik används Delta Lake för att:

  • Använd Spark SQL för att enkelt upsert (infoga/uppdatera) och ta bort data.
  • Komprimera data för att minimera den tid som ägnas åt att köra frågor mot data.
  • Visa tillståndet för tabeller före och efter att åtgärder har utförts.
  • Hämta en historik över åtgärder som utförs på tabeller.

Delta läggs till som ett av de möjliga formaten för utdatamottagare som används i writeStream. Mer information om befintliga utdatamottagare finns i Programmeringsguide för Spark Structured Streaming.

I följande exempel visas hur du kan strömma data till Delta Lake.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Om koden som har klippts i exemplet:

  • format() är instruktionen som definierar utdataformatet för data.
  • outputMode() definierar på vilket sätt de nya raderna i direktuppspelningen skrivs (d.v.s. tillägg, överskrivning).
  • toTable() bevarar strömmade data till en Delta-tabell som skapats med hjälp av värdet som skickas som parameter.

Optimera Delta-skrivningar

Datapartitionering är en viktig del i skapandet av en robust strömningslösning: partitionering förbättrar hur data organiseras och förbättrar även dataflödet. Filer blir enkelt fragmenterade efter Delta-åtgärder, vilket resulterar i för många små filer. Och för stora filer är också ett problem, på grund av den långa tiden att skriva dem på disken. Utmaningen med datapartitionering är att hitta rätt balans som resulterar i optimala filstorlekar. Spark stöder partitionering i minne och på disk. Korrekt partitionerade data kan ge bästa möjliga prestanda när du bevarar data till Delta Lake och frågar efter data från Delta Lake.

  • När du partitionerar data på disken kan du välja hur du partitionerar data baserat på kolumner med hjälp av partitionBy(). partitionBy() är en funktion som används för att partitionera en stor semantisk modell i mindre filer baserat på en eller flera kolumner som tillhandahålls vid skrivning till disk. Partitionering är ett sätt att förbättra frågeprestanda när du arbetar med en stor semantisk modell. Undvik att välja en kolumn som genererar för små eller för stora partitioner. Definiera en partition baserat på en uppsättning kolumner med god kardinalitet och dela upp data i filer med optimal storlek.
  • Partitionering av data i minnet kan göras med ompartition () eller sammanslagning() transformeringar, distribuera data på flera arbetsnoder och skapa flera uppgifter som kan läsa och bearbeta data parallellt med grunderna i Resilient Distributed Dataset (RDD). Det gör det möjligt att dela upp semantisk modell i logiska partitioner, som kan beräknas på olika noder i klustret.
    • repartition() används för att öka eller minska antalet partitioner i minnet. Ompartitionen omdelar hela data över nätverket och balanserar dem över alla partitioner.
    • coalesce() används bara för att minska antalet partitioner effektivt. Det är en optimerad version av repartition() där dataflytten mellan alla partitioner är lägre med hjälp av coalesce().

Att kombinera båda partitioneringsmetoderna är en bra lösning i scenariot med högt dataflöde. repartition() skapar ett visst antal partitioner i minnet, medan partitionBy() skriver filer till disk för varje minnespartition och partitioneringskolumn. I följande exempel visas användningen av båda partitioneringsstrategierna i samma Spark-jobb: data delas först upp i 48 partitioner i minnet (förutsatt att vi har totalt 48 CPU-kärnor) och partitioneras sedan på disk baserat på två befintliga kolumner i nyttolasten.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Optimerad skrivning

Ett annat alternativ för att optimera skrivningar till Delta Lake är att använda optimerad skrivning. Optimerad skrivning är en valfri funktion som förbättrar hur data skrivs till Delta-tabellen. Spark sammanfogar eller delar partitionerna innan du skriver data, vilket maximerar dataflödet som skrivs till disken. Det medför dock fullständig blandning, så för vissa arbetsbelastningar kan det orsaka en prestandaförsämring. Jobb som använder coalesce() och/eller repartition() för att partitionera data på disk kan omstruktureras för att börja använda optimerad skrivning i stället.

Följande kod är ett exempel på användningen av optimerad skrivning. Observera att partitionBy() fortfarande används.

spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Batchbearbetningshändelser

För att minimera antalet åtgärder för att förbättra den tid som ägnas åt att mata in data i Delta Lake är batchningshändelser ett praktiskt alternativ.

Utlösare definierar hur ofta en strömmande fråga ska köras (utlöses) och genererar nya data. Om du konfigurerar dem definieras ett periodiskt bearbetningstidsintervall för mikrobatcher, ackumulera data och batchbearbetningshändelser till få bestående åtgärder, i stället för att skriva till disken hela tiden.

I följande exempel visas en direktuppspelningsfråga där händelser bearbetas regelbundet med intervall på en minut.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

Fördelen med att kombinera batchbearbetning av händelser i Delta-tabellskrivningsåtgärder är att den skapar större Delta-filer med mer data i dem, vilket undviker små filer. Du bör analysera mängden data som matas in och hitta den bästa bearbetningstiden för att optimera storleken på Parquet-filerna som skapats av Delta-biblioteket.

Övervakning

Spark 3.1 och senare versioner har ett inbyggt strukturerat direktuppspelningsgränssnitt som innehåller följande strömningsmått:

  • Indatahastighet
  • Processhastighet
  • Indatarader
  • Batchvaraktighet
  • Varaktighet för åtgärden