Spark gebruiken om te werken met gegevensbestanden

Voltooid

Een van de voordelen van het gebruik van Spark is dat u code kunt schrijven en uitvoeren in verschillende programmeertalen, zodat u de programmeervaardigheden kunt gebruiken die u al hebt en de meest geschikte taal voor een bepaalde taak kunt gebruiken. De standaardtaal in een nieuw Azure Databricks Spark-notebook is PySpark : een door Spark geoptimaliseerde versie van Python, die vaak wordt gebruikt door gegevenswetenschappers en analisten vanwege de sterke ondersteuning voor gegevensmanipulatie en visualisatie. Daarnaast kunt u talen zoals Scala (een java-afgeleide taal die interactief kan worden gebruikt) en SQL (een variant van de veelgebruikte SQL-taal in de Spark SQL-bibliotheek gebruiken om te werken met relationele gegevensstructuren). Softwaretechnici kunnen ook gecompileerde oplossingen maken die worden uitgevoerd op Spark met behulp van frameworks zoals Java.

Gegevens verkennen met dataframes

Spark maakt standaard gebruik van een gegevensstructuur die een tolerante gedistribueerde gegevensset (RDD) wordt genoemd, maar hoewel u code kunt schrijven die rechtstreeks met RDD's werkt, is de meest gebruikte gegevensstructuur voor het werken met gestructureerde gegevens in Spark het dataframe, dat wordt geleverd als onderdeel van de Spark SQL-bibliotheek . Dataframes in Spark zijn vergelijkbaar met die in de alomtegenwoordige Pandas Python-bibliotheek, maar geoptimaliseerd voor gebruik in de gedistribueerde verwerkingsomgeving van Spark.

Notitie

Naast de Dataframe-API biedt Spark SQL een sterk getypeerde gegevensset-API die wordt ondersteund in Java en Scala. In deze module richten we ons op de Dataframe-API.

Gegevens laden in een dataframe

Laten we een hypothetisch voorbeeld bekijken om te zien hoe u een dataframe kunt gebruiken om met gegevens te werken. Stel dat u de volgende gegevens in een door komma's gescheiden tekstbestand hebt met de naam products.csv in de gegevensmap in uw DbFS-opslag (Databricks File System):

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

In een Spark-notebook kunt u de volgende PySpark-code gebruiken om de gegevens in een dataframe te laden en de eerste tien rijen weer te geven:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

De %pyspark lijn aan het begin wordt een magie genoemd en vertelt Spark dat de taal die in deze cel wordt gebruikt PySpark is. Hier volgt de equivalente Scala-code voor het voorbeeld van de productgegevens:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

De magie %spark wordt gebruikt om Scala op te geven.

Tip

U kunt ook de taal selecteren die u wilt gebruiken voor elke cel in de notebook-interface.

Beide voorbeelden die eerder werden weergegeven, zouden uitvoer als volgt produceren:

ProductID ProductName Categorie ListPrice
771 Berg-100 zilver, 38 Mountainbikes 3399.9900
772 Berg-100 zilver, 42 Mountainbikes 3399.9900
773 Berg-100 zilver, 44 Mountainbikes 3399.9900
... ... ... ...

Een dataframeschema opgeven

In het vorige voorbeeld bevatte de eerste rij van het CSV-bestand de kolomnamen en kon Spark het gegevenstype van elke kolom afleiden uit de gegevens die het bestand bevat. U kunt ook een expliciet schema opgeven voor de gegevens, wat handig is wanneer de kolomnamen niet zijn opgenomen in het gegevensbestand, zoals in dit CSV-voorbeeld:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

In het volgende PySpark-voorbeeld ziet u hoe u een schema opgeeft voor het dataframe dat moet worden geladen vanuit een bestand met de naam product-data.csv in deze indeling:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

De resultaten zouden weer vergelijkbaar zijn met:

ProductID ProductName Categorie ListPrice
771 Berg-100 zilver, 38 Mountainbikes 3399.9900
772 Berg-100 zilver, 42 Mountainbikes 3399.9900
773 Berg-100 zilver, 44 Mountainbikes 3399.9900
... ... ... ...

Gegevensframes filteren en groeperen

U kunt de methoden van de Dataframe-klasse gebruiken om de gegevens te filteren, sorteren, groeperen en anderszins te bewerken. In het volgende codevoorbeeld wordt bijvoorbeeld de select-methode gebruikt om de kolommen ProductName en ListPrice op te halen uit het df-gegevensframe met productgegevens in het vorige voorbeeld:

pricelist_df = df.select("ProductID", "ListPrice")

