Tutorial: Laden und Transformieren von Daten mithilfe von Apache Spark-DataFrames

In diesem Tutorial wird erläutert, wie Sie Daten mithilfe der Apache Spark Python-DataFrame-API (PySpark), der Apache Spark Scala-DataFrame-API und der SparkR-SparkDataFrame-API in Azure Databricks laden und transformieren.

Am Ende dieses Tutorials verstehen Sie, was ein DataFrame ist, und können die folgenden Aufgaben sicher ausführen:

Python

Lesen Sie auch die Referenz zur Apache Spark PySpark-API.

Scala

Lesen Sie auch die Referenz zur Apache Spark Scala-API.

R

Weitere Informationen finden Sie in der Apache SparkR-API-Referenz.

Was ist ein DataFrame?

Ein Dataframe ist eine zweidimensionale, bezeichnete Datenstruktur mit Spalten potenziell unterschiedlicher Typen. Sie können sich einen Dataframe wie eine Tabelle, eine SQL-Tabelle oder ein Wörterbuch mit Reihenobjekten vorstellen. Apache Spark DataFrames bieten zahlreiche Funktionen (Auswählen von Spalten, Filtern, Verknüpfen, Aggregieren), mit denen Sie häufige Probleme bei der Datenanalyse effizient beheben können.

Apache Spark DataFrames sind eine Abstraktion, die auf resilienten verteilten Datasets (Resilient Distributed Datasets, RDDs) basiert. Spark DataFrames und Spark SQL verwenden ein einheitliches Planungs- und Optimierungsmodul, mit dem Sie eine nahezu identische Leistung über alle unterstützten Sprachen in Azure Databricks (Python, SQL, Scala und R) hinweg erzielen können.

Anforderungen

Um das folgende Tutorial abzuschließen, müssen die folgenden Anforderungen erfüllt sein:

  • In Ihrem Arbeitsbereich muss Unity Catalog aktiviert sein, damit Sie die Beispiele in diesem Tutorial verwenden können.

  • In den Beispielen in diesem Tutorial wird ein Unity Catalog-Volume zum Speichern von Beispieldaten verwendet. Wenn Sie diese Beispiele verwenden möchten, erstellen Sie ein Volume, und verwenden Sie die Katalog-, Schema- und Volumenamen dieses Volumes, um den von den Beispielen verwendeten Volumepfad festzulegen.

  • Sie müssen in Unity Catalog über die folgenden Berechtigungen verfügen:

    • READ VOLUME und WRITE VOLUME oder ALL PRIVILEGES für das in diesem Tutorial verwendete Volume
    • USE SCHEMA oder ALL PRIVILEGES für das in diesem Tutorial verwendete Schema
    • USE CATALOG oder ALL PRIVILEGES für den in diesem Tutorial verwendeten Katalog

    Wenden Sie sich zum Festlegen dieser Berechtigungen an Ihren Databricks-Administrator, oder lesen Sie den Artikel Unity Catalog-Berechtigungen und sicherungsfähige Objekte.

Schritt 1: Definieren von Variablen und Laden der CSV-Datei

In diesem Schritt werden Variablen für die Verwendung in diesem Tutorial definiert. Anschließend wird eine CSV-Datei mit Babynamendaten aus health.data.ny.gov in Ihr Unity Catalog-Volume geladen.

  1. Öffnen Sie ein neues Notebook, indem Sie auf das Symbol Symbol „New” klicken. Informationen zum Navigieren in Azure Databricks-Notebooks finden Sie unter Databricks-Notebookschnittstelle und -steuerelemente.

  2. Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue leere Notebookzelle ein. Ersetzen Sie <catalog-name>, <schema-name> und <volume-name> durch die Katalog-, Schema- und Volumenamen für ein Unity Catalog-Volume. Ersetzen Sie <table_name> durch einen Tabellennamen Ihrer Wahl. Im weiteren Verlauf dieses Tutorials werden Sie Babynamendaten in diese Tabelle laden.

  3. Drücken Sie Shift+Enter, um die Zelle auszuführen und eine neue leere Zelle zu erstellen.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue leere Notebookzelle ein. Dieser Code kopiert mithilfe des Databricks-Befehls „dbutuils“ die rows.csv-Datei aus health.data.ny.gov in Ihr Unity Catalog-Volume.

  5. Drücken Sie Shift+Enter, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Schritt 2: Erstellen eines DataFrames

