Tutorial: Azure Data Lake Storage Gen2, Azure Databricks und Spark
In diesem Tutorial erfahren Sie, wie Sie Ihren Azure Databricks-Cluster mit Daten in einem Azure-Speicherkonto verbinden, für das Azure Data Lake Storage Gen2 aktiviert ist. Diese Verbindung ermöglicht die native Ausführung von Datenabfragen und -analysen über Ihren Cluster.
In diesem Lernprogramm lernen Sie Folgendes:
- Erfassen von unstrukturierten Daten in einem Speicherkonto
- Ausführen von Analysen für Ihre Daten in Blob Storage
Wenn Sie kein Azure-Abonnement besitzen, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.
Voraussetzungen
Erstellen eines Speicherkontos mit einem hierarchischen Namespace (Azure Data Lake Storage Gen2)
Lesen Sie die Informationen unter Erstellen eines Speicherkontos für die Verwendung mit Azure Data Lake Storage Gen2.
Vergewissern Sie sich, dass Ihrem Benutzerkonto die Rolle Mitwirkender an Storage-Blobdaten zugewiesen ist.
Installieren Sie AzCopy v10. Weitere Informationen finden Sie unter Übertragen von Daten mit AzCopy v10 (Vorschau).
Erstellen Sie einen Dienstprinzipal, erstellen Sie einen geheimen Clientschlüssel, und gewähren Sie dem Dienstprinzipal dann Zugriff auf das Speicherkonto.
Informationen dazu erhalten Sie unter Tutorial: Herstellen einer Verbindung mit Azure Data Lake Storage Gen2 (Schritte 1 bis 3). Kopieren Sie nach dem Ausführen dieser Schritte unbedingt die Werte für Mandanten-ID, App-ID und geheimen Clientschlüssel in eine Textdatei. Sie benötigen sie später in diesem Tutorial.
Erstellen eines Azure Databricks-Arbeitsbereichs, Clusters und Notebooks
Erstellen eines Azure Databricks-Arbeitsbereichs Weitere Informationen finden Sie unter Erstellen eines Azure Databricks-Arbeitsbereichs.
Erstellen eines Clusters Weitere Informationen finden Sie unter Erstellen eines Clusters.
Erstellen Sie ein Notebook. Weitere Informationen finden Sie unter Erstellen eines Notebooks. Wählen Sie Python als Standardsprache für das Notebook aus.
Lassen Sie Ihr Notebook geöffnet. Es wird in den folgenden Abschnitten verwendet.
Herunterladen der Flugdaten
In diesem Tutorial werden Flugdaten hinsichtlich der termingerechten Durchführung für Januar 2016 des Bureau of Transportation Statistics verwendet, um die Durchführung eines ETL-Vorgangs zu veranschaulichen. Sie müssen diese Daten zum Durchführen des Tutorials herunterladen.
Laden Sie die Datei On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip herunter. Diese Datei enthält die Flugdaten.
Entzippen Sie den Inhalt der ZIP-Datei, und notieren Sie sich den Namen und den Pfad der Datei. Diese Informationen werden in einem späteren Schritt benötigt.
Wenn Sie mehr über die in den Leistungsdaten zur termingerechten Berichterstellung erfassten Informationen erfahren möchten, können Sie die Feldbeschreibungen auf der Website des Bureau of Transportation Statistics anzeigen.
Daten erfassen
In diesem Abschnitt laden Sie die CSV-Flugdaten in Ihr Azure Data Lake Storage Gen2-Konto hoch und stellen dann das Speicherkonto in Ihrem Databricks-Cluster bereit. Schließlich verwenden Sie Databricks, um die CSV-Flugdaten zu lesen und sie zurück in den Speicher im Apache-Parquet-Format zu schreiben.
Hochladen der Flugdaten in Ihr Speicherkonto
Kopieren Sie mithilfe von AzCopy Ihre CSV-Datei in Ihr Azure Data Lake Storage Gen2-Konto. Verwenden Sie den azcopy make
-Befehl, um einen Container für Ihr Speicherkonto zu erstellen. Anschließend verwenden Sie den azcopy copy
-Befehl, um die CSV-Daten zu kopieren, die Sie soeben in ein Verzeichnis in diesem Container heruntergeladen haben.
In den folgenden Schritten müssen Sie Namen für den zu erstellenden Container, sowie das Verzeichnis und das BLOB eingeben, in die Sie die Flugdaten im Container hochladen möchten. Sie können die vorgeschlagenen Namen in jedem Schritt verwenden oder Ihre eigenen gemäß den Namenskonventionen für Container, Verzeichnisse und Blobs angeben.
Öffnen Sie ein Eingabeaufforderungsfenster, und geben Sie den folgenden Befehl ein, um sich bei Azure Active Directory anzumelden und auf Ihr Speicherkonto zuzugreifen.
azcopy login
Folgen Sie den Anweisungen im Eingabeaufforderungsfenster, um Ihr Benutzerkonto zu authentifizieren.
Um einen Container in Ihrem Speicherkonto zum Speichern der Flugdaten zu erstellen, geben Sie den folgenden Befehl ein:
azcopy make "https://<storage-account-name>.dfs.core.windows.net/<container-name>"
Ersetzen Sie den Platzhalterwert
<storage-account-name>
durch den Namen Ihres Speicherkontos.Ersetzen Sie den
<container-name>
-Platzhalter durch einen Namen für den Container, den Sie erstellen möchten, um die CSV-Daten zu speichern; Beispiel: flight-data-container.
Um die CSV-Daten in Ihr Speicherkonto hochzuladen (kopieren), geben Sie den folgenden Befehl ein.
azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
Ersetzen Sie den Platzhalterwert
<csv-folder-path>
durch den Pfad zu der CSV-Datei.Ersetzen Sie den Platzhalterwert
<storage-account-name>
durch den Namen Ihres Speicherkontos.Ersetzen Sie den
<container-name>
-Platzhalter durch den Namen des Containers in Ihrem Speicherkonto.Ersetzen Sie den
<directory-name>
-Platzhalter durch den Namen eines Verzeichnisses, um Ihre Daten im Container zu speichern; Beispiel: jan2016.
Bereitstellen Ihres Speicherkontos in Ihrem Databricks-Cluster
In diesem Abschnitt stellen Sie Ihren Azure Data Lake Storage Gen2-Cloudobjektspeicher im Databricks-Dateisystem (Databricks File System, DBFS) bereit. Sie verwenden das Azure AD-Dienstprinzip, das Sie zuvor für die Authentifizierung mit dem Speicherkonto erstellt haben. Weitere Informationen finden Sie unter Mount Cloud-Objektspeicher auf Azure Databricks.
Fügen Sie Ihr Notebook an Ihren Cluster an.
Wählen Sie im zuvor erstellten Notebook die Schaltfläche Verbinden in der oberen rechten Ecke der Notebook-Symbolleiste aus. Mit dieser Schaltfläche wird die Computeauswahl geöffnet. (Wenn Sie Ihr Notebook bereits mit einem Cluster verbunden haben, wird der Name dieses Clusters im Schaltflächentext anstelle von Verbinden angezeigt).
Wählen Sie aus der Dropdown-Liste „Cluster“ den zuvor erstellten Cluster aus.
Beachten Sie, dass sich der Text in der Clusterauswahl in Wird gestartet ändert. Warten Sie, bis der Cluster gestartet ist und bis der Name des Clusters in der Schaltfläche angezeigt wird, bevor Sie fortfahren.
Kopieren Sie den folgenden Codeblock, und fügen Sie ihn in die erste Zelle ein, führen Sie den Code jedoch noch nicht aus.
configs = {"fs.azure.account.auth.type": "OAuth", "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", "fs.azure.account.oauth2.client.id": "<appId>", "fs.azure.account.oauth2.client.secret": "<clientSecret>", "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token", "fs.azure.createRemoteFileSystemDuringInitialization": "true"} dbutils.fs.mount( source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>", mount_point = "/mnt/flightdata", extra_configs = configs)
In diesem Codeblock:
Ersetzen Sie in
configs
die Platzhalterwerte<appId>
,<clientSecret>
und<tenantId>
durch die Anwendungs-ID, den geheimen Clientschlüssel und die Mandanten-ID, die Sie kopiert haben, wenn Sie den Dienstprinzipal in den Voraussetzungen erstellt haben.Ersetzen Sie im
source
-URI die Platzhalterwerte<storage-account-name>
,<container-name>
und<directory-name>
durch den Namen Ihres Azure Data Lake Storage Gen2-Speicherkontos und den Namen des Containers und Verzeichnisses, den Sie beim Hochladen der Flugdaten in das Speicherkonto angegeben haben.Hinweis
Der Schemabezeichner im URI,
abfss
, weist Databricks an, den Azure Blob File System-Treiber mit Transport Layer Security (TLS) zu verwenden. Weitere Informationen zum URI finden Sie unter Verwenden des Azure Data Lake Storage Gen2-URI.
Stellen Sie sicher, dass der Cluster gestartet ist, bevor Sie fortfahren.
Drücken Sie UMSCHALT+EINGABE, um den Code in diesem Block auszuführen.
Der Container und das Verzeichnis, in das Sie die Flugdaten in Ihrem Speicherkonto hochgeladen haben, ist jetzt über den Bereitstellungspunkt /mnt/flightdata in Ihrem Notebook verfügbar.
Verwenden des Databricks-Notebooks zum Konvertieren von CSV in Parquet
Nachdem auf die CSV-Flugdaten über einen DBFS-Bereitstellungspunkt zugegriffen werden kann, können Sie einen Apache Spark-DataFrame verwenden, um sie in Ihren Arbeitsbereich zu laden und sie zurück in Ihren Azure Data Lake Storage Gen2-Objektspeicher im Apache-Parkettformat zu schreiben.
Ein Spark-DataFrame ist eine zweidimensionale, bezeichnete Datenstruktur mit Spalten potenziell unterschiedlicher Typen. Mit einem DataFrame können Sie Daten in verschiedenen unterstützten Formaten problemlos lesen und schreiben. Mit einem DataFrame können Sie Daten aus dem Cloudobjektspeicher laden und Analysen und Transformationen darin innerhalb Ihres Computeclusters durchführen, ohne dass sich dies auf die zugrunde liegenden Daten im Cloudobjektspeicher auswirkt. Weitere Informationen finden Sie unter Arbeiten mit PySpark-DataFrames in Azure Databricks.
Apache-Parquet ist ein spaltenbasiertes Dateiformat mit Optimierungen zur Beschleunigung von Abfragen. Dieses Dateiformat ist effizienter als CSV oder JSON. Weitere Informationen finden Sie unter Parquet-Dateien.
Fügen Sie zunächst im Notebook eine neue Zelle hinzu und dann den folgenden Code ein.
# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/*.csv")
# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")
Drücken Sie UMSCHALT+EINGABE, um den Code in diesem Block auszuführen.
Bevor Sie mit dem nächsten Abschnitt fortfahren, stellen Sie sicher, dass alle Parquet-Daten geschrieben wurden, und „Fertig“ in der Ausgabe angezeigt wird.
Durchsuchen von Daten
In diesem Abschnitt verwenden Sie das Databricks-Dateisystem-Hilfsprogramm, um Ihren Azure Data Lake Storage Gen2-Objektspeicher mithilfe des DBFS-Bereitstellungspunkts zu erkunden, den Sie im vorherigen Abschnitt erstellt haben.
Fügen Sie in einer neuen Zelle den folgenden Code ein, um eine Liste der Dateien am Bereitstellungspunkt abzurufen. Der erste Befehl gibt eine Liste von Dateien und Verzeichnissen aus. Der zweite Befehl zeigt die Ausgabe im tabellarischen Format an, um das Lesen zu erleichtern.
dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))
Drücken Sie UMSCHALT+EINGABE, um den Code in diesem Block auszuführen.
Beachten Sie, dass das Parquet-Verzeichnis in der Auflistung angezeigt wird. Sie haben die CSV-Flugdaten im Parquet-Format im Verzeichnis parquet/flights (Parquet/Flüge) im vorherigen Abschnitt gespeichert. Um Dateien im Verzeichnis parquet/flights (Parquet/Flüge) auflisten zu können, fügen Sie den folgenden Code in eine neue Zelle ein, und führen Sie ihn aus:
display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))
Um eine neue Datei zu erstellen und sie auflisten zu können, fügen Sie den folgenden Code in eine neue Zelle ein, und führen Sie ihn aus:
dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))
Da Sie die Datei 1.txt in diesem Tutorial nicht benötigen, können Sie den folgenden Code in eine Zelle einfügen und ausführen, um mydirectory rekursiv zu löschen. Der Parameter True
gibt einen rekursiven Löschvorgang an.
dbutils.fs.rm("/mnt/flightdata/mydirectory", True)
Zur Vereinfachung können Sie den help-Befehl verwenden, um Details zu anderen Befehlen zu erfahren.
dbutils.fs.help("rm")
Anhand dieser Codebeispiele haben Sie die hierarchische Struktur von HDFS unter Verwendung von Daten untersucht, die in einem Speicherkonto gespeichert sind, für das Azure Data Lake Storage Gen2 aktiviert ist.
Abfragen der Daten
Als Nächstes können Sie damit beginnen, die Daten abzufragen, die Sie in Ihr Speicherkonto hochgeladen haben. Geben Sie jeden der folgenden Codeblöcke in eine neue Zelle ein, und drücken Sie auf die UMSCHALTTASTE + EINGABETASTE, um das Python-Skript auszuführen.
DataFrames bieten zahlreiche Funktionen (Auswählen von Spalten, Filtern, Verknüpfen, Aggregieren), mit denen Sie häufige Probleme bei der Datenanalyse effizient beheben können.
Um ein DataFrame aus Ihren zuvor gespeicherten Parquet-Flugdaten zu laden und einige der unterstützten Funktionen zu erkunden, geben Sie dieses Skript in eine neue Zelle ein, und führen Sie es aus.
# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")
# Print the schema of the dataframe
flight_df.printSchema()
# Print the flight database size
print("Number of flights in the database: ", flight_df.count())
# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)
# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)
# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)
Geben Sie dieses Skript in eine neue Zelle ein, um einige grundlegende Analyseabfragen für die Daten auszuführen. Sie können sich dafür entscheiden, das gesamte Skript (UMSCHALTTASTE + EINGABETASTE) auszuführen, jede Abfrage zu markieren und sie separat mit STRG + UMSCHALTTASTE + EINGABETASTE auszuführen oder jede Abfrage in eine separate Zelle einzugeben und dort auszuführen.
# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')
# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())
# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()
# List out all the airports in Texas
airports_in_texas = spark.sql(
"SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)
# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
"SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)
# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
"SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()
# List airlines by the highest percentage of delayed flights. A delayed flight is one with a departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
"CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
"CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
"SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()
Zusammenfassung
In diesem Tutorial:
Azure-Ressourcen erstellt, einschließlich eines Azure Data Lake Storage Gen2-Speicherkontos und des Azure AD-Dienstprinzipals und der zugewiesenen Berechtigungen für den Zugriff auf das Speicherkonto.
Einen Azure Databricks-Arbeitsbereich, ein Notebook und einen Computecluster erstellt.
AzCopy verwendet, um unstrukturierte CSV-Flugdaten in das Azure Data Lake Storage Gen2-Speicherkonto hochzuladen.
Funktionen des Databricks-Dateisystem-Hilfsprogramms zum Bereitstellen Ihres Azure Data Lake Storage Gen2-Speicherkontos verwendet und erkunden das hierarchische Dateisystem.
Apache Spark-DataFrames verwendet, um Ihre CSV-Flugdaten in das Apache-Parquet-Format zu transformieren und sie wieder in Ihrem Azure Data Lake Storage Gen2-Speicherkonto zu speichern.
DataFrames verwendet, um die Flugdaten zu untersuchen und eine einfache Abfrage auszuführen.
Apache Spark SQL verwendet, um die Flugdaten für die Gesamtanzahl der Flüge für jede Fluggesellschaft im Januar 2016, die Flughäfen in Texas, die Fluggesellschaften, die von Texas aus fliegen, die durchschnittliche Ankunftsverzögerung in Minuten für jede Fluggesellschaft national und den Prozentsatz der Flüge jeder Fluggesellschaft, die verspätete Abflüge oder Ankünfte haben, abzufragen.
Bereinigen von Ressourcen
Wenn Sie das Notebook beibehalten und später darauf zurückkommen möchten, empfiehlt es sich, Ihren Cluster herunterzufahren (zu beenden), um Gebühren zu vermeiden. Um den Cluster zu beenden, wählen Sie ihn in der Computeauswahl oben rechts auf der Notebooksymbolleiste aus, klicken Sie im Menü auf Beenden, und bestätigen Sie Ihre Auswahl. (Standardmäßig wird der Cluster nach 120 Minuten Leerlauf automatisch beendet.)
Wenn Sie einzelne Arbeitsbereichsressourcen wie Notebooks und Cluster löschen möchten, können Sie dies über die linke Randleiste des Arbeitsbereichs tun. Ausführliche Anweisungen finden Sie unter Löschen eines Clusters oder Löschen eines Notebooks.
Löschen Sie die Ressourcengruppe und alle dazugehörigen Ressourcen, wenn Sie sie nicht mehr benötigen. Wählen Sie hierzu im Azure-Portal die Ressourcengruppe für das Speicherkonto und den Arbeitsbereich und dann Löschen aus.