Usare i dati in un dataframe Spark

Completato

Spark usa in modalità nativa una struttura dei dati denominata RDD (Resilient Distributed Dataset), ma anche se è possibile scrivere codice che funziona direttamente con RDD, la struttura dei dati usata più comunemente per lavorare con dati strutturati in Spark è il dataframe, che viene fornito come parte della libreria Spark SQL. I dataframe in Spark sono simili a quelli disponibili nella libreria Python Pandas molto diffusa, ma sono ottimizzati per l'uso nell'ambiente di elaborazione distribuita di Spark.

Nota

Oltre all'API Dataframe, Spark SQL fornisce un'API Dataset fortemente tipizzata che è supportata in Java e Scala. In questo modulo ci si concentrerà sull'API Dataframe.

Caricamento dei dati in un dataframe

Verrà ora esaminato un esempio ipotetico per illustrare l'uso di un dataframe per lavorare con i dati. Si supponga di avere i dati seguenti in un file di testo delimitato da virgole denominato products.csv nella cartella Files/data in your lakehouse:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Inferenza di uno schema

In un notebook Spark è possibile usare il codice PySpark seguente per caricare i dati del file in un dataframe e visualizzare le prime 10 righe:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

La riga %%pyspark all'inizio è detta magic e indica a Spark che il linguaggio usato in questa cella è PySpark. È possibile selezionare il linguaggio da usare come predefinito nella barra degli strumenti dell'interfaccia del notebook e quindi usare una riga magic per eseguire l'override di tale scelta per una cella specifica. Di seguito è riportato, ad esempio, il codice Scala equivalente per i dati dei prodotti:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

La riga magic %%spark viene usata per specificare Scala.

Entrambi questi esempi di codice generano un output simile al seguente:

ProductID ProductName Category ListPrice
771 Mountain-100 argento, 38 Mountain Bikes 3399,9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399,9900
773 Mountain-100 argento, 44 Mountain Bikes 3399,9900
... ... ... ...

Specifica di uno schema esplicito

Nell'esempio precedente la prima riga del file CSV contiene i nomi delle colonne e Spark è riuscito a dedurre il tipo di dati di ogni colonna dai dati contenuti. È anche possibile specificare uno schema esplicito per i dati, che risulta utile quando i nomi delle colonne non sono inclusi nel file di dati, come nell'esempio di file CSV seguente:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Nell'esempio di PySpark seguente viene illustrato come specificare uno schema per il dataframe da caricare da un file denominato product-data.csv in questo formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

I risultati saranno ancora una volta simili ai seguenti:

ProductID ProductName Category ListPrice
771 Mountain-100 argento, 38 Mountain Bikes 3399,9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399,9900
773 Mountain-100 argento, 44 Mountain Bikes 3399,9900
... ... ... ...

Suggerimento

La specifica di uno schema esplicito migliora anche le prestazioni.

Filtro e raggruppamento di dataframe

È possibile usare i metodi della classe Dataframe per filtrare, ordinare, raggruppare e modificare in altro modo i dati inclusi. Nell'esempio di codice seguente, ad esempio, viene usato il metodo select per recuperare le colonne ProductID e ListPrice dal dataframe df contenente i dati del prodotto nell'esempio precedente:

pricelist_df = df.select("ProductID", "ListPrice")

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

ProductID ListPrice
771 3399,9900
772 3399,9900
773 3399,9900
... ...

Analogamente alla maggior parte dei metodi di manipolazione dei dati, select restituisce un nuovo oggetto dataframe.

Suggerimento

La selezione di un subset di colonne da un dataframe è un'operazione comune, che può essere completata anche usando la sintassi più breve seguente:

pricelist_df = df["ProductID", "ListPrice"]

È possibile "concatenare" i metodi per eseguire una serie di manipolazioni che generano un dataframe trasformato. Questo codice di esempio concatena ad esempio i metodi select e where per creare un nuovo dataframe contenente le colonne ProductName e ListPrice per i prodotti con categoria Mountain Bikes o Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

ProductName Category ListPrice
Mountain-100 argento, 38 Mountain Bikes 3399,9900
Road-750 Black, 52 Road Bikes 539.9900
... ... ...

Per raggruppare e aggregare i dati, è possibile usare il metodo groupBy e le funzioni di aggregazione. Il codice PySpark seguente conta ad esempio il numero di prodotti per ogni categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

Category numero
Headsets 3
Ruote 14
Mountain Bikes 32
... ...

Salvataggio di un dataframe

Spesso si vuole usare Spark per trasformare i dati non elaborati e salvare i risultati per un'ulteriore analisi o un'elaborazione downstream. Nell'esempio di codice seguente il dataFrame viene salvato in un file parquet nel data lake, sostituendo qualsiasi file esistente con lo stesso nome.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Nota

Il formato Parquet è in genere preferito per i file di dati che verranno usati per ulteriori analisi o inserimento in un archivio analitico. Parquet è un formato molto efficiente supportato dalla maggior parte dei sistemi di analisi dei dati su larga scala. In realtà, a volte il requisito di trasformazione dei dati può essere semplicemente quello di convertire i dati da un altro formato (ad esempio CSV) a Parquet!

Partizionamento del file di output

Il partizionamento è una tecnica di ottimizzazione che consente a Spark di ottimizzare le prestazioni tra i nodi di lavoro. Puoi ottenere ulteriori miglioramenti delle prestazioni filtrando i dati nelle query eliminando l'I/O del disco che non è necessario.

Per salvare un dataframe come set di file partizionato, utilizza il metodo partitionBy durante la scrittura dei dati. L'esempio seguente salva il dataframe bikes_df (che contiene i dati del prodotto per le categorie mountain bike e biciclette da strada) e partiziona i dati per categoria:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

I nomi di cartella generati durante il partizionamento di un dataframe includono il nome e il valore della colonna di partizionamento in un formato column=value, quindi nell'esempio di codice viene creata una cartella denominata bike_data contenente le sottocartelle seguenti:

  • Category=Mountain Bikes
  • Category=Road Bikes

Ogni sottocartella contiene uno o più file parquet con i dati del prodotto per la categoria appropriata.

Nota

Puoi partizionare i dati in più colonne, creando una gerarchia di cartelle per ogni chiave di partizionamento. Ad esempio, è possibile partizionare i dati degli ordini di vendita per anno e mese, in modo che la gerarchia di cartelle includa una cartella per ogni valore di anno, che a sua volta contiene una sottocartella per ogni valore del mese.

Caricare dati partizionati

Quando si leggono dati partizionati in un dataframe, è possibile caricare dati da qualsiasi cartella all'interno della gerarchia specificando valori espliciti o caratteri jolly per i campi partizionati. L'esempio seguente carica i dati per i prodotti nella categoria Road Bikes :

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Nota

Le colonne di partizionamento specificate nel percorso del file vengono omesse nel dataframe risultante. I risultati prodotti dalla query di esempio non includono una colonna Category . La categoria per tutte le righe sarà Road Bikes.