Esercitazione: Azure Data Lake Storage Gen2, Azure Databricks e Spark

Questa esercitazione illustra come connettere un cluster di Azure Databricks ai dati archiviati in un account di archiviazione di Azure per cui è abilitato Azure Data Lake Storage Gen2. Questa connessione permette di eseguire in modo nativo query e analisi dal cluster sui dati.

Questa esercitazione illustra come:

  • Inserire dati non strutturati in un account di archiviazione
  • Eseguire analisi sui dati nell'archivio BLOB

Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.

Prerequisiti

Creare un'area di lavoro, un cluster e un notebook di Azure Databricks

  1. Creare un'area di lavoro di Azure Databricks. Vedere Creare un'area di lavoro di Azure Databricks.

  2. Creare un cluster. Vedere Creare un cluster.

  3. Creare un notebook. Vedere Creare un notebook. Scegliere Python come linguaggio predefinito del notebook.

Mantenere aperto il notebook. Viene usato nelle sezioni seguenti.

Scaricare i dati relativi ai voli

Questa esercitazione usa i dati sui voli sulle prestazioni in tempo reale per gennaio 2016 del Bureau of Transportation Statistics per illustrare come eseguire un'operazione ETL. Per completare l'esercitazione, è necessario scaricare questi dati.

  1. Scaricare il file On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip . Questo file contiene i dati di anteprima.

  2. Decomprimere il contenuto del file compresso e prendere nota del nome e del percorso del file. Queste informazioni saranno necessarie in un passaggio successivo.

Per informazioni sulle informazioni acquisite nei dati sulle prestazioni dei report in tempo reale, è possibile visualizzare le descrizioni dei campi nel sito Web Bureau of Transportation Statistics.

Inserire i dati

In questa sezione si caricano i dati di anteprima con estensione csv nell'account Azure Data Lake Archiviazione Gen2 e quindi si monta l'account di archiviazione nel cluster Databricks. Infine, si usa Databricks per leggere i dati di anteprima con estensione csv e scriverli di nuovo nell'archiviazione in formato Apache parquet.

Caricare i dati dei voli nell'account di archiviazione

Usare AzCopy per copiare il file con estensione csv nell'account Azure Data Lake Archiviazione Gen2. Usare il azcopy make comando per creare un contenitore nell'account di archiviazione. Usare quindi il azcopy copy comando per copiare i dati csv appena scaricati in una directory in tale contenitore.

Nei passaggi seguenti è necessario immettere i nomi per il contenitore da creare e la directory e il BLOB in cui si vogliono caricare i dati in anteprima nel contenitore. È possibile usare i nomi suggeriti in ogni passaggio o specificare le convenzioni di denominazione per contenitori, directory e BLOB.

  1. Aprire una finestra del prompt dei comandi e immettere il comando seguente per accedere ad Azure Active Directory per accedere all'account di archiviazione.

    azcopy login
    

    Seguire le istruzioni visualizzate nella finestra del prompt dei comandi per autenticare l'account utente.

  2. Per creare un contenitore nell'account di archiviazione per archiviare i dati di anteprima, immettere il comando seguente:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Sostituire il valore segnaposto <storage-account-name> con il nome del proprio account di archiviazione.

    • Sostituire il <container-name> segnaposto con un nome per il contenitore che si vuole creare per archiviare i dati csv, ad esempio flight-data-container.

  3. Per caricare (copiare) i dati csv nell'account di archiviazione, immettere il comando seguente.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Sostituire il valore segnaposto <csv-folder-path> con il percorso del file CSV.

    • Sostituire il valore segnaposto <storage-account-name> con il nome del proprio account di archiviazione.

    • Sostituire il <container-name> segnaposto con il nome del contenitore nell'account di archiviazione.

    • Sostituire il <directory-name> segnaposto con il nome di una directory per archiviare i dati nel contenitore, ad esempio jan2016.

Montare l'account di archiviazione nel cluster Databricks

