Tutorial: Laden und Transformieren von Daten mithilfe von Apache Spark-DataFrames
In diesem Tutorial wird erläutert, wie Sie Daten mithilfe der Apache Spark Python-DataFrame-API (PySpark), der Apache Spark Scala-DataFrame-API und der SparkR-SparkDataFrame-API in Azure Databricks laden und transformieren.
Am Ende dieses Tutorials verstehen Sie, was ein DataFrame ist, und können die folgenden Aufgaben sicher ausführen:
Python
- Definieren von Variablen und Kopieren öffentlicher Daten in ein Unity Catalog-Volume
- Erstellen eines Datenrahmens mit Python
- Laden von Daten aus einer CSV-Datei in einen DataFrame
- Anzeigen von und Interagieren mit Datenrahmen
- Speichern eines DataFrames
- Ausführen von SQL-Abfragen in PySpark
Lesen Sie auch die Referenz zur Apache Spark PySpark-API.
Scala
- Definieren von Variablen und Kopieren öffentlicher Daten in ein Unity Catalog-Volume
- Erstellen eines DataFrames mit Scala
- Laden von Daten aus einer CSV-Datei in einen DataFrame
- Anzeigen von und Interagieren mit DataFrames
- Speichern eines DataFrames
- Ausführen von SQL-Abfragen in Apache Spark
Lesen Sie auch die Referenz zur Apache Spark Scala-API.
R
- Definieren von Variablen und Kopieren öffentlicher Daten in ein Unity Catalog-Volume
- Erstellen eines SparkR-SparkDataFrames
- Laden von Daten aus einer CSV-Datei in einen DataFrame
- Anzeigen von und Interagieren mit Datenrahmen
- Speichern eines DataFrames
- Ausführen von SQL-Abfragen in SparkR
Weitere Informationen finden Sie auch in der Apache SparkR-API-Referenz.
Was ist ein DataFrame?
Ein Dataframe ist eine zweidimensionale, bezeichnete Datenstruktur mit Spalten potenziell unterschiedlicher Typen. Sie können sich einen Dataframe wie eine Tabelle, eine SQL-Tabelle oder ein Wörterbuch mit Reihenobjekten vorstellen. Apache Spark DataFrames bieten zahlreiche Funktionen (Auswählen von Spalten, Filtern, Verknüpfen, Aggregieren), mit denen Sie häufige Probleme bei der Datenanalyse effizient beheben können.
Apache Spark DataFrames sind eine Abstraktion, die auf resilienten verteilten Datasets (Resilient Distributed Datasets, RDDs) basiert. Spark DataFrames und Spark SQL verwenden ein einheitliches Planungs- und Optimierungsmodul, mit dem Sie eine nahezu identische Leistung über alle unterstützten Sprachen in Azure Databricks (Python, SQL, Scala und R) hinweg erzielen können.
Anforderungen
Um das folgende Tutorial abzuschließen, müssen die folgenden Anforderungen erfüllt sein:
In Ihrem Arbeitsbereich muss Unity Catalog aktiviert sein, damit Sie die Beispiele in diesem Tutorial verwenden können.
In den Beispielen in diesem Tutorial wird ein Unity Catalog-Volume zum Speichern von Beispieldaten verwendet. Wenn Sie diese Beispiele verwenden möchten, erstellen Sie ein Volume, und verwenden Sie die Katalog-, Schema- und Volumenamen dieses Volumes, um den von den Beispielen verwendeten Volumepfad festzulegen.
Sie müssen in Unity Catalog über die folgenden Berechtigungen verfügen:
READ VOLUME
undWRITE VOLUME
oderALL PRIVILEGES
für das in diesem Tutorial verwendete VolumeUSE SCHEMA
oderALL PRIVILEGES
für das in diesem Tutorial verwendete SchemaUSE CATALOG
oderALL PRIVILEGES
für den in diesem Tutorial verwendeten Katalog
Wenden Sie sich zum Festlegen dieser Berechtigungen an Ihren Databricks-Administrator, oder lesen Sie den Artikel Unity Catalog-Berechtigungen und sicherungsfähige Objekte.
Tipp
Ein vollständiges Notebook für diesen Artikel finden Sie unter DataFrame-Tutorialnotebooks.
Schritt 1: Definieren von Variablen und Laden der CSV-Datei
In diesem Schritt werden Variablen für die Verwendung in diesem Tutorial definiert. Anschließend wird eine CSV-Datei mit Babynamendaten aus health.data.ny.gov in Ihr Unity Catalog-Volume geladen.
Öffnen Sie ein neues Notebook, indem Sie auf das Symbol klicken. Informationen zum Navigieren in Azure Databricks-Notebooks finden Sie unter Databricks-Notebookschnittstelle und -steuerelemente.
Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue leere Notebookzelle ein. Ersetzen Sie
<catalog-name>
,<schema-name>
und<volume-name>
durch die Katalog-, Schema- und Volumenamen für ein Unity Catalog-Volume. Ersetzen Sie<table_name>
durch einen Tabellennamen Ihrer Wahl. Im weiteren Verlauf dieses Tutorials werden Sie Babynamendaten in diese Tabelle laden.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Drücken Sie
Shift+Enter
, um die Zelle auszuführen und eine neue leere Zelle zu erstellen.Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue leere Notebookzelle ein. Dieser Code kopiert mithilfe des Databricks-Befehls „dbutuils“ die
rows.csv
-Datei aus health.data.ny.gov in Ihr Unity Catalog-Volume.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Schritt 2: Erstellen eines DataFrames
In diesem Schritt wird ein DataFrame namens df1
mit Testdaten erstellt. Anschließend wird der Inhalt angezeigt.
Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue leere Notebookzelle ein. Dieser Code erstellt den DataFrame mit Testdaten und zeigt dann den Inhalt und das Schema des DataFrames an.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Schritt 3: Laden von Daten aus einer CSV-Datei in einen DataFrame
In diesem Schritt wird ein DataFrame namens df_csv
aus der CSV-Datei erstellt, die Sie zuvor in Ihr Unity Catalog-Volume geladen haben. Weitere Informationen finden Sie unter spark.read.csv.
Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue leere Notebookzelle ein. Dieser Code lädt Babynamendaten aus der CSV-Datei in den DataFrame
df_csv
und zeigt dann den Inhalt des DataFrames an.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Sie können Daten aus vielen unterstützten Dateiformaten laden.
Schritt 4: Anzeigen und Interagieren mit dem DataFrame
Mithilfe der folgenden Methoden können Sie die Babynamen-DataFrames anzeigen und mit diesen interagieren.
Ausgeben des DataFrame-Schemas
Hier erfahren Sie, wie Sie das Schema eines Apache Spark-DataFrames anzeigen. Apache Spark verwendet den Begriff Schema, um auf die Namen und Datentypen der Spalten im DataFrame zu verweisen.
Hinweis
Azure Databricks verwendet ebenfalls den Begriff „Schema“, um auf eine Sammlung der für einen Katalog registrierten Tabellen zu verweisen.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code zeigt das Schema Ihrer DataFrames mit der
.printSchema()
-Methode zum Anzeigen der Schemas der beiden DataFrames, um die Vereinigung der beiden DataFrames vorzubereiten.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Umbenennen einer Spalte im DataFrame
Hier erfahren Sie, wie Sie eine Spalte in einem DataFrame umbenennen.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code benennt eine Spalte im
df1_csv
-DataFrame so um, dass sie mit der entsprechenden Spalte imdf1
-DataFrame übereinstimmt. Dieser Code verwendet die Apache Spark-MethodewithColumnRenamed()
.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Kombinieren von DataFrames
Hier erfahren Sie, wie Sie einen neuen DataFrame erstellen, der die Zeilen eines DataFrames zu einem anderen hinzufügt.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode
union()
, um die Inhalte Ihres ersten DataFramesdf
mit DataFramedf_csv
zu kombinieren, der die aus der CSV-Datei geladenen Babynamendaten enthält.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Filtern von Zeilen in einem DataFrame
Sie können die beliebtesten Babynamen in Ihrem Dataset ermitteln, indem Sie die Zeilen mithilfe der Apache Spark-Methoden .filter()
oder .where()
filtern. Mithilfe der Filterung können Sie eine Teilmenge von Zeilen auswählen, um sie in einem DataFrame zurückzugeben oder zu ändern. Wie die folgende Beispiele zeigen, gibt es keinen Unterschied bei der Leistung oder Syntax.
Verwenden der Methode „.filter()“
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode
.filter()
, um diese Zeilen im DataFrame mit einem Wert über 50 anzuzeigen.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Verwenden der Methode „.where()“
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode
.where()
, um diese Zeilen im DataFrame mit einem Wert über 50 anzuzeigen.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Auswählen von Spalten aus einem DataFrame und Sortieren nach Häufigkeit
Hier erfahren Sie, welche Babynamenhäufigkeit mit der select()
-Methode zum Angeben der Spalten aus dem DataFrame zurückgegeben werden soll. Verwenden Sie die Apache Spark-Funktionen orderby
und desc
, um die Ergebnisse zu sortieren.
Das pyspark.sql-Modul für Apache Spark bietet Unterstützung für SQL-Funktionen. Zu den in diesem Tutorial verwendeten Funktionen zählen die Apache Spark-Funktionen orderBy()
, desc()
und expr()
. Sie können die Verwendung dieser Funktionen aktivieren, indem Sie sie nach Bedarf in Ihre Sitzung importieren.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code importiert die
desc()
-Funktion und verwendet dann die Apache Spark-Methodeselect()
sowie die Apache Spark-FunktionenorderBy()
unddesc()
, um die am häufigsten verwendeten Namen und deren Anzahl in absteigender Reihenfolge anzuzeigen.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Erstellen eines Teilmengen-DataFrames
Hier erfahren Sie, wie Sie einen Teilmenge-DataFrame aus einem vorhandenen DataFrame erstellen.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode
filter
, um einen neuen DataFrame zu erstellen, der die Daten nach Jahr, Anzahl und Geschlecht einschränkt. Er verwendet die Apache Spark-Methodeselect()
, um die Spalten einzuschränken. Außerdem werden die Apache Spark-FunktionenorderBy()
unddesc()
verwendet, um den neuen DataFrame nach Anzahl zu sortieren.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Schritt 5: Speichern des DataFrames
Hier erfahren Sie, wie Sie einen DataFrame speichern. Sie können Ihren DataFrame entweder in einer Tabelle speichern oder in eine oder mehrere Dateien schreiben.
Speichern des DataFrames in einer Tabelle
Azure Databricks verwendet standardmäßig das Delta Lake-Format für alle Tabellen. Um Ihren DataFrame zu speichern, benötigen Sie die Berechtigung CREATE
für die Tabelle im Katalog und das Schema.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code speichert den Inhalt des DataFrame in einer Tabelle mithilfe der Variablen, die Sie am Anfang dieses Tutorials definiert haben.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Die meisten Apache Spark-Anwendungen funktionieren mit großen und verteilten Datasets. Apache Spark schreibt in ein Verzeichnis von Dateien anstelle einer einzelnen Datei. Delta Lake teilt die Parquet-Ordner und -Dateien auf. Viele Datensysteme können diese Verzeichnisse mit Dateien lesen. Azure Databricks empfiehlt für die meisten Anwendungen die Verwendung von Tabellen anstelle von Dateipfaden.
Speichern des DataFrames in JSON-Dateien
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code speichert den DataFrame in einem Verzeichnis von JSON-Dateien.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Lesen des DataFrames aus einer JSON-Datei
Hier erfahren Sie, wie Sie die Apache Spark-Methode spark.read.format()
verwenden, um JSON-Daten aus einem Verzeichnis in einen DataFrame einzulesen.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code zeigt die JSON-Dateien an, die Sie im vorherigen Beispiel gespeichert haben.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Zusätzliche Aufgaben: Ausführen von SQL-Abfragen in PySpark, Scala und R
Apache Spark-DataFrames bieten die folgenden Optionen, um SQL mit PySpark, Scala und R zu kombinieren. Sie können den folgenden Code in dem Notebook ausführen, das Sie für dieses Tutorial erstellt haben.
Angeben einer Spalte als SQL-Abfrage
Hier erfahren Sie, wie Sie die Apache Spark-Methode selectExpr()
verwenden. Dies ist eine Variante der select()
-Methode, die SQL-Ausdrücke akzeptiert und einen aktualisierten DataFrame zurückgibt. Mit dieser Methode können Sie einen SQL-Ausdruck (z. B. upper
) verwenden.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode
selectExpr()
und den SQL-Ausdruckupper
, um eine Zeichenfolgenspalte in Großbuchstaben zu konvertieren (und die Spalte umzubenennen).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Verwenden von expr()
, um die SQL-Syntax für eine Spalte zu verwenden
Hier erfahren Sie, wie Sie die Apache Spark-Funktion expr()
importieren und verwenden, um die SQL-Syntax an einer beliebigen Stelle zu verwenden, an der eine Spalte angegeben wird.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code importiert die
expr()
-Funktion und verwendet dann die Apache Spark-Funktionexpr()
und den SQL-Ausdrucklower
, um eine Zeichenfolgenspalte in Kleinbuchstaben zu konvertieren (und die Spalte umzubenennen).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
Ausführen einer beliebigen SQL-Abfrage mithilfe der Funktion „spark.sql()“
Hier erfahren Sie, wie Sie die Apache Spark-Funktion spark.sql()
verwenden, um beliebige SQL-Abfragen auszuführen.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Funktion
spark.sql()
, um eine SQL-Tabelle mithilfe der SQL-Syntax abzufragen.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Drücken Sie
Shift+Enter
, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.
DataFrame-Tutorialnotebooks
Die folgenden Notebooks enthalten die Beispielabfragen aus diesem Tutorial.