In diesem Schritt wird ein DataFrame namens df1 mit Testdaten erstellt. Anschließend wird der Inhalt angezeigt.

  1. Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue leere Notebookzelle ein. Dieser Code erstellt den DataFrame mit Testdaten und zeigt dann den Inhalt und das Schema des DataFrames an.

  2. Drücken Sie Shift+Enter, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

Schritt 3: Laden von Daten aus einer CSV-Datei in einen DataFrame

In diesem Schritt wird ein DataFrame namens df_csv aus der CSV-Datei erstellt, die Sie zuvor in Ihr Unity Catalog-Volume geladen haben. Weitere Informationen finden Sie unter spark.read.csv.

  1. Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue leere Notebookzelle ein. Dieser Code lädt Babynamendaten aus der CSV-Datei in den DataFrame df_csv und zeigt dann den Inhalt des DataFrames an.

  2. Drücken Sie Shift+Enter, um die Zelle auszuführen, und wechseln Sie dann zur nächsten Zelle.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

Sie können Daten aus vielen unterstützten Dateiformaten laden.

Schritt 4: Anzeigen und Interagieren mit dem DataFrame

Mithilfe der folgenden Methoden können Sie die Babynamen-DataFrames anzeigen und mit diesen interagieren.

Hier erfahren Sie, wie Sie das Schema eines Apache Spark-DataFrames anzeigen. Apache Spark verwendet den Begriff Schema, um auf die Namen und Datentypen der Spalten im DataFrame zu verweisen.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code zeigt das Schema Ihrer DataFrames mit der .printSchema()-Methode zum Anzeigen der Schemas der beiden DataFrames, um die Vereinigung der beiden DataFrames vorzubereiten.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Hinweis

Azure Databricks verwendet ebenfalls den Begriff „Schema“, um auf eine Sammlung der für einen Katalog registrierten Tabellen zu verweisen.

Umbenennen einer Spalte im DataFrame

Hier erfahren Sie, wie Sie eine Spalte in einem DataFrame umbenennen.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code benennt eine Spalte im df1_csv-DataFrame so um, dass sie mit der entsprechenden Spalte im df1-DataFrame übereinstimmt. Dieser Code verwendet die Apache Spark-Methode withColumnRenamed().

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

Kombinieren von DataFrames

Hier erfahren Sie, wie Sie einen neuen DataFrame erstellen, der die Zeilen eines DataFrames zu einem anderen hinzufügt.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode union(), um die Inhalte Ihres ersten DataFrames df mit DataFrame df_csv zu kombinieren, der die aus der CSV-Datei geladenen Babynamendaten enthält.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

Filtern von Zeilen in einem DataFrame

Sie können die beliebtesten Babynamen in Ihrem Dataset ermitteln, indem Sie die Zeilen mithilfe der Apache Spark-Methoden .filter() oder .where() filtern. Mithilfe der Filterung können Sie eine Teilmenge von Zeilen auswählen, um sie in einem DataFrame zurückzugeben oder zu ändern. Wie die folgende Beispiele zeigen, gibt es keinen Unterschied bei der Leistung oder Syntax.

Verwenden der Methode „.filter()“

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode .filter(), um diese Zeilen im DataFrame mit einem Wert über 50 anzuzeigen.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

Verwenden der Methode „.where()“

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode .where(), um diese Zeilen im DataFrame mit einem Wert über 50 anzuzeigen.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

Auswählen von Spalten aus einem DataFrame und Sortieren nach Häufigkeit

Hier erfahren Sie, welche Babynamenhäufigkeit mit der select()-Methode zum Angeben der Spalten aus dem DataFrame zurückgegeben werden soll. Verwenden Sie die Apache Spark-Funktionen orderby und desc, um die Ergebnisse zu sortieren.

Das pyspark.sql-Modul für Apache Spark bietet Unterstützung für SQL-Funktionen. Zu den in diesem Tutorial verwendeten Funktionen zählen die Apache Spark-Funktionen orderBy(), desc() und expr(). Sie können die Verwendung dieser Funktionen aktivieren, indem Sie sie nach Bedarf in Ihre Sitzung importieren.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code importiert die desc()-Funktion und verwendet dann die Apache Spark-Methode select() sowie die Apache Spark-Funktionen orderBy() und desc(), um die am häufigsten verwendeten Namen und deren Anzahl in absteigender Reihenfolge anzuzeigen.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

Erstellen eines Teilmengen-DataFrames

