Verwenden von Spark zum Arbeiten mit Datendateien

Abgeschlossen

Nachdem Sie ein Notizbuch eingerichtet und an einen Cluster angefügt haben, können Sie Spark verwenden, um Datendateien zu lesen und zu verarbeiten. Spark unterstützt eine breite Palette von Formaten wie CSV, JSON, Parquet, ORC, Avro und Delta, und Databricks bietet eingebaute Connectors für den Zugriff auf Dateien, die im Arbeitsbereich gespeichert sind, im Azure Data Lake oder im Blob Storage oder in anderen externen Systemen.

Der Workflow folgt normalerweise drei Schritten:

  1. Lesen Sie eine Datei in einen Spark DataFrame mithilfe von spark.read mit dem richtigen Format und Pfad. Beim Lesen von Rohtextformaten wie CSV oder JSON kann Spark das Schema (Spaltennamen und Datentypen) ableiten, dies ist jedoch manchmal langsam oder unzuverlässig. Eine bessere Methode in der Produktion besteht darin, das Schema explizit zu definieren, damit die Daten konsistent und effizient geladen werden.

  2. Erkunden und transformieren Sie dataFrame mithilfe von SQL- oder DataFrame-Vorgängen (z. B. Filtern von Zeilen, Auswählen von Spalten, Aggregieren von Werten).

  3. Schreiben Sie die Ergebnisse in ein ausgewähltes Format zurück in den Speicher.

Das Arbeiten mit Dateien in Spark ist so konzipiert, dass sie für kleine und große Datasets konsistent sind. Derselbe Code, der zum Testen einer kleinen CSV-Datei verwendet wird, funktioniert auch an viel größeren Datasets, da Spark die Arbeit über den Cluster verteilt. Dies erleichtert die Skalierung von der schnellen Erkundung bis hin zur komplexeren Datenverarbeitung.

Laden von Daten in einen Dataframe

Lassen Sie uns an einem hypothetischen Beispiel untersuchen, wie Sie einen Dataframe für die Arbeit mit Daten verwenden können. Angenommen, Sie haben die folgenden Daten in einer durch Trennzeichen getrennten Textdatei namens products.csv im Datenordner in Ihrem Databricks File System (DBFS)-Speicher:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

In einem Spark-Notebook könnten Sie den folgenden PySpark-Code verwenden, um die Daten in einen Dataframe zu laden und die ersten zehn Zeilen anzuzeigen:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Die %pyspark Linie am Anfang wird als Magie bezeichnet und sagt Spark, dass die in dieser Zelle verwendete Sprache PySpark ist. Hier ist der entsprechende Scala-Code für das Beispiel mit den Produktdaten:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

Der Magic-Befehl %spark wird verwendet, um Scala anzugeben.

Tipp

Sie können auch die Sprache auswählen, die Sie für jede Zelle in der Notebookschnittstelle verwenden möchten.

Beide zuvor gezeigten Beispiele würden die Ausgabe wie folgt erzeugen:

Produkt-ID Produktname Kategorie Listenpreis
771 Berg-100 Silber, 38 Mountainbikes 3399.9900
772 Mountain-100, Silber, 42 Mountainbikes 3399.9900
773 Mountain-100 Silber, 44 Mountainbikes 3399.9900
... ... ... ...

Angeben eines Dataframeschemas

Im vorherigen Beispiel enthielt die erste Zeile der CSV-Datei die Spaltennamen, und Spark war in der Lage, den Datentyp jeder Spalte aus den darin enthaltenen Daten abzuleiten. Sie können auch ein explizites Schema für die Daten angeben, was nützlich ist, wenn die Spaltennamen nicht in der Datendatei enthalten sind, wie in diesem CSV-Beispiel:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Das folgende PySpark-Beispiel zeigt, wie Sie ein Schema für den Datenframe angeben, das aus einer Datei mit dem Namen product-data.csv in diesem Format geladen werden soll:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Die Ergebnisse wären wieder ähnlich zu:

Produkt-ID Produktname Kategorie Listenpreis
771 Berg-100 Silber, 38 Mountainbikes 3399.9900
772 Mountain-100, Silber, 42 Mountainbikes 3399.9900
773 Mountain-100 Silber, 44 Mountainbikes 3399.9900
... ... ... ...

Filtern und Gruppieren von Dataframes

Sie können die Methoden der Dataframe-Klasse verwenden, um die darin enthaltenen Daten zu filtern, zu sortieren, zu gruppieren und anderweitig zu bearbeiten. Im folgenden Codebeispiel wird beispielsweise die select Methode verwendet, um die Spalten "ProductName " und "ListPrice " aus dem DF-Datenframe abzurufen, der Produktdaten im vorherigen Beispiel enthält:

pricelist_df = df.select("ProductID", "ListPrice")

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

Produkt-ID Listenpreis
771 3399.9900
772 3399.9900
773 3399.9900
... ...

In Übereinstimmung mit den meisten Datenmanipulationsmethoden gibt select ein neues Dataframe-Objekt zurück.

Tipp

