Usare Spark su file di dati

Completato

Uno dei vantaggi dell'uso di Spark consiste nella possibilità di scrivere ed eseguire codice in vari linguaggi di programmazione, consentendo quindi di usare le competenze di programmazione già disponibili e di usare il linguaggio più appropriato per una determinata attività. Il linguaggio predefinito in un nuovo notebook Spark di Azure Databricks è PySpark, una versione di Python ottimizzata per Spark, che viene comunemente usata da data scientist e analisti grazie al supporto avanzato per la manipolazione e la visualizzazione dei dati. È inoltre possibile usare linguaggi quali Scala, un linguaggio derivato da Java che può essere usato in modo interattivo, e SQL, una variante del linguaggio SQL comunemente usato inclusa nella libreria Spark SQL per l'uso di strutture dei dati relazionali. Gli ingegneri software possono anche creare soluzioni compilate in esecuzione su Spark con framework quali Java.

Esplorazione dei dati con dataframe

Spark usa in modalità nativa una struttura dei dati denominata RDD (Resilient Distributed Dataset), ma anche se è possibile scrivere codice che funziona direttamente con RDD, la struttura dei dati usata più comunemente per lavorare con dati strutturati in Spark è il dataframe, che viene fornito come parte della libreria Spark SQL. I dataframe in Spark sono simili a quelli disponibili nella libreria Python Pandas molto diffusa, ma sono ottimizzati per l'uso nell'ambiente di elaborazione distribuita di Spark.

Nota

Oltre all'API Dataframe, Spark SQL fornisce un'API Dataset fortemente tipizzata che è supportata in Java e Scala. In questo modulo ci si concentrerà sull'API Dataframe.

Caricamento dei dati in un dataframe

Verrà ora esaminato un esempio ipotetico per illustrare l'uso di un dataframe per lavorare con i dati. Si supponga di avere i dati seguenti in un file di testo delimitato da virgole denominato products.csv nella cartella data nello spazio di archiviazione di Databricks File System (DBFS):

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

In un notebook Spark è possibile usare il codice PySpark seguente per caricare i dati in un dataframe e visualizzare le prime 10 righe:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

La riga %pyspark all'inizio è detta magic e indica a Spark che il linguaggio usato in questa cella è PySpark. Di seguito è riportato il codice Scala equivalente per l'esempio dei dati dei prodotti:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

La riga magic %spark viene usata per specificare Scala.

Suggerimento

È anche possibile selezionare il linguaggio da usare per ogni cella nell'interfaccia Notebook.

Entrambi gli esempi illustrati in precedenza generano output come il seguente:

ProductID ProductName Categoria ListPrice
771 Mountain-100 argento, 38 Mountain Bikes 3399,9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399,9900
773 Mountain-100 argento, 44 Mountain Bikes 3399,9900
... ... ... ...

Specificare uno schema del dataframe

Nell'esempio precedente la prima riga del file CSV contiene i nomi delle colonne e Spark è riuscito a dedurre il tipo di dati di ogni colonna dai dati contenuti. È anche possibile specificare uno schema esplicito per i dati, che risulta utile quando i nomi delle colonne non sono inclusi nel file di dati, come nell'esempio di file CSV seguente:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Nell'esempio di PySpark seguente viene illustrato come specificare uno schema per il dataframe da caricare da un file denominato product-data.csv in questo formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

I risultati saranno ancora una volta simili ai seguenti:

ProductID ProductName Categoria ListPrice
771 Mountain-100 argento, 38 Mountain Bikes 3399,9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399,9900
773 Mountain-100 argento, 44 Mountain Bikes 3399,9900
... ... ... ...

Filtro e raggruppamento di dataframe

È possibile usare i metodi della classe Dataframe per filtrare, ordinare, raggruppare e modificare in altro modo i dati inclusi. L'esempio di codice seguente, ad esempio, usa il metodo select per recuperare le colonne ProductName e ListPrice dal dataframe df contenente dati relativi ai prodotti nell'esempio precedente:

pricelist_df = df.select("ProductID", "ListPrice")

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

ProductID ListPrice
771 3399,9900
772 3399,9900
773 3399,9900
... ...

Analogamente alla maggior parte dei metodi di manipolazione dei dati, select restituisce un nuovo oggetto dataframe.

Suggerimento

La selezione di un subset di colonne da un dataframe è un'operazione comune, che può essere completata anche usando la sintassi più breve seguente:

pricelist_df = df["ProductID", "ListPrice"]

È possibile "concatenare" i metodi per eseguire una serie di manipolazioni che generano un dataframe trasformato. Questo codice di esempio concatena ad esempio i metodi select e where per creare un nuovo dataframe contenente le colonne ProductName e ListPrice per i prodotti con categoria Mountain Bikes o Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

ProductName ListPrice
Mountain-100 argento, 38 3399,9900
Road-750 Black, 52 539.9900
... ...

Per raggruppare e aggregare i dati, è possibile usare il metodo groupBy e le funzioni di aggregazione. Il codice PySpark seguente conta ad esempio il numero di prodotti per ogni categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

Categoria numero
Headsets 3
Ruote 14
Mountain Bikes 32
... ...

Uso delle espressioni SQL in Spark

L'API Dataframe fa parte di una libreria Spark denominata Spark SQL, che consente agli analisti dei dati di usare espressioni SQL per eseguire query e modificare i dati.

Creazione di oggetti di database nel catalogo Spark

Il catalogo Spark è un metastore per oggetti dati relazionali, ad esempio viste e tabelle. Il runtime di Spark può usare il catalogo per integrare facilmente il codice scritto in qualsiasi linguaggio supportato da Spark con espressioni SQL che potrebbero risultare più naturali per alcuni analisti o sviluppatori di dati.

Uno dei modi più semplici per rendere disponibili i dati in un dataframe per l'esecuzione di query nel catalogo Spark consiste nel creare una visualizzazione temporanea, come illustrato nell'esempio di codice seguente:

df.createOrReplaceTempView("products")

Una vista è temporanea, ovvero viene eliminata automaticamente alla fine della sessione corrente. È anche possibile creare tabelle che vengono salvate in modo permanente nel catalogo per definire un database su cui è possibile eseguire query usando Spark SQL.

Nota

Le tabelle del catalogo Spark non verranno esaminate in modo approfondito in questo modulo, ma vale la pena evidenziare alcuni punti chiave:

  • È possibile creare una tabella vuota usando il metodo spark.catalog.createTable. Le tabelle sono strutture di metadati che archiviano i dati sottostanti nel percorso di archiviazione associato al catalogo. L'eliminazione di una tabella comporta anche l'eliminazione dei dati sottostanti.
  • È possibile salvare un dataframe come tabella usando il relativo metodo saveAsTable.
  • È possibile creare una tabella esterna usando il metodo spark.catalog.createExternalTable. Le tabelle esterne definiscono i metadati nel catalogo, ma ottengono i dati sottostanti da una posizione di archiviazione esterna; in genere una cartella in un data lake. L'eliminazione di una tabella esterna non comporta l'eliminazione dei dati sottostanti.

Uso dell'API Spark SQL per l'esecuzione di query sui dati

È possibile usare l'API Spark SQL nel codice scritto in qualsiasi linguaggio per eseguire query sui dati nel catalogo. Il codice PySpark seguente usa ad esempio una query SQL per restituire dati dalla vista products come dataframe.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

I risultati dell'esempio di codice sono simili alla tabella seguente:

ProductName ListPrice
Mountain-100 argento, 38 3399,9900
Road-750 Black, 52 539.9900
... ...

Uso di codice SQL

L'esempio precedente ha illustrato come usare l'API Spark SQL per incorporare espressioni SQL nel codice Spark. In un notebook è anche possibile usare la riga magic %sql per eseguire codice SQL che esegue query sugli oggetti nel catalogo, come illustrato di seguito:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

L'esempio di codice SQL restituisce un set di risultati visualizzato automaticamente nel notebook come tabella, analogamente a quanto riportato di seguito:

Categoria ProductCount
Bib-Shorts 3
Bike Racks 1
Bike Stands 1
... ...