Hier erfahren Sie, wie Sie einen Teilmenge-DataFrame aus einem vorhandenen DataFrame erstellen.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode filter, um einen neuen DataFrame zu erstellen, der die Daten nach Jahr, Anzahl und Geschlecht einschränkt. Er verwendet die Apache Spark-Methode select(), um die Spalten einzuschränken. Außerdem werden die Apache Spark-Funktionen orderBy() und desc() verwendet, um den neuen DataFrame nach Anzahl zu sortieren.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

Schritt 5: Speichern des DataFrames

Hier erfahren Sie, wie Sie einen DataFrame speichern. Sie können Ihren DataFrame entweder in einer Tabelle speichern oder in eine oder mehrere Dateien schreiben.

Speichern des DataFrames in einer Tabelle

Azure Databricks verwendet standardmäßig das Delta Lake-Format für alle Tabellen. Um Ihren DataFrame zu speichern, benötigen Sie die Berechtigung CREATE für die Tabelle im Katalog und das Schema.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code speichert den Inhalt des DataFrame in einer Tabelle mithilfe der Variablen, die Sie am Anfang dieses Tutorials definiert haben.

Python

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(fs"$path_tables" + "." + s"$table_name")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

Die meisten Apache Spark-Anwendungen funktionieren mit großen und verteilten Datasets. Apache Spark schreibt in ein Verzeichnis von Dateien anstelle einer einzelnen Datei. Delta Lake teilt die Parquet-Ordner und -Dateien auf. Viele Datensysteme können diese Verzeichnisse mit Dateien lesen. Azure Databricks empfiehlt für die meisten Anwendungen die Verwendung von Tabellen anstelle von Dateipfaden.

Speichern des DataFrames in JSON-Dateien

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code speichert den DataFrame in einem Verzeichnis von JSON-Dateien.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

Lesen des DataFrames aus einer JSON-Datei

Hier erfahren Sie, wie Sie die Apache Spark-Methode spark.read.format() verwenden, um JSON-Daten aus einem Verzeichnis in einen DataFrame einzulesen.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code zeigt die JSON-Dateien an, die Sie im vorherigen Beispiel gespeichert haben.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

Zusätzliche Aufgaben: Ausführen von SQL-Abfragen in PySpark, Scala und R

Apache Spark-DataFrames bieten die folgenden Optionen, um SQL mit PySpark, Scala und R zu kombinieren. Sie können den folgenden Code in dem Notebook ausführen, das Sie für dieses Tutorial erstellt haben.

Angeben einer Spalte als SQL-Abfrage

Hier erfahren Sie, wie Sie die Apache Spark-Methode selectExpr() verwenden. Dies ist eine Variante der select()-Methode, die SQL-Ausdrücke akzeptiert und einen aktualisierten DataFrame zurückgibt. Mit dieser Methode können Sie einen SQL-Ausdruck (z. B. upper) verwenden.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Methode selectExpr() und den SQL-Ausdruck upper, um eine Zeichenfolgenspalte in Großbuchstaben zu konvertieren (und die Spalte umzubenennen).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Verwenden von expr(), um die SQL-Syntax für eine Spalte zu verwenden

Hier erfahren Sie, wie Sie die Apache Spark-Funktion expr() importieren und verwenden, um die SQL-Syntax an einer beliebigen Stelle zu verwenden, an der eine Spalte angegeben wird.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code importiert die expr()-Funktion und verwendet dann die Apache Spark-Funktion expr() und den SQL-Ausdruck lower, um eine Zeichenfolgenspalte in Kleinbuchstaben zu konvertieren (und die Spalte umzubenennen).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

Ausführen einer beliebigen SQL-Abfrage mithilfe der Funktion „spark.sql()“

Hier erfahren Sie, wie Sie die Apache Spark-Funktion spark.sql() verwenden, um beliebige SQL-Abfragen auszuführen.

Kopieren Sie den folgenden Code, und fügen Sie ihn in eine leere Notebookzelle ein. Dieser Code verwendet die Apache Spark-Funktion spark.sql(), um eine SQL-Tabelle mithilfe der SQL-Syntax abzufragen.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

DataFrame-Tutorialnotebook

Das folgende Notebook enthält die Beispielabfragen aus diesem Tutorial.

Python

DataFrames-Tutorialnotebook

Notebook abrufen

Scala

DataFrames-Tutorialnotebook

Notebook abrufen

R

DataFrames-Tutorialnotebook

Notebook abrufen

Zusätzliche Ressourcen