Analizzare i dati con Spark

Completato

Uno dei vantaggi dell'uso di Spark consiste nella possibilità di scrivere ed eseguire codice in vari linguaggi di programmazione, consentendo quindi di usare le competenze di programmazione già disponibili e di usare il linguaggio più appropriato per una determinata attività. Il linguaggio predefinito in un nuovo notebook Spark di Azure Synapse Analytics è PySpark , una versione ottimizzata per Spark di Python, comunemente usata da data scientist e analisti grazie al forte supporto per la manipolazione e la visualizzazione dei dati. Inoltre, è possibile usare linguaggi come Scala (un linguaggio derivato da Java che può essere usato in modo interattivo) e SQL (una variante del linguaggio SQL comunemente usato incluso nella libreria SPARK SQL per lavorare con strutture di dati relazionali). I tecnici software possono anche creare soluzioni compilate eseguite in Spark usando framework come Java e Microsoft .NET.

Esplorazione dei dati con dataframe

In modo nativo, Spark usa una struttura di dati denominata set di dati distribuito resiliente (RDD); ma anche se è possibile scrivere codice che funziona direttamente con i set di dati RDD, la struttura dei dati più comunemente usata per l'uso di dati strutturati in Spark è il dataframe, fornito come parte della libreria SPARK SQL . I dataframe in Spark sono simili a quelli della libreria Python Pandas diffusa, ma ottimizzati per funzionare nell'ambiente di elaborazione distribuito di Spark.

Nota

Oltre all'API Dataframe, Spark SQL offre un'API set di dati fortemente tipizzata supportata in Java e Scala. In questo modulo ci si concentrerà sull'API Dataframe.

Caricamento dei dati in un dataframe

Verrà ora esaminato un esempio ipotetico per illustrare l'uso di un dataframe per lavorare con i dati. Si supponga di avere i dati seguenti in un file di testo delimitato da virgole denominato products.csv nell'account di archiviazione primario per un'area di lavoro di Azure Synapse Analytics:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

In un notebook Spark è possibile usare il codice PySpark seguente per caricare i dati in un dataframe e visualizzare le prime 10 righe:

%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

La %%pyspark riga all'inizio viene chiamata magic e indica a Spark che il linguaggio usato in questa cella è PySpark. È possibile selezionare il linguaggio da usare come predefinito nella barra degli strumenti dell'interfaccia del notebook e quindi usare una riga magic per eseguire l'override di tale scelta per una cella specifica. Di seguito è riportato, ad esempio, il codice Scala equivalente per i dati dei prodotti:

%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))

La riga magic %%spark viene usata per specificare Scala.

Entrambi questi esempi di codice generano un output simile al seguente:

ProductID NomeDelProdotto Categoria Prezzo di listino
771 Mountain-100 argento, 38 Biciclette da montagna 3399,9900
772 Mountain-100 Argento, 42 Biciclette da montagna 3399,9900
773 Mountain-100 argento, 44 Biciclette da montagna 3399,9900
... ... ... ...

Specificare uno schema del dataframe

Nell'esempio precedente la prima riga del file CSV contiene i nomi delle colonne e Spark è riuscito a dedurre il tipo di dati di ogni colonna dai dati contenuti. È anche possibile specificare uno schema esplicito per i dati, che risulta utile quando i nomi delle colonne non sono inclusi nel file di dati, come nell'esempio di file CSV seguente:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

L'esempio di PySpark seguente illustra come specificare uno schema per il dataframe da caricare da un file denominato product-data.csv in questo formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

I risultati saranno ancora una volta simili ai seguenti:

ProductID NomeDelProdotto Categoria Prezzo di listino
771 Mountain-100 argento, 38 Biciclette da montagna 3399,9900
772 Mountain-100 Argento, 42 Biciclette da montagna 3399,9900
773 Mountain-100 argento, 44 Biciclette da montagna 3399,9900
... ... ... ...

Filtro e raggruppamento di dataframe

È possibile usare i metodi della classe Dataframe per filtrare, ordinare, raggruppare e modificare in altro modo i dati inclusi. Nell'esempio di codice seguente, ad esempio, viene usato il metodo select per recuperare le colonne ProductName e ListPrice dal dataframe df contenente i dati del prodotto nell'esempio precedente:

pricelist_df = df.select("ProductID", "ListPrice")

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

ProductID Prezzo di listino
771 3399,9900
772 3399,9900
773 3399,9900
... ...

In comune con la maggior parte dei metodi di manipolazione dei dati, selezionare restituisce un nuovo oggetto dataframe.

Suggerimento

La selezione di un subset di colonne da un dataframe è un'operazione comune, che può essere completata anche usando la sintassi più breve seguente:

pricelist_df = df["ProductID", "ListPrice"]

È possibile "concatenare" i metodi per eseguire una serie di manipolazioni che generano un dataframe trasformato. Ad esempio, questo codice di esempio concatena i metodi select e where per creare un nuovo dataframe contenente le colonne ProductName e ListPrice per i prodotti con una categoria di Mountain Bikes o Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

NomeDelProdotto Prezzo di listino
Mountain-100 argento, 38 3399,9900
Road-750 Nero, taglia 52 539.9900
... ...

Per raggruppare e aggregare i dati, è possibile usare il metodo groupBy e le funzioni di aggregazione. Il codice PySpark seguente conta ad esempio il numero di prodotti per ogni categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

I risultati di questo esempio di codice avranno un aspetto simile al seguente:

Categoria count
Cuffie 3
Ruote 14
Biciclette da montagna 32
... ...

Uso delle espressioni SQL in Spark

L'API Dataframe fa parte di una libreria Spark denominata Spark SQL, che consente agli analisti dei dati di usare espressioni SQL per eseguire query e modificare i dati.

Creazione di oggetti di database nel catalogo Spark

Il catalogo Spark è un metastore per oggetti dati relazionali, ad esempio viste e tabelle. Il runtime di Spark può usare il catalogo per integrare facilmente il codice scritto in qualsiasi linguaggio supportato da Spark con espressioni SQL che potrebbero risultare più naturali per alcuni analisti o sviluppatori di dati.

Uno dei modi più semplici per rendere disponibili i dati in un dataframe per l'esecuzione di query nel catalogo Spark consiste nel creare una visualizzazione temporanea, come illustrato nell'esempio di codice seguente:

df.createOrReplaceTempView("products")

Una visualizzazione è temporanea, ovvero viene eliminata automaticamente alla fine della sessione corrente. È anche possibile creare tabelle persistenti nel catalogo per definire un database su cui è possibile eseguire query usando Spark SQL.

Nota

Le tabelle del catalogo Spark non verranno esaminate in modo approfondito in questo modulo, ma vale la pena evidenziare alcuni punti chiave:

  • È possibile creare una tabella vuota usando il metodo spark.catalog.createTable. Le tabelle sono strutture di metadati che archiviano i dati sottostanti nel percorso di archiviazione associato al catalogo. L'eliminazione di una tabella comporta anche l'eliminazione dei dati sottostanti.
  • È possibile salvare un dataframe come tabella usando il relativo metodo saveAsTable.
  • È possibile creare una tabella esterna usando il spark.catalog.createExternalTable metodo . Le tabelle esterne definiscono i metadati nel catalogo, ma ottengono i dati sottostanti da una posizione di archiviazione esterna; in genere una cartella in un data lake. L'eliminazione di una tabella esterna non comporta l'eliminazione dei dati sottostanti.

Uso dell'API Spark SQL per l'esecuzione di query sui dati

È possibile usare l'API Spark SQL nel codice scritto in qualsiasi linguaggio per eseguire query sui dati nel catalogo. Ad esempio, il codice PySpark seguente usa una query SQL per restituire dati dalla vista prodotti come dataframe.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

I risultati dell'esempio di codice sono simili alla tabella seguente:

ProductID NomeDelProdotto Prezzo di listino
38 Mountain-100 argento, 38 3399,9900
52 Road-750 Nero, taglia 52 539.9900
... ... ...

Uso di codice SQL

L'esempio precedente ha illustrato come usare l'API Spark SQL per incorporare espressioni SQL nel codice Spark. In un notebook è anche possibile usare la riga magic %%sql per eseguire codice SQL che esegue query sugli oggetti nel catalogo, come illustrato di seguito:

%%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

L'esempio di codice SQL restituisce un set di risultati visualizzato automaticamente nel notebook come tabella, analogamente a quanto riportato di seguito:

Categoria ProductCount
Bib-Shorts 3
Rack per biciclette 1
Supporti per biciclette 1
... ...