Verwenden von Spark zum Arbeiten mit Datendateien
Nachdem Sie ein Notizbuch eingerichtet und an einen Cluster angefügt haben, können Sie Spark verwenden, um Datendateien zu lesen und zu verarbeiten. Spark unterstützt eine breite Palette von Formaten wie CSV, JSON, Parquet, ORC, Avro und Delta, und Databricks bietet eingebaute Connectors für den Zugriff auf Dateien, die im Arbeitsbereich gespeichert sind, im Azure Data Lake oder im Blob Storage oder in anderen externen Systemen.
Der Workflow folgt normalerweise drei Schritten:
Lesen Sie eine Datei in einen Spark DataFrame mithilfe von spark.read mit dem richtigen Format und Pfad. Beim Lesen von Rohtextformaten wie CSV oder JSON kann Spark das Schema (Spaltennamen und Datentypen) ableiten, dies ist jedoch manchmal langsam oder unzuverlässig. Eine bessere Methode in der Produktion besteht darin, das Schema explizit zu definieren, damit die Daten konsistent und effizient geladen werden.
Erkunden und transformieren Sie dataFrame mithilfe von SQL- oder DataFrame-Vorgängen (z. B. Filtern von Zeilen, Auswählen von Spalten, Aggregieren von Werten).
Schreiben Sie die Ergebnisse in ein ausgewähltes Format zurück in den Speicher.
Das Arbeiten mit Dateien in Spark ist so konzipiert, dass sie für kleine und große Datasets konsistent sind. Derselbe Code, der zum Testen einer kleinen CSV-Datei verwendet wird, funktioniert auch an viel größeren Datasets, da Spark die Arbeit über den Cluster verteilt. Dies erleichtert die Skalierung von der schnellen Erkundung bis hin zur komplexeren Datenverarbeitung.
Laden von Daten in einen Dataframe
Lassen Sie uns an einem hypothetischen Beispiel untersuchen, wie Sie einen Dataframe für die Arbeit mit Daten verwenden können. Angenommen, Sie haben die folgenden Daten in einer durch Trennzeichen getrennten Textdatei namens products.csv im Datenordner in Ihrem Databricks File System (DBFS)-Speicher:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
In einem Spark-Notebook könnten Sie den folgenden PySpark-Code verwenden, um die Daten in einen Dataframe zu laden und die ersten zehn Zeilen anzuzeigen:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Die %pyspark Linie am Anfang wird als Magie bezeichnet und sagt Spark, dass die in dieser Zelle verwendete Sprache PySpark ist. Hier ist der entsprechende Scala-Code für das Beispiel mit den Produktdaten:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
Der Magic-Befehl %spark wird verwendet, um Scala anzugeben.
Tipp
Sie können auch die Sprache auswählen, die Sie für jede Zelle in der Notebookschnittstelle verwenden möchten.
Beide zuvor gezeigten Beispiele würden die Ausgabe wie folgt erzeugen:
| Produkt-ID | Produktname | Kategorie | Listenpreis |
|---|---|---|---|
| 771 | Berg-100 Silber, 38 | Mountainbikes | 3399.9900 |
| 772 | Mountain-100, Silber, 42 | Mountainbikes | 3399.9900 |
| 773 | Mountain-100 Silber, 44 | Mountainbikes | 3399.9900 |
| ... | ... | ... | ... |
Angeben eines Dataframeschemas
Im vorherigen Beispiel enthielt die erste Zeile der CSV-Datei die Spaltennamen, und Spark war in der Lage, den Datentyp jeder Spalte aus den darin enthaltenen Daten abzuleiten. Sie können auch ein explizites Schema für die Daten angeben, was nützlich ist, wenn die Spaltennamen nicht in der Datendatei enthalten sind, wie in diesem CSV-Beispiel:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Das folgende PySpark-Beispiel zeigt, wie Sie ein Schema für den Datenframe angeben, das aus einer Datei mit dem Namen product-data.csv in diesem Format geladen werden soll:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Die Ergebnisse wären wieder ähnlich zu:
| Produkt-ID | Produktname | Kategorie | Listenpreis |
|---|---|---|---|
| 771 | Berg-100 Silber, 38 | Mountainbikes | 3399.9900 |
| 772 | Mountain-100, Silber, 42 | Mountainbikes | 3399.9900 |
| 773 | Mountain-100 Silber, 44 | Mountainbikes | 3399.9900 |
| ... | ... | ... | ... |
Filtern und Gruppieren von Dataframes
Sie können die Methoden der Dataframe-Klasse verwenden, um die darin enthaltenen Daten zu filtern, zu sortieren, zu gruppieren und anderweitig zu bearbeiten. Im folgenden Codebeispiel wird beispielsweise die select Methode verwendet, um die Spalten "ProductName " und "ListPrice " aus dem DF-Datenframe abzurufen, der Produktdaten im vorherigen Beispiel enthält:
pricelist_df = df.select("ProductID", "ListPrice")
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
| Produkt-ID | Listenpreis |
|---|---|
| 771 | 3399.9900 |
| 772 | 3399.9900 |
| 773 | 3399.9900 |
| ... | ... |
In Übereinstimmung mit den meisten Datenmanipulationsmethoden gibt select ein neues Dataframe-Objekt zurück.
Tipp
Die Auswahl einer Teilmenge von Spalten aus einem Dataframe ist ein gängiger Vorgang, der auch mithilfe der folgenden kürzeren Syntax erreicht werden kann:
pricelist_df = df["ProductID", "ListPrice"]
Sie können Methoden miteinander „verketten“, um eine Reihe von Bearbeitungen durchzuführen, die zu einem transformierten Dataframe führen. In diesem Beispielcode werden beispielsweise die select Und where Methoden zum Erstellen eines neuen Datenrahmens mit den Spalten "ProductName " und "ListPrice " für Produkte mit einer Kategorie von Mountainbikes oder Road Bikes verkettet:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
| Produktname | Listenpreis |
|---|---|
| Berg-100 Silber, 38 | 3399.9900 |
| Road-750 Schwarz, 52 | 539,9900 |
| ... | ... |
Zum Gruppieren und Aggregieren von Daten können Sie die groupby Methoden- und Aggregatfunktionen verwenden. Der folgende PySpark-Code zählt z. B. die Anzahl der Produkte für jede Kategorie:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
| Kategorie | zählen |
|---|---|
| Lenkköpfe | 3 |
| Räder | 14 |
| Mountainbikes | 32 |
| ... | ... |
Hinweis
Spark DataFrames sind deklarativ und unveränderlich. Jede Transformation (wie select, filter oder groupBy) erstellt einen neuen DataFrame, der darstellt, was Sie möchten, nicht wie es ausgeführt wird. Dadurch wird Code wiederverwendbar, optimiert und frei von Nebenwirkungen. Aber keine dieser Transformationen wird tatsächlich ausgeführt, bis Sie eine display auslösen (z. B. , collect, write), an dem Spark den vollständig optimierten Plan ausführt.
Verwenden von SQL-Ausdrücken in Spark
Die Dataframe-API ist Teil einer Spark-Bibliothek namens Spark SQL, mit der Datenanalysten SQL-Ausdrücke verwenden können, um Daten abzufragen und zu bearbeiten.
Erstellen von Datenbankobjekten im Spark-Katalog
Der Spark-Katalog ist ein Metastore für relationale Datenobjekte wie Sichten und Tabellen. Die Spark-Runtime kann den Katalog verwenden, um Code, der in einer von Spark unterstützten Sprache geschrieben wurde, problemlos mit SQL-Ausdrücken zu integrieren, die für einige Datenanalysten oder Entwickler natürlicher sind.
Eine der einfachsten Möglichkeiten, Daten in einem Dataframe für die Abfrage im Spark-Katalog verfügbar zu machen, ist das Erstellen einer temporären Sicht, wie im folgenden Codebeispiel gezeigt:
df.createOrReplaceTempView("products")
Eine Ansicht ist temporär, was bedeutet, dass sie am Ende der aktuellen Sitzung automatisch gelöscht wird. Sie können auch Tabellen erstellen, die im Katalog beibehalten werden, um eine Datenbank zu definieren, die mithilfe von Spark SQL abgefragt werden kann.
Hinweis
Wir werden uns in diesem Modul nicht eingehend mit Spark-Katalogtabellen befassen, aber es lohnt sich, ein paar wichtige Punkte zu erwähnen:
- Sie können eine leere Tabelle mithilfe der
spark.catalog.createTable-Methode erstellen. Tabellen sind Metadatenstrukturen, die ihre zugrunde liegenden Daten an dem mit dem Katalog verbundenen Speicherort speichern. Wenn Sie eine Tabelle löschen, werden auch die zugrunde liegenden Daten gelöscht. - Sie können einen Dataframe als Tabelle speichern, indem Sie seine
saveAsTable-Methode verwenden. - Sie können eine externe Tabelle mithilfe der
spark.catalog.createExternalTableMethode erstellen. Externe Tabellen definieren Metadaten im Katalog, beziehen ihre zugrunde liegenden Daten jedoch von einem externen Speicherort, in der Regel einem Ordner in einem Data Lake. Wenn Sie eine externe Tabelle löschen, werden die zugrunde liegenden Daten nicht gelöscht.
Verwenden der Spark SQL-API zur Abfrage von Daten
Sie können die Spark SQL-API in einem in einer beliebigen Sprache geschriebenen Code verwenden, um Daten im Katalog abzufragen. Der folgende PySpark-Code verwendet beispielsweise eine SQL-Abfrage, um Daten aus der Produktansicht als Datenframe zurückzugeben.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Die Ergebnisse des Codebeispiels würden ähnlich wie in der folgenden Tabelle aussehen:
| Produktname | Listenpreis |
|---|---|
| Berg-100 Silber, 38 | 3399.9900 |
| Road-750 Schwarz, 52 | 539,9900 |
| ... | ... |
Verwenden von SQL-Code
Das vorherige Beispiel hat gezeigt, wie Sie die Spark SQL-API verwenden können, um SQL-Ausdrücke in Spark-Code einzubetten. In einem Notebook können Sie den Magic-Befehl %sql auch dazu verwenden, SQL-Code auszuführen, der Objekte wie folgt im Katalog abfragt:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
Das SQL-Codebeispiel gibt ein Resultset zurück, das automatisch im Notebook als Tabelle angezeigt wird, wie die folgende:
| Kategorie | ProduktAnzahl |
|---|---|
| Trägershorts | 3 |
| Fahrradträger | 1 |
| Fahrradständer | 1 |
| ... | ... |