Freigeben über


Grundlagen von PySpark

In diesem Artikel wird die Verwendung von PySpark anhand einfacher Beispiele veranschaulicht. Es wird davon ausgegangen, dass Sie mit grundlegenden Apache Spark-Konzepten vertraut sind und Befehle in einem Azure Databricks-Notebook ausführen, das mit Computeressourcen verbunden ist. Sie erstellen DataFrames mithilfe von Beispieldaten, führen grundlegende Transformationen einschließlich Zeilen- und Spaltenvorgängen für diese Daten aus, kombinieren mehrere DataFrames und aggregieren, visualisieren und speichern diese Daten dann in einer Tabelle oder Datei.

Hochladen von Daten

In einigen Beispielen in diesem Artikel werden von Databricks bereitgestellte Beispieldaten verwendet, um zu veranschaulichen, wie Sie Daten mit DataFrames laden, transformieren und speichern können. Wenn Sie eigene Daten verwenden möchten, die noch nicht in Databricks importiert wurden, können Sie sie hochladen und einen DataFrame daraus erstellen. Weitere Informationen finden Sie unter Erstellen oder Ändern einer Tabelle mithilfe des Dateiuploads und Hochladen von Dateien auf ein Unity-Katalog-Volume.

Informationen zu Databricks-Beispieldaten

Databricks stellt Beispieldaten im samples-Katalog und im Verzeichnis /databricks-datasets bereit.

  • Verwenden Sie das Format samples.<schema-name>.<table-name>, um auf die Beispieldaten im samples-Katalog zuzugreifen. In diesem Artikel werden Tabellen im Schema samples.tpch verwendet, die die Daten eines fiktiven Unternehmens enthalten. Die customer-Tabelle enthält Informationen zu Kunden, und orders enthält Informationen zu Bestellungen, die von diesen Kunden aufgegeben wurden.
  • Verwenden Sie dbutils.fs.ls, um Daten in /databricks-datasets zu untersuchen. Verwenden Sie Spark SQL oder DataFrames, um Daten an diesem Speicherort mithilfe von Dateipfaden abzufragen. Weitere Informationen zu von Databricks bereitgestellten Beispieldaten finden Sie unter Beispieldatasets.

Importieren von Datentypen

Für viele PySpark-Vorgänge müssen Sie SQL-Funktionen verwenden oder mit nativen Spark-Typen interagieren. Sie können entweder nur die benötigten Funktionen und Typen direkt importieren, oder Sie können das gesamte Modul importieren.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Da einige importierte Funktionen möglicherweise integrierte Python-Funktionen überschreiben, importieren manche Benutzer diese Module mit einem Alias. Die folgenden Beispiele zeigen einen allgemeinen Alias, der in Apache Spark-Codebeispielen verwendet wird:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Eine umfassende Liste der Datentypen finden Sie unter Spark-Datentypen.

Eine umfassende Liste der PySpark SQL-Funktionen finden Sie unter Spark-Funktionen.

Erstellen eines DataFrame

Es gibt mehrere Methoden, einen DataFrame zu erstellen. In der Regel definieren Sie einen DataFrame für eine Datenquelle, z. B. eine Tabelle oder eine Reihe von Dateien. Wie im Abschnitt zu den grundlegenden Konzepten in Apache Spark beschrieben, lösen Sie die auszuführenden Transformationen mit einer Aktion wie display aus. Die display-Methode gibt DataFrames aus.

Erstellen eines DataFrame mit angegebenen Werten

Um einen DataFrame mit angegebenen Werten zu erstellen, verwenden Sie die createDataFrame-Methode, wobei Zeilen als Tupelliste ausgedrückt werden:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Beachten Sie in der Ausgabe, dass die Datentypen von df_children-Spalten automatisch abgeleitet werden. Sie können die Typen stattdessen auch angeben, indem Sie ein Schema hinzufügen. Schemas werden mithilfe des Datentyps StructType definiert, der aus StructFields besteht. Diese Felder enthalten den Namen, den Datentyp und ein boolesches Flag, das angibt, ob sie einen NULL-Wert enthalten oder nicht. Sie müssen Datentypen aus pyspark.sql.types importieren.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Erstellen eines DataFrame aus einer Tabelle in Unity Catalog

