Delen via


Streaminggegevens ophalen in Lakehouse met gestructureerd streamen van Spark

Structured Streaming is een schaalbare en fouttolerante stroomverwerkingsengine die is gebouwd op Spark. Spark zorgt ervoor dat de streamingbewerking stapsgewijs en continu wordt uitgevoerd wanneer gegevens blijven binnenkomen.

Gestructureerd streamen werd beschikbaar in Spark 2.2. Sindsdien is het de aanbevolen methode voor het streamen van gegevens. Het fundamentele principe achter gestructureerde stream is het behandelen van een livegegevensstroom als een tabel waarin nieuwe gegevens altijd continu worden toegevoegd, zoals een nieuwe rij in een tabel. Er zijn enkele gedefinieerde ingebouwde streamingbestandsbronnen, zoals CSV, JSON, ORC, Parquet en ingebouwde ondersteuning voor berichtenservices zoals Kafka en Event Hubs.

Dit artikel biedt inzicht in het optimaliseren van de verwerking en opname van gebeurtenissen via gestructureerd streamen van Spark in productieomgevingen met hoge doorvoer. De voorgestelde benaderingen zijn:

  • Optimalisatie van doorvoer voor gegevensstreaming
  • Schrijfbewerkingen optimaliseren in de deltatabel en
  • Groepering van gebeurtenissen

Spark-taakdefinities en Spark-notebooks

Spark-notebooks zijn een uitstekend hulpprogramma voor het valideren van ideeën en het uitvoeren van experimenten om inzicht te krijgen in uw gegevens of code. Notebooks worden veel gebruikt in gegevensvoorbereiding, visualisatie, machine learning en andere big data-scenario's. Spark-taakdefinities zijn niet-interactieve codegerichte taken die gedurende lange perioden op een Spark-cluster worden uitgevoerd. Spark-taakdefinities bieden robuustheid en beschikbaarheid.

Spark-notebooks zijn uitstekende bron om de logica van uw code te testen en te voldoen aan alle zakelijke vereisten. Om het echter in een productiescenario te laten draaien, zijn Spark-taakdefinities met de Retry Policy ingeschakeld de beste oplossing.

Herhaalbeleid voor Spark-taakdefinities

In Microsoft Fabric kan de gebruiker een beleid voor opnieuw proberen instellen voor Spark-taakdefinitietaken. Hoewel het script in de taak mogelijk oneindig is, kan de infrastructuur waarop het script wordt uitgevoerd, een probleem ondervinden waarbij de taak moet worden gestopt. Of de taak kan worden geëlimineerd vanwege de behoeften aan onderhoud van de onderliggende infrastructuur. Met het beleid voor opnieuw proberen kan de gebruiker regels instellen voor het automatisch opnieuw opstarten van de taak als deze stopt vanwege onderliggende problemen. De parameters geven aan hoe vaak de taak opnieuw moet worden opgestart, tot oneindige nieuwe pogingen en het instellen van de tijd tussen nieuwe pogingen. Op die manier kunnen de gebruikers ervoor zorgen dat hun Spark-taakdefinitietaken oneindig blijven worden uitgevoerd totdat de gebruiker besluit deze te stoppen.

Streamingbronnen

Voor het instellen van streaming met Event Hubs is een basisconfiguratie vereist, waaronder de naamruimte van Event Hubs, de hubnaam, de naam van de gedeelde toegangssleutel en de consumentengroep. Een consumentengroep is een weergave van een hele Event Hub. Hiermee kunnen meerdere verbruikende toepassingen een afzonderlijke weergave van de eventstream hebben en de stream onafhankelijk in hun eigen tempo en met hun offsets lezen.

Partities vormen een essentieel onderdeel van het verwerken van een groot aantal gegevens. Eén processor heeft een beperkte capaciteit voor het verwerken van gebeurtenissen per seconde, terwijl meerdere processors een betere taak kunnen uitvoeren wanneer ze parallel worden uitgevoerd. Partities bieden de mogelijkheid om grote hoeveelheden gebeurtenissen parallel te verwerken.

Als er te veel partities worden gebruikt met een lage opnamesnelheid, verwerken partitielezers een klein deel van deze gegevens, wat niet-optimale verwerking veroorzaakt. Het ideale aantal partities is rechtstreeks afhankelijk van de gewenste verwerkingssnelheid. Als u de verwerking van gebeurtenissen wilt schalen, kunt u overwegen om meer partities toe te voegen. Er is geen specifieke doorvoerlimiet voor een partitie. De geaggregeerde doorvoer in uw naamruimte wordt echter beperkt door het aantal doorvoereenheden. Wanneer u het aantal doorvoereenheden in uw naamruimte verhoogt, kunt u extra partities gebruiken om gelijktijdige lezers de maximale doorvoer te laten bereiken.

De aanbeveling is om het beste aantal partities voor uw doorvoerscenario te onderzoeken en te testen. Maar het is gebruikelijk om scenario's met een hoge doorvoer te zien met behulp van 32 of meer partities.

Azure Event Hubs Connector voor Apache Spark (azure-event-hubs-spark) wordt aanbevolen om de Spark-toepassing te verbinden met Azure Event Hubs.

Lakehouse als streaming-afvoer

Delta Lake is een opensource-opslaglaag die ACID-transacties (atomiciteit, consistentie, isolatie en duurzaamheid) biedt boven op Data Lake Storage-oplossingen. Delta Lake biedt ook ondersteuning voor schaalbare verwerking van metagegevens, schemaontwikkeling, tijdreizen (gegevensversiebeheer), open indeling en andere functies.

In Fabric Data Engineering wordt Delta Lake gebruikt om te:

  • Voeg eenvoudig gegevens in of werk ze bij en verwijder ze met behulp van Spark SQL.
  • Compacte gegevens om de tijd die nodig is om query's uit te voeren op gegevens te minimaliseren.
  • Bekijk de status van tabellen voor en nadat bewerkingen zijn uitgevoerd.
  • Haal een geschiedenis op van bewerkingen die worden uitgevoerd op tabellen.

Delta wordt toegevoegd als een van de mogelijke indelingen voor sinks die worden gebruikt in writeStream. Voor meer informatie over de bestaande uitvoersinks, zie Spark Structured Streaming Programming Guide.

In het volgende voorbeeld ziet u hoe gegevens kunnen worden gestreamd naar Delta Lake.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Informatie over de code die in het voorbeeld is geknipt:

  • format() is de instructie waarmee de uitvoerindeling van de gegevens wordt gedefinieerd.
  • outputMode() definieert op welke manier de nieuwe rijen in de streaming worden geschreven (dat wil gezegd, toevoegen, overschrijven).
  • toTable() bewaart de gestreamde gegevens in een Delta-tabel die is gemaakt met behulp van de waarde die als parameter is doorgegeven.

Delta-schrijfbewerkingen optimaliseren

Gegevenspartitionering is een essentieel onderdeel van het maken van een robuuste streamingoplossing: partitionering verbetert de manier waarop gegevens worden georganiseerd en verbetert ook de doorvoer. Bestanden worden eenvoudig gefragmenteerd na Delta-bewerkingen, wat resulteert in te veel kleine bestanden. En te grote bestanden zijn ook een probleem, vanwege de lange tijd om ze op de schijf te schrijven. De uitdaging met gegevenspartitionering is het vinden van het juiste evenwicht dat resulteert in optimale bestandsgrootten. Spark ondersteunt partitionering in het geheugen en op schijf. Goed gepartitioneerde gegevens kunnen de beste prestaties bieden bij het behouden van gegevens naar Delta Lake en het opvragen van gegevens uit Delta Lake.

  • Wanneer u gegevens partitioneert op schijf, kunt u kiezen hoe u de gegevens partitioneert op basis van kolommen met behulp van partitionBy(). partitionBy() is een functie die wordt gebruikt voor het partitioneren van een groot semantisch model in kleinere bestanden op basis van een of meerdere kolommen die tijdens het schrijven naar de schijf worden geleverd. Partitioneren is een manier om de prestaties van query's te verbeteren bij het werken met een groot semantisch model. Vermijd het kiezen van een kolom die te kleine of te grote partities genereert. Definieer een partitie op basis van een set kolommen met een goede kardinaliteit en splits de gegevens in bestanden met een optimale grootte.
  • Het partitioneren van gegevens in het geheugen kan worden uitgevoerd met behulp van repartition() of coalesce() transformaties, het distribueren van gegevens op meerdere werkknooppunten en het maken van meerdere taken die gegevens parallel kunnen lezen en verwerken met behulp van de basisprincipes van Resilient Distributed Dataset (RDD). Hiermee kunt u semantisch model verdelen in logische partities, die kunnen worden berekend op verschillende knooppunten van het cluster.
    • repartition() wordt gebruikt om het aantal partities in het geheugen te verhogen of te verlagen. Met repartition worden alle gegevens over het netwerk hersteld en uitgebalanceerd over alle partities.
    • coalesce() wordt alleen gebruikt om het aantal partities efficiënt te verlagen. Dat is een geoptimaliseerde versie van repartition() waarbij de verplaatsing van gegevens in alle partities lager is met behulp van coalesce().

Het combineren van beide partitioneringsmethoden is een goede oplossing in scenario's met hoge doorvoer. repartition() maakt een specifiek aantal partities in het geheugen, terwijl partitionBy() bestanden naar schijf schrijft voor elke geheugenpartitie en partitioneringskolom. Het volgende voorbeeld illustreert het gebruik van beide partitioneringsstrategieën in dezelfde Spark-taak: gegevens worden eerst gesplitst in 48 partities in het geheugen (ervan uitgaande dat we totaal 48 CPU-kernen hebben) en vervolgens gepartitioneerd op schijf op basis van twee bestaande kolommen in de nettolading.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Geoptimaliseerde schrijfbewerking

Een andere optie voor het optimaliseren van schrijfbewerkingen naar Delta Lake is het gebruik van geoptimaliseerde schrijfbewerkingen. Geoptimaliseerd schrijven is een optionele functie die de manier verbetert waarop gegevens naar de Delta-tabel worden geschreven. Spark voegt de partities samen of splitst de partities voordat u de gegevens schrijft, waardoor de doorvoer van gegevens die naar de schijf worden geschreven, wordt gemaximaliseerd. Er wordt echter een volledige shuffle uitgevoerd, dus voor sommige workloads kan dit de prestaties verslechteren. Taken die gebruikmaken van coalesce() en/of repartition() om gegevens op schijf te partitioneren, kunnen worden geherstructureerd om in plaats daarvan geoptimaliseerde schrijfbewerkingen te gebruiken.

De volgende code is een voorbeeld van het gebruik van geoptimaliseerde schrijfbewerkingen. PartitionBy() wordt nog steeds gebruikt.

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Gebeurtenissen samenvoegen

Om het aantal bewerkingen te minimaliseren en de tijd voor het opnemen van gegevens in Delta Lake te verbeteren, is batchverwerking van gebeurtenissen een praktisch alternatief.

Triggers bepalen hoe vaak een streamingquery moet worden uitgevoerd (geactiveerd) en nieuwe gegevens moeten verzenden. Als u deze instelt, definieert u een periodiek verwerkingstijdinterval voor microbatches, verzamelt u gegevens en batchgebeurtenissen in enkele persistente bewerkingen, in plaats van de hele tijd op schijf te schrijven.

In het volgende voorbeeld ziet u een streamingquery waarin gebeurtenissen periodiek worden verwerkt in intervallen van één minuut.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

Het voordeel van het combineren van batchverwerking van gebeurtenissen in schrijfbewerkingen voor Delta-tabellen is dat er grotere Delta-bestanden worden gemaakt met meer gegevens, waardoor kleine bestanden worden vermeden. U moet de hoeveelheid gegevens analyseren die worden opgenomen en de beste verwerkingstijd vinden om de grootte van de Parquet-bestanden te optimaliseren die door de Delta-bibliotheek zijn gemaakt.

Controleren

Spark 3.1 en hogere versies hebben een ingebouwde gestructureerde streaminggebruikersinterface met de volgende metrische streaminggegevens:

  • Invoersnelheid
  • Processnelheid
  • Invoerrijen
  • Batchduur
  • Duur van bewerking