In questa sezione si monta l'archiviazione di oggetti cloud di Azure Data Lake Archiviazione Gen2 nel file system di Databricks (DBFS). Usare l'entità del servizio Azure AD creata in precedenza per l'autenticazione con l'account di archiviazione. Per altre informazioni, vedere Montaggio dell'archiviazione di oggetti cloud in Azure Databricks.

  1. Collegare il notebook al cluster.

    1. Nel notebook creato in precedenza selezionare il pulsante Connessione nell'angolo superiore destro della barra degli strumenti del notebook. Questo pulsante apre il selettore di calcolo. Se il notebook è già stato connesso a un cluster, il nome del cluster viene visualizzato nel testo del pulsante anziché Connessione).

    2. Nel menu a discesa del cluster selezionare il cluster creato in precedenza.

    3. Si noti che il testo nel selettore del cluster cambia per l'avvio. Attendere che il cluster finisca l'avvio e che il nome del cluster venga visualizzato nel pulsante prima di continuare.

  2. Copiare e incollare il blocco di codice seguente nella prima cella, ma non eseguirlo ancora.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. In questo blocco di codice:

    • In configssostituire i valori segnaposto <appId>, <clientSecret>e <tenantId> con l'ID applicazione, il segreto client e l'ID tenant copiati al momento della creazione dell'entità servizio nei prerequisiti.

    • Nell'URI source sostituire i <storage-account-name>valori segnaposto , <container-name>e <directory-name> con il nome dell'account di archiviazione di Azure Data Lake Archiviazione Gen2 e il nome del contenitore e della directory specificati quando sono stati caricati i dati di anteprima nell'account di archiviazione.

      Nota

      L'identificatore dello schema nell'URI, abfss, indica a Databricks di usare il driver del file system BLOB di Azure con Transport Layer Security (TLS). Per altre informazioni sull'URI, vedere Usare l'URI di Azure Data Lake Archiviazione Gen2.

  4. Assicurarsi che l'avvio del cluster sia terminato prima di procedere.

  5. Premere MAIUSC + INVIO per eseguire il codice in questo blocco.

Il contenitore e la directory in cui sono stati caricati i dati di anteprima nell'account di archiviazione sono ora accessibili nel notebook tramite il punto di montaggio / mnt/flightdata.

Usare un notebook di Databricks per convertire il formato CSV in Parquet

Ora che i dati di anteprima csv sono accessibili tramite un punto di montaggio DBFS, è possibile usare un dataframe Apache Spark per caricarli nell'area di lavoro e scriverli di nuovo in formato Apache Parquet nell'archivio oggetti di Azure Data Lake Archiviazione Gen2.

  • Un dataframe Spark è una struttura di dati con etichetta bidimensionale con colonne di tipi potenzialmente diversi. È possibile usare un dataframe per leggere e scrivere facilmente i dati in vari formati supportati. Con un dataframe è possibile caricare dati dall'archiviazione di oggetti cloud ed eseguire analisi e trasformazioni all'interno del cluster di calcolo senza influire sui dati sottostanti nell'archiviazione di oggetti cloud. Per altre informazioni, vedere Usare i dataframe PySpark in Azure Databricks.

  • Apache parquet è un formato di file a colonne con ottimizzazioni che velocizzano le query. Si tratta di un formato di file più efficiente rispetto a CSV o JSON. Per altre informazioni, vedere File Parquet.

Nel notebook aggiungere una nuova cella e incollarvi il codice seguente.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Premere MAIUSC + INVIO per eseguire il codice in questo blocco.

Prima di procedere alla sezione successiva, verificare che tutti i dati parquet siano stati scritti e che nell'output venga visualizzato "Done".

Esplorare i dati

In questa sezione si usa l'utilità file system Databricks per esplorare l'archiviazione di oggetti di Azure Data Lake Archiviazione Gen2 usando il punto di montaggio DBFS creato nella sezione precedente.

In una nuova cella incollare il codice seguente per ottenere un elenco dei file nel punto di montaggio. Il primo comando restituisce un elenco di file e directory. Il secondo comando visualizza l'output in formato tabulare per semplificare la lettura.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Premere MAIUSC + INVIO per eseguire il codice in questo blocco.