Die Auswahl einer Teilmenge von Spalten aus einem Dataframe ist ein gängiger Vorgang, der auch mithilfe der folgenden kürzeren Syntax erreicht werden kann:

pricelist_df = df["ProductID", "ListPrice"]

Sie können Methoden miteinander „verketten“, um eine Reihe von Bearbeitungen durchzuführen, die zu einem transformierten Dataframe führen. In diesem Beispielcode werden beispielsweise die select Und where Methoden zum Erstellen eines neuen Datenrahmens mit den Spalten "ProductName " und "ListPrice " für Produkte mit einer Kategorie von Mountainbikes oder Road Bikes verkettet:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

Produktname Listenpreis
Berg-100 Silber, 38 3399.9900
Road-750 Schwarz, 52 539,9900
... ...

Zum Gruppieren und Aggregieren von Daten können Sie die groupby Methoden- und Aggregatfunktionen verwenden. Der folgende PySpark-Code zählt z. B. die Anzahl der Produkte für jede Kategorie:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

Kategorie zählen
Lenkköpfe 3
Räder 14
Mountainbikes 32
... ...

Hinweis

Spark DataFrames sind deklarativ und unveränderlich. Jede Transformation (wie select, filter oder groupBy) erstellt einen neuen DataFrame, der darstellt, was Sie möchten, nicht wie es ausgeführt wird. Dadurch wird Code wiederverwendbar, optimiert und frei von Nebenwirkungen. Aber keine dieser Transformationen wird tatsächlich ausgeführt, bis Sie eine display auslösen (z. B. , collect, write), an dem Spark den vollständig optimierten Plan ausführt.

Verwenden von SQL-Ausdrücken in Spark

Die Dataframe-API ist Teil einer Spark-Bibliothek namens Spark SQL, mit der Datenanalysten SQL-Ausdrücke verwenden können, um Daten abzufragen und zu bearbeiten.

Erstellen von Datenbankobjekten im Spark-Katalog

Der Spark-Katalog ist ein Metastore für relationale Datenobjekte wie Sichten und Tabellen. Die Spark-Runtime kann den Katalog verwenden, um Code, der in einer von Spark unterstützten Sprache geschrieben wurde, problemlos mit SQL-Ausdrücken zu integrieren, die für einige Datenanalysten oder Entwickler natürlicher sind.

Eine der einfachsten Möglichkeiten, Daten in einem Dataframe für die Abfrage im Spark-Katalog verfügbar zu machen, ist das Erstellen einer temporären Sicht, wie im folgenden Codebeispiel gezeigt:

df.createOrReplaceTempView("products")

Eine Ansicht ist temporär, was bedeutet, dass sie am Ende der aktuellen Sitzung automatisch gelöscht wird. Sie können auch Tabellen erstellen, die im Katalog beibehalten werden, um eine Datenbank zu definieren, die mithilfe von Spark SQL abgefragt werden kann.

Hinweis

Wir werden uns in diesem Modul nicht eingehend mit Spark-Katalogtabellen befassen, aber es lohnt sich, ein paar wichtige Punkte zu erwähnen:

  • Sie können eine leere Tabelle mithilfe der spark.catalog.createTable-Methode erstellen. Tabellen sind Metadatenstrukturen, die ihre zugrunde liegenden Daten an dem mit dem Katalog verbundenen Speicherort speichern. Wenn Sie eine Tabelle löschen, werden auch die zugrunde liegenden Daten gelöscht.
  • Sie können einen Dataframe als Tabelle speichern, indem Sie seine saveAsTable-Methode verwenden.
  • Sie können eine externe Tabelle mithilfe der spark.catalog.createExternalTable Methode erstellen. Externe Tabellen definieren Metadaten im Katalog, beziehen ihre zugrunde liegenden Daten jedoch von einem externen Speicherort, in der Regel einem Ordner in einem Data Lake. Wenn Sie eine externe Tabelle löschen, werden die zugrunde liegenden Daten nicht gelöscht.

Verwenden der Spark SQL-API zur Abfrage von Daten

Sie können die Spark SQL-API in einem in einer beliebigen Sprache geschriebenen Code verwenden, um Daten im Katalog abzufragen. Der folgende PySpark-Code verwendet beispielsweise eine SQL-Abfrage, um Daten aus der Produktansicht als Datenframe zurückzugeben.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Die Ergebnisse des Codebeispiels würden ähnlich wie in der folgenden Tabelle aussehen:

Produktname Listenpreis
Berg-100 Silber, 38 3399.9900
Road-750 Schwarz, 52 539,9900
... ...

Verwenden von SQL-Code

Das vorherige Beispiel hat gezeigt, wie Sie die Spark SQL-API verwenden können, um SQL-Ausdrücke in Spark-Code einzubetten. In einem Notebook können Sie den Magic-Befehl %sql auch dazu verwenden, SQL-Code auszuführen, der Objekte wie folgt im Katalog abfragt:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Das SQL-Codebeispiel gibt ein Resultset zurück, das automatisch im Notebook als Tabelle angezeigt wird, wie die folgende:

Kategorie ProduktAnzahl
Trägershorts 3
Fahrradträger 1
Fahrradständer 1
... ...