De resultaten uit dit codevoorbeeld zien er ongeveer als volgt uit:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Bij de meeste methoden voor gegevensbewerking wordt een nieuw dataframeobject geretourneerd.

Tip

Het selecteren van een subset kolommen uit een dataframe is een algemene bewerking, die ook kan worden bereikt met behulp van de volgende kortere syntaxis:

pricelist_df = df["ProductID", "ListPrice"]

U kunt methoden 'koppelen' om een reeks bewerkingen uit te voeren die resulteert in een getransformeerd dataframe. Met deze voorbeeldcode wordt bijvoorbeeld de selectie gekoppeld en waar methoden voor het maken van een nieuw dataframe met de kolommen ProductName en ListPrice voor producten met een categorie Mountain Bikes of Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

De resultaten uit dit codevoorbeeld zien er ongeveer als volgt uit:

ProductName ListPrice
Berg-100 zilver, 38 3399.9900
Road-750 zwart, 52 539.9900
... ...

Als u gegevens wilt groeperen en aggregeren, kunt u de groupBy-methode en statistische functies gebruiken. Met de volgende PySpark-code wordt bijvoorbeeld het aantal producten voor elke categorie geteld:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

De resultaten uit dit codevoorbeeld zien er ongeveer als volgt uit:

Categorie aantal
Headsets 3
Wielen 14
Mountainbikes 32
... ...

SQL-expressies gebruiken in Spark

De Dataframe-API maakt deel uit van een Spark-bibliotheek met de naam Spark SQL, waarmee gegevensanalisten SQL-expressies kunnen gebruiken om gegevens op te vragen en te bewerken.

Databaseobjecten maken in de Spark-catalogus

De Spark-catalogus is een metastore voor relationele gegevensobjecten, zoals weergaven en tabellen. De Spark-runtime kan de catalogus gebruiken om code die is geschreven in elke door Spark ondersteunde taal naadloos te integreren met SQL-expressies die mogelijk natuurlijker zijn voor sommige gegevensanalisten of ontwikkelaars.

Een van de eenvoudigste manieren om gegevens beschikbaar te maken in een dataframe voor het uitvoeren van query's in de Spark-catalogus is het maken van een tijdelijke weergave, zoals wordt weergegeven in het volgende codevoorbeeld:

df.createOrReplaceTempView("products")

Een weergave is tijdelijk, wat betekent dat deze automatisch wordt verwijderd aan het einde van de huidige sessie. U kunt ook tabellen maken die in de catalogus worden bewaard om een database te definiƫren waarop query's kunnen worden uitgevoerd met behulp van Spark SQL.

Notitie

We verkennen de Spark-catalogustabellen niet uitgebreid in deze module, maar het is de moeite waard om enkele belangrijke punten te markeren:

  • U kunt een lege tabel maken met behulp van de spark.catalog.createTable methode. Tabellen zijn metagegevensstructuren waarmee de onderliggende gegevens worden opgeslagen op de opslaglocatie die is gekoppeld aan de catalogus. Als u een tabel verwijdert, worden ook de onderliggende gegevens verwijderd.
  • U kunt een dataframe opslaan als een tabel met behulp van de saveAsTable bijbehorende methode.
  • U kunt een externe tabel maken met behulp van de spark.catalog.createExternalTable methode. Externe tabellen definiĆ«ren metagegevens in de catalogus, maar halen hun onderliggende gegevens op uit een externe opslaglocatie; meestal een map in een data lake. Als u een externe tabel verwijdert, worden de onderliggende gegevens niet verwijderd.

De Spark SQL-API gebruiken om query's uit te voeren op gegevens

U kunt de Spark SQL-API gebruiken in code die in elke taal is geschreven om query's uit te voeren op gegevens in de catalogus. De volgende PySpark-code maakt bijvoorbeeld gebruik van een SQL-query om gegevens uit de weergave producten als een dataframe te retourneren.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

De resultaten uit het codevoorbeeld lijken op de volgende tabel:

ProductName ListPrice
Berg-100 zilver, 38 3399.9900
Road-750 zwart, 52 539.9900
... ...

SQL-code gebruiken

In het vorige voorbeeld is gedemonstreerd hoe u de Spark SQL-API gebruikt om SQL-expressies in te sluiten in Spark-code. In een notebook kunt u ook de %sql magic gebruiken om SQL-code uit te voeren waarmee query's worden uitgevoerd op objecten in de catalogus, zoals deze:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Het SQL-codevoorbeeld retourneert een resultatenset die automatisch wordt weergegeven in het notebook als een tabel, zoals hieronder:

Categorie ProductCount
Bib-Shorts 3
Fietsenrekken 1
Fietsstandaarden 1
... ...