Si noti che la directory parquet viene visualizzata nell'elenco. I dati di anteprima con estensione csv sono stati salvati in formato parquet nella directory parquet/flights nella sezione precedente. Per elencare i file nella directory parquet/flights , incollare il codice seguente in una nuova cella ed eseguirlo:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Per creare un nuovo file ed elencarlo, incollare il codice seguente in una nuova cella ed eseguirlo:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Poiché non è necessario il file 1.txt in questa esercitazione, è possibile incollare il codice seguente in una cella ed eseguirlo per eliminare in modo ricorsivo mydirectory. Il True parametro indica un'eliminazione ricorsiva.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Per praticità, è possibile usare il comando della Guida per ottenere informazioni dettagliate su altri comandi.

dbutils.fs.help("rm")

Con questi esempi di codice, è stata esaminata la natura gerarchica di HDFS usando i dati archiviati in un account di archiviazione con Azure Data Lake Archiviazione Gen2 abilitato.

Eseguire una query sui dati

È quindi possibile iniziare a eseguire query sui dati caricati nell'account di archiviazione. Immettere ognuno dei blocchi di codice seguenti in una nuova cella e premere MAIUSC + INVIO per eseguire lo script Python.

I dataframe offrono un set completo di funzioni (selezionare colonne, filtrare, unire, aggregare) che consentono di risolvere in modo efficiente i problemi comuni di analisi dei dati.

Per caricare un dataframe dai dati di anteprima Parquet salvati in precedenza ed esplorare alcune delle funzionalità supportate, immettere questo script in una nuova cella ed eseguirlo.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Immettere questo script in una nuova cella per eseguire alcune query di analisi di base sui dati. È possibile scegliere di eseguire l'intero script (MAIUSC + INVIO), evidenziare ogni query ed eseguirla separatamente con CTRL + MAIUSC + INVIO oppure immettere ogni query in una cella separata ed eseguirla in tale posizione.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Riepilogo

In questa esercitazione:

  • Risorse di Azure create, tra cui un account di archiviazione di Azure Data Lake Archiviazione Gen2 e un'entità servizio di Azure AD e autorizzazioni assegnate per accedere all'account di archiviazione.

  • Creare un'area di lavoro, un notebook e un cluster di calcolo di Azure Databricks.

  • Usato AzCopy per caricare dati di anteprima con estensione csv non strutturati nell'account di archiviazione di Azure Data Lake Archiviazione Gen2.

  • Usare le funzioni dell'utilità File System di Databricks per montare l'account di archiviazione di Azure Data Lake Archiviazione Gen2 ed esplorare il file system gerarchico.

  • Usato i dataframe apache Spark per trasformare i dati di anteprima con estensione csv in formato Apache Parquet e archiviarli di nuovo nell'account di archiviazione di Azure Data Lake Archiviazione Gen2.

  • Usata dataframe per esplorare i dati di anteprima ed eseguire una semplice query.

  • Usato Apache Spark SQL per eseguire query sui dati dei voli per il numero totale di voli per ogni compagnia aerea nel gennaio 2016, negli aeroporti in Texas, nelle compagnie aeree che volano dal Texas, nel ritardo medio di arrivo in minuti per ogni compagnia aerea a livello nazionale e nella percentuale dei voli di ogni compagnia aerea che hanno ritardato le partenza o gli arrivi.

Pulire le risorse

Se si vuole mantenere il notebook e tornare in un secondo momento, è consigliabile arrestare (terminare) il cluster per evitare addebiti. Per terminare il cluster, selezionarlo nel selettore di calcolo in alto a destra della barra degli strumenti del notebook, selezionare Termina dal menu e confermare la selezione. Per impostazione predefinita, il cluster terminerà automaticamente dopo 120 minuti di inattività.

Per eliminare singole risorse dell'area di lavoro, ad esempio notebook e cluster, è possibile farlo dalla barra laterale sinistra dell'area di lavoro. Per istruzioni dettagliate, vedere Eliminare un cluster o Eliminare un notebook.

Quando non sono più necessari, eliminare il gruppo di risorse e tutte le risorse correlate. A tale scopo, in portale di Azure selezionare il gruppo di risorse per l'account di archiviazione e l'area di lavoro e selezionare Elimina.

Passaggi successivi