Usare Spark su file di dati

Completato

Dopo aver configurato un notebook e collegato a un cluster, è possibile usare Spark per leggere ed elaborare i file di dati. Spark supporta un'ampia gamma di formati, ad esempio CSV, JSON, Parquet, ORC, Avro e Delta, e Databricks offre connettori predefiniti per accedere ai file archiviati nell'area di lavoro, in Azure Data Lake o in Archiviazione BLOB o in altri sistemi esterni.

Il flusso di lavoro segue in genere tre passaggi:

  1. Leggere un file in un dataframe Spark usando spark.read con il formato e il percorso corretti. Durante la lettura di formati di testo non elaborati come CSV o JSON, Spark può dedurre lo schema (nomi di colonna e tipi di dati), ma questo è talvolta lento o inaffidabile. Una procedura migliore nell'ambiente di produzione consiste nel definire lo schema in modo esplicito in modo che i dati vengano caricati in modo coerente ed efficiente.

  2. Esplorare e trasformare il dataframe usando operazioni SQL o DataFrame, ad esempio filtrando le righe, selezionando colonne, aggregando valori.

  3. Scrivere nuovamente i risultati nella risorsa di archiviazione in un formato scelto.

L'uso dei file in Spark è progettato per essere coerente tra set di dati di piccole e grandi dimensioni. Lo stesso codice usato per testare un piccolo file CSV funzionerà anche su set di dati molto più grandi, poiché Spark distribuisce il lavoro nel cluster. In questo modo è più semplice aumentare le prestazioni dall'esplorazione rapida all'elaborazione dei dati più complessa.

Caricamento dei dati in un dataframe

Verrà ora esaminato un esempio ipotetico per illustrare l'uso di un dataframe per lavorare con i dati. Si supponga di avere i dati seguenti in un file di testo delimitato da virgole denominato products.csv nella cartella dati nella risorsa di archiviazione di Databricks File System (DBFS):

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

In un notebook Spark è possibile usare il codice PySpark seguente per caricare i dati in un dataframe e visualizzare le prime 10 righe:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

La %pyspark riga all'inizio viene chiamata magic e indica a Spark che il linguaggio usato in questa cella è PySpark. Di seguito è riportato il codice Scala equivalente per l'esempio dei dati dei prodotti:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

La riga magic %spark viene usata per specificare Scala.

Suggerimento

È anche possibile selezionare il linguaggio da usare per ogni cella nell'interfaccia Notebook.

Entrambi gli esempi illustrati in precedenza generano output come il seguente:

ID prodotto NomeDelProdotto Categoria Prezzo di listino
771 Mountain-100 argento, 38 Biciclette da montagna 3399,9900
772 Mountain-100 Argento, 42 Biciclette da montagna 3399,9900
773 Mountain-100 argento, 44 Biciclette da montagna 3399,9900
... ... ... ...

Specificare uno schema del dataframe

Nell'esempio precedente la prima riga del file CSV contiene i nomi delle colonne e Spark è riuscito a dedurre il tipo di dati di ogni colonna dai dati contenuti. È anche possibile specificare uno schema esplicito per i dati, che risulta utile quando i nomi delle colonne non sono inclusi nel file di dati, come nell'esempio di file CSV seguente:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

L'esempio di PySpark seguente illustra come specificare uno schema per il dataframe da caricare da un file denominato product-data.csv in questo formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

I risultati saranno ancora una volta simili ai seguenti:

ID prodotto NomeDelProdotto Categoria Prezzo di listino
771 Mountain-100 argento, 38 Biciclette da montagna 3399,9900
772 Mountain-100 Argento, 42 Biciclette da montagna 3399,9900
773 Mountain-100 argento, 44 Biciclette da montagna 3399,9900
... ... ... ...

Filtro e raggruppamento di dataframe

È possibile usare i metodi della classe Dataframe per filtrare, ordinare, raggruppare e modificare in altro modo i dati inclusi. Nell'esempio di codice seguente, ad esempio, viene usato il select metodo per recuperare le colonne ProductName e ListPrice dal dataframe df contenente i dati del prodotto nell'esempio precedente:

pricelist_df = df.select("ProductID", "ListPrice")

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

ID prodotto Prezzo di listino
771 3399,9900
772 3399,9900
773 3399,9900
... ...

In comune con la maggior parte dei metodi di manipolazione dei dati, select restituisce un nuovo oggetto dataframe.

Suggerimento

La selezione di un subset di colonne da un dataframe è un'operazione comune, che può essere completata anche usando la sintassi più breve seguente:

pricelist_df = df["ProductID", "ListPrice"]

È possibile "concatenare" i metodi per eseguire una serie di manipolazioni che generano un dataframe trasformato. Ad esempio, questo codice di esempio concatena i select metodi e where per creare un nuovo dataframe contenente le colonne ProductName e ListPrice per i prodotti con una categoria di Mountain Bikes o Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

NomeDelProdotto Prezzo di listino
Mountain-100 argento, 38 3399,9900
Road-750 Nero, taglia 52 539.9900
... ...

Per raggruppare e aggregare i dati, è possibile usare il metodo e le groupby funzioni di aggregazione. Il codice PySpark seguente conta ad esempio il numero di prodotti per ogni categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

Categoria count
Cuffie 3
Ruote 14
Biciclette da montagna 32
... ...

Nota

I dataframe Spark sono dichiarativi e non modificabili. Ogni trasformazione ,ad esempio select, filtero groupBy, crea un nuovo dataframe che rappresenta ciò che si desidera, non come viene eseguito. Questo rende il codice riutilizzabile, ottimizzabile e privo di effetti collaterali. Tuttavia, nessuna di queste trasformazioni viene eseguita fino a quando non si attiva un'azione (ad esempio, display, collect, write), a questo punto Spark esegue il piano ottimizzato completo.

Uso delle espressioni SQL in Spark

L'API Dataframe fa parte di una libreria Spark denominata Spark SQL, che consente agli analisti dei dati di usare espressioni SQL per eseguire query e modificare i dati.

Creazione di oggetti di database nel catalogo Spark

Il catalogo Spark è un metastore per oggetti dati relazionali, ad esempio viste e tabelle. Il runtime di Spark può usare il catalogo per integrare facilmente il codice scritto in qualsiasi linguaggio supportato da Spark con espressioni SQL che potrebbero risultare più naturali per alcuni analisti o sviluppatori di dati.

Uno dei modi più semplici per rendere disponibili i dati in un dataframe per l'esecuzione di query nel catalogo Spark consiste nel creare una visualizzazione temporanea, come illustrato nell'esempio di codice seguente:

df.createOrReplaceTempView("products")

Una visualizzazione è temporanea, ovvero viene eliminata automaticamente alla fine della sessione corrente. È anche possibile creare tabelle persistenti nel catalogo per definire un database su cui è possibile eseguire query usando Spark SQL.

Nota

Le tabelle del catalogo Spark non verranno esaminate in modo approfondito in questo modulo, ma vale la pena evidenziare alcuni punti chiave:

  • È possibile creare una tabella vuota usando il metodo spark.catalog.createTable. Le tabelle sono strutture di metadati che archiviano i dati sottostanti nel percorso di archiviazione associato al catalogo. L'eliminazione di una tabella comporta anche l'eliminazione dei dati sottostanti.
  • È possibile salvare un dataframe come tabella usando il relativo metodo saveAsTable.
  • È possibile creare una tabella esterna usando il spark.catalog.createExternalTable metodo . Le tabelle esterne definiscono i metadati nel catalogo, ma ottengono i dati sottostanti da una posizione di archiviazione esterna; in genere una cartella in un data lake. L'eliminazione di una tabella esterna non elimina i dati sottostanti.

Uso dell'API Spark SQL per l'esecuzione di query sui dati

È possibile usare l'API Spark SQL nel codice scritto in qualsiasi linguaggio per eseguire query sui dati nel catalogo. Ad esempio, il codice PySpark seguente usa una query SQL per restituire dati dalla vista prodotti come dataframe.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

I risultati dell'esempio di codice sono simili alla tabella seguente:

NomeDelProdotto Prezzo di listino
Mountain-100 argento, 38 3399,9900
Road-750 Nero, taglia 52 539.9900
... ...

Uso di codice SQL

L'esempio precedente ha illustrato come usare l'API Spark SQL per incorporare espressioni SQL nel codice Spark. In un notebook è anche possibile usare la riga magic %sql per eseguire codice SQL che esegue query sugli oggetti nel catalogo, come illustrato di seguito:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

L'esempio di codice SQL restituisce un set di risultati visualizzato automaticamente nel notebook come tabella, come quello seguente:

Categoria ProductCount
Bib-Shorts 3
Rack per biciclette 1
Supporti per biciclette 1
... ...