Verwenden Sie zum Erstellen eines DataFrame aus einer Tabelle in Unity Catalog die table-Methode, und geben Sie die Tabelle im Format <catalog-name>.<schema-name>.<table-name> an. Klicken Sie in der linken Navigationsleiste auf Katalog, um mithilfe des Katalog-Explorers zur Tabelle zu navigieren. Klicken Sie darauf, und wählen Sie dann Tabellenpfad kopieren aus, um den Tabellenpfad in das Notebook einzufügen.

Im folgenden Beispiel wird die Tabelle samples.tpch.customergeladen. Sie können stattdessen aber auch den Pfad zu einer eigenen Tabelle angeben.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Erstellen eines DataFrame aus einer hochgeladenen Datei

Um einen DataFrame aus einer Datei zu erstellen, die Sie in ein Unity Catalog-Volume hochgeladen haben, verwenden Sie die read-Eigenschaft. Diese Methode gibt DataFrameReader zurück, womit dann das entsprechende Format gelesen werden kann. Klicken Sie links in der kleinen Randleiste auf die Katalogoption, und suchen Sie Ihre Datei mit dem Katalogbrowser. Wählen Sie sie aus, und klicken Sie dann auf Volumedateipfad kopieren.

Im folgenden Beispiel wird aus einer *.csv-Datei gelesen. DataFrameReader unterstützt das Hochladen von Dateien jedoch auch in vielen anderen Formaten. Weitere Informationen finden Sie unter DataFrameReader-Methoden.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Weitere Informationen zu Unity Catalog-Volumes finden Sie unter Was sind Unity Catalog-Volumes?.

Erstellen eines DataFrame aus einer JSON-Antwort

Wenn Sie einen DataFrame aus JSON-Antwortnutzdaten erstellen möchten, die von einer REST-API zurückgegeben werden, das requests-Paket von Python, um die Antwort abzufragen und zu parsen. Sie müssen das Paket importieren, um es verwenden zu können. In diesem Beispiel werden Daten aus der Datenbank der Arzneimittelzulassungsanträge der US-Behörde „Food and Drug Administration“ verwendet.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Informationen zum Arbeiten mit JSON und anderen halbstrukturierten Daten in Databricks finden Sie unter Modellieren von halbstrukturierte Daten.

Auswählen eines JSON-Felds oder -Objekts

Wenn Sie ein bestimmtes Feld oder Objekt aus dem konvertierten JSON-Code auswählen möchten, verwenden Sie die []-Notation. So wählen Sie beispielsweise das products-Feld aus, das selbst ein Array von Produkten ist:

display(df_drugs.select(df_drugs["products"]))

Sie können Methodenaufrufe auch verketten, um mehrere Felder zu durchlaufen. So geben Sie beispielsweise den Markennamen des ersten Produkts in einem Arzneimittelzulassungsantrag aus:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Erstellen eines DataFrame aus einer Datei

Um das Erstellen eines DataFrame aus einer Datei zu veranschaulichen, werden in diesem Beispiel CSV-Daten in das Verzeichnis /databricks-datasets geladen.

Um zu den Beispieldatasets zu navigieren, können Sie die Dateisystembefehle von Databricks Utilties verwenden. Im folgenden Beispiel werden die in /databricks-datasets verfügbaren Datasets mit dbutils aufgelistet:

display(dbutils.fs.ls('/databricks-datasets'))

Alternativ können Sie mit %fs auf die Dateisystembefehle der Databricks-CLI zugreifen, wie im folgenden Beispiel gezeigt:

%fs ls '/databricks-datasets'

Um einen DataFrame aus einer Datei oder einem Dateiverzeichnis zu erstellen, geben Sie den Pfad in der load-Methode an:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Transformieren von Daten mit DataFrames

DataFrames vereinfachen das Transformieren von Daten mithilfe integrierter Methoden zum Sortieren, Filtern und Aggregieren von Daten. Viele Transformationen werden nicht als Methoden für DataFrames angegeben, sondern im Paket spark.sql.functions bereitgestellt. Weitere Informationen finden Sie unter Spark SQL-Funktionen in Databricks

