Werken met gegevens in een Spark-dataframe

Voltooid

In de vorige les hebt u geleerd hoe u verbinding maakt met een gegevensbron, gegevens in een dataframe laadt en het dataframe optioneel opslaat in een lakehouse als een bestand of tabel. Laten we nu het dataframe eens nader bekijken.

Spark maakt standaard gebruik van een gegevensstructuur die een tolerante gedistribueerde gegevensset (RDD) wordt genoemd, maar hoewel u code kunt schrijven die rechtstreeks met RDD's werkt, is de meest gebruikte gegevensstructuur voor het werken met gestructureerde gegevens in Spark het dataframe, dat wordt geleverd als onderdeel van de Spark SQL-bibliotheek . Dataframes in Spark zijn vergelijkbaar met die in de alomtegenwoordige Pandas Python-bibliotheek, maar geoptimaliseerd voor gebruik in de gedistribueerde verwerkingsomgeving van Spark.

Notitie

Naast de Dataframe-API biedt Spark SQL een sterk getypeerde gegevensset-API die wordt ondersteund in Java en Scala. In deze module richten we ons op de Dataframe-API.

Gegevens laden in een dataframe

Laten we een hypothetisch voorbeeld bekijken om te zien hoe u een dataframe kunt gebruiken om met gegevens te werken. Stel dat u de volgende gegevens hebt in een door komma's gescheiden tekstbestand met de naam products.csv in de map Bestanden/gegevens in uw lakehouse:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Een schema uitstellen

In een Spark-notebook kunt u de volgende PySpark-code gebruiken om de bestandsgegevens in een dataframe te laden en de eerste tien rijen weer te geven:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Zoals u eerder hebt geleerd, wordt de %%pyspark lijn aan het begin een magie genoemd en wordt Spark verteld dat de taal die in deze cel wordt gebruikt PySpark is. In de meeste gevallen is PySpark de standaardtaal; en we blijven er over het algemeen aan vast in de voorbeelden in deze module. Voor volledigheid is hier echter de equivalente Scala-code voor het voorbeeld van de productgegevens:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

De magie %%spark wordt gebruikt om Scala op te geven. U ziet dat de Scala-implementatie van het dataframe zich op dezelfde manier gedraagt als de PySpark-versie.

Beide codevoorbeelden produceren uitvoer als volgt:

ProductID ProductName Categorie ListPrice
771 Berg-100 zilver, 38 Mountainbikes 3399.9900
772 Berg-100 zilver, 42 Mountainbikes 3399.9900
773 Berg-100 zilver, 44 Mountainbikes 3399.9900
... ... ... ...

Een expliciet schema opgeven

In het vorige voorbeeld bevatte de eerste rij van het CSV-bestand de kolomnamen en kon Spark het gegevenstype van elke kolom afleiden uit de gegevens die het bestand bevat. U kunt ook een expliciet schema opgeven voor de gegevens, wat handig is wanneer de kolomnamen niet zijn opgenomen in het gegevensbestand, zoals in dit CSV-voorbeeld:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

In het volgende PySpark-voorbeeld ziet u hoe u een schema opgeeft voor het dataframe dat moet worden geladen vanuit een bestand met de naam product-data.csv in deze indeling:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

De resultaten zouden weer vergelijkbaar zijn met:

ProductID ProductName Categorie ListPrice
771 Berg-100 zilver, 38 Mountainbikes 3399.9900
772 Berg-100 zilver, 42 Mountainbikes 3399.9900
773 Berg-100 zilver, 44 Mountainbikes 3399.9900
... ... ... ...

Tip

Als u een expliciet schema opgeeft, worden ook de prestaties verbeterd.

Gegevensframes filteren en groeperen

U kunt de methoden van de Dataframe-klasse gebruiken om de gegevens te filteren, sorteren, groeperen en anderszins te bewerken. In het volgende codevoorbeeld wordt bijvoorbeeld de select-methode gebruikt om de kolommen ProductID en ListPrice op te halen uit het df-gegevensframe met productgegevens in het vorige voorbeeld:

pricelist_df = df.select("ProductID", "ListPrice")

De resultaten uit dit codevoorbeeld zien er ongeveer als volgt uit:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Bij de meeste methoden voor gegevensbewerking wordt een nieuw dataframeobject geretourneerd.

Tip

Het selecteren van een subset kolommen uit een dataframe is een algemene bewerking, die ook kan worden bereikt met behulp van de volgende kortere syntaxis:

pricelist_df = df["ProductID", "ListPrice"]

U kunt methoden 'koppelen' om een reeks bewerkingen uit te voeren die resulteert in een getransformeerd dataframe. Met deze voorbeeldcode wordt bijvoorbeeld de selectie gekoppeld en waar methoden voor het maken van een nieuw dataframe met de kolommen ProductName en ListPrice voor producten met een categorie Mountain Bikes of Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

De resultaten uit dit codevoorbeeld zien er ongeveer als volgt uit:

ProductName Categorie ListPrice
Berg-100 zilver, 38 Mountainbikes 3399.9900
Road-750 zwart, 52 Racefietsen 539.9900
... ... ...

Als u gegevens wilt groeperen en aggregeren, kunt u de groupBy-methode en statistische functies gebruiken. Met de volgende PySpark-code wordt bijvoorbeeld het aantal producten voor elke categorie geteld:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

De resultaten uit dit codevoorbeeld zien er ongeveer als volgt uit:

Categorie aantal
Headsets 3
Wielen 14
Mountainbikes 32
... ...