Spaltenvorgänge (Column operations)

Spark bietet viele grundlegende Spaltenvorgänge:

Tipp

Um alle Spalten in einem DataFrame auszugeben, verwenden Sie columns, z. B. df_customer.columns.

Spalten auswählen

Sie können bestimmte Spalten mit select und col auswählen. Die col-Funktion befindet sich im Submodul pyspark.sql.functions.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Sie können auch mit der Funktion expr auf eine Spalte verweisen, die einen als Zeichenfolge definierten Ausdruck annimmt:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Sie können auch die Funktion selectExpr verwenden, die SQL-Ausdrücke akzeptiert:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Wenn Sie Spalten mit einem Zeichenfolgenliteral auswählen möchten, gehen Sie wie folgt vor:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Wenn Sie eine Spalte explizit aus einem bestimmten DataFrame auswählen möchten, können Sie den []- oder den .-Operator verwenden. (Mit dem .-Operator können keine Spalten ausgewählt werden, die mit einer ganzen Zahl beginnen oder ein Leerzeichen oder Sonderzeichen enthalten.) Dies kann besonders hilfreich sein, wenn Sie DataFrames verknüpfen, bei denen einige Spalten denselben Namen haben.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Spalten erstellen

Verwenden Sie zum Erstellen einer neuen Spalte die withColumn-Methode. Im folgenden Beispiel wird eine neue Spalte erstellt, die einen booleschen Wert enthält, der darauf basiert, ob der Kontostand des Kundenkontos c_acctbal größer als 1000 ist:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Umbenennen von Spalten

Um eine Spalte umzubenennen, verwenden Sie die withColumnRenamed-Methode, die die vorhandenen und neuen Spaltennamen akzeptiert:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

Die alias-Methode ist besonders hilfreich, wenn Sie Spalten im Rahmen von Aggregationen umbenennen möchten:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Umwandeln von Spaltentypen

In einigen Fällen sollten Sie den Datentyp für mindestens eine Spalte in Ihrem DataFrame ändern. Verwenden Sie die cast-Methode, um Spaltendatentypen umzuwandeln. Das folgende Beispiel zeigt, wie eine Spalte von einer ganzen Zahl in eine Zeichenfolge umgewandelt wird, wobei mithilfe der col-Methode auf eine Spalte verwiesen wird:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Entfernen von Spalten

Um Spalten zu entfernen, können Sie sie während einer Auswahl oder select(*) except auslassen, oder Sie können die drop-Methode verwenden:

df_customer_flag_renamed.drop("balance_flag_renamed")

Sie können auch mehrere Spalten gleichzeitig entfernen:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Zeilenvorgänge (Row operations)

Spark bietet viele grundlegende Zeilenvorgänge:

Zeilen filtern

Um Zeilen zu filtern und nur bestimmte Zeilen zurückzugeben, verwenden Sie die filter- oder where-Methode für einen DataFrame. Um eine Spalte zu ermitteln, nach der gefiltert werden soll, verwenden Sie die col-Methode oder einen Ausdruck, der zu einer Spalte ausgewertet wird.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Verwenden Sie logische Operatoren, um nach mehreren Bedingungen zu filtern. Mit & und | können Sie beispielsweise die AND- bzw. OR-Bedingung verwenden. Im folgenden Beispiel werden Zeilen gefiltert, in denen c_nationkey gleich 20 und c_acctbal größer als 1000 ist.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Entfernen doppelter Zeilen

Verwenden Sie die Transformation distinct zum Deduplizieren doppelter Zeilen, die nur eindeutige Zeilen zurückgibt.

df_unique = df_customer.distinct()

Behandeln von NULL-Werten

Um Nullwerte zu behandeln, entfernen Sie Zeilen, die NULL-Werte enthalten, mithilfe der na.drop-Methode. Mit dieser Methode können Sie angeben, ob Zeilen, die any- oder all-NULL-Werte enthalten, entfernt werden sollen.

Wenn Sie NULL-Werte entfernen möchten, verwenden Sie eines der folgenden Beispiele.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Wenn Sie stattdessen nur Zeilen herausfiltern möchten, die alle NULL-Werte enthalten, verwenden Sie folgenden Code:

df_customer_no_nulls = df_customer.na.drop("all")

Mit dem folgenden Code können Sie dies auch auf einen Teil der Spalten anwenden:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Um die fehlenden Werte auszufüllen, verwenden Sie die fill-Methode. Sie können sie auf alle Spalten oder nur einen Teil davon anwenden. Im folgenden Beispiel werden Kontostände (c_acctbal), die einen NULL-Wert aufweisen, mit 0 gefüllt.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Verwenden Sie die replace-Methode, um Zeichenfolgen durch andere Werte zu ersetzen. Im folgenden Beispiel werden alle leeren Adresszeichenfolgen durch das Wort UNKNOWN ersetzt:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Anfügen von Zeilen

Zum Anfügen von Zeilen müssen Sie mithilfe der union-Methode einen neuen DataFrame erstellen. Im folgenden Beispiel wird der zuvor erstellte DataFrame df_that_one_customer mit df_filtered_customer kombiniert, wodurch ein DataFrame mit drei Kunden zurückgegeben wird:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Hinweis

Sie können DataFrames auch kombinieren, indem Sie sie in eine Tabelle schreiben und dann neue Zeilen anfügen. Bei Produktionsworkloads mit stetig wachsender Datenmenge kann die inkrementelle Verarbeitung von Datenquellen in eine Zieltabelle die Wartezeit und die Computekosten drastisch reduzieren. Weitere Informationen finden Sie unter Erfassen von Daten in einem Databricks-Lakehouse.

Zeilen sortieren

Wichtig

Die Sortierung kann bei großen Datenmengen teuer sein, und wenn Sie sortierte Daten speichern und die Daten mit Spark neu laden, ist die Reihenfolge nicht garantiert. Achten Sie darauf, dass Sie die Sortierung wohlüberlegt verwenden.

Um Zeilen nach mindestens einer Spalte zu sortieren, verwenden Sie die sort- oder orderBy-Methode. Diese Methoden sortieren standardmäßig in aufsteigender Reihenfolge:

df_customer.orderBy(col("c_acctbal"))

Um in absteigender Reihenfolge zu filtern, verwenden Sie desc:

df_customer.sort(col("c_custkey").desc())

Im folgenden Beispiel wird gezeigt, wie Sie nach zwei Spalten filtern:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Um die Anzahl der Zeilen zu begrenzen, die zurückgegeben werden, nachdem der DataFrame sortiert wurde, verwenden Sie die limit-Methode. Im folgenden Beispiel werden nur die obersten 10 Ergebnisse angezeigt:

display(df_sorted.limit(10))

Verknüpfen von Dataframes

Um mindestens zwei DataFrames zu verknüpfen, verwenden Sie die join-Methode. Sie können in den Parametern how (dem Jointyp) und on (auf welchen Spalten der Join basieren soll) angeben, wie die DataFrames verknüpft werden sollen. Gängige Jointypen sind zum Beispiel:

  • inner: Dies ist der Standardjoin. Er gibt einen DataFrame zurück, der nur die Zeilen enthält, für die der on-Parameter in allen DataFrames übereinstimmt.
  • left: Dadurch werden alle Zeilen des ersten angegebenen DataFrame und nur Zeilen des zweiten beibehalten, die eine Übereinstimmung mit dem ersten aufweisen.
  • outer: Bei einem äußeren Join werden alle Zeilen aus beiden DataFrames beibehalten, unabhängig davon, ob es seine Übereinstimmung gibt.

Ausführliche Informationen zu Joins finden Sie unter Arbeiten mit Joins in Azure Databricks. Eine Liste der in PySpark unterstützten Joins finden Sie unter DataFrame-Joins.

Im folgenden Beispiel wird ein einzelner DataFrame zurückgegeben, in dem jede Zeile des orders-DataFrame mit der entsprechenden Zeile aus dem customers-DataFrame verknüpft ist. Hier wird ein innerer Join verwendet, da erwartet wird, dass jeder Auftrag genau einem Kunden entspricht.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Wenn Sie einen Join mit mehreren Bedingungen durchführen möchten, verwenden Sie boolesche Operatoren wie & und |,um AND bzw. OR anzugeben. Im folgenden Beispiel wird eine zusätzliche Bedingung hinzugefügt, und es werden nur die Zeilen gefiltert, in denen o_totalprice größer als 500,000 ist:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Aggregatdaten

Um Daten in einem DataFrame zu aggregieren, ähnlich wie mit GROUP BY in SQL, geben Sie mit der groupBy-Methode die Spalten an, nach denen gruppiert werden soll, und die agg-Methode, um Aggregationen anzugeben. Importieren Sie allgemeine Aggregationen wie avg, sum, max und min aus pyspark.sql.functions. Das folgende Beispiel zeigt den durchschnittlichen Kontostand von Kunden nach Marktsegment:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Einige Aggregationen sind Aktionen, was bedeutet, dass sie Berechnungen auslösen. In diesem Fall müssen Sie keine anderen Aktionen zum Ausgeben von Ergebnissen verwenden.

Verwenden Sie die count-Methode, um die Zeilen in einem DataFrame zu zählen.

df_customer.count()

Verketten von Aufrufen

Methoden, die DataFrames transformieren, geben DataFrames zurück, und Spark reagiert erst dann auf Transformationen, wenn Aktionen aufgerufen werden. Diese sogenannte verzögerte Auswertung bedeutet, dass Sie mehrere Methoden verketten können, um die Benutzerfreundlichkeit und Lesbarkeit zu verbessern. Im folgenden Beispiel wird gezeigt, wie Filterung, Aggregation und Sortierung verkettet werden:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Visualisieren Ihres DataFrame

Um einen DataFrame in einem Notebook zu visualisieren, klicken Sie neben der Tabelle oben links im DataFrame auf das +-Symbol, und wählen Sie dann Visualisierung aus, um mindestens ein Diagramm basierend auf Ihrem DataFrame hinzuzufügen. Weitere Informationen zu Visualisierungen finden Sie unter Visualisierungen in Databricks-Notebooks.

display(df_order)

Um zusätzliche Visualisierungen durchzuführen, empfiehlt Databricks die Verwendung der Pandas-API für Spark. Mit .pandas_api() können Sie zur entsprechenden Pandas-API für einen Spark-DataFrame umwandeln. Weitere Informationen finden Sie unter Pandas-API in Spark.

Speichern Ihrer Daten

Nachdem Sie Ihre Daten transformiert haben, können Sie sie mithilfe der DataFrameWriter-Methoden speichern. Eine vollständige Liste dieser Methoden finden Sie unter DataFrameWriter. In den folgenden Abschnitten wird gezeigt, wie Sie Ihren DataFrame als Tabelle und als Sammlung von Datendateien speichern.

Speichern Ihres DataFrame als Tabelle

Um Ihren DataFrame als Tabelle in Unity Catalog zu speichern, geben Sie den Pfad mithilfe der write.saveAsTable-Methode im Format <catalog-name>.<schema-name>.<table-name> an.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Schreiben Ihres DataFrame als CSV

Wenn Sie Ihren DataFrame im *.csv-Format schreiben möchten, geben Sie mithilfe der write.csv Methode das Format und die Optionen an. Wenn Daten im angegebenen Pfad vorhanden sind, schlägt der Schreibvorgang standardmäßig fehl. Sie können einen der folgenden Modi angeben, um eine andere Aktion auszuführen:

  • overwrite überschreibt alle vorhandenen Daten im Zielpfad mit dem DataFrame-Inhalt.
  • append fügt den Inhalt des DataFrame an Daten im Zielpfad an.
  • ignore lässt den Schreibvorgang ohne Meldung fehlschlagen, wenn Daten im Zielpfad vorhanden sind.

Im folgenden Beispiel wird das Überschreiben von Daten mit DataFrame-Inhalten als CSV-Dateien veranschaulicht:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Nächste Schritte

Informationen zu weiteren Spark-Funktionen in Databricks finden Sie unter: