Allgemeine Muster zum Laden von Daten

Autoloader vereinfacht eine Reihe allgemeiner Aufgaben für die Datenerfassung. Diese Kurzübersicht enthält Beispiele für mehrere beliebte Muster.

Filtern von Verzeichnissen oder Dateien mit Globmustern

Globmuster können zum Filtern von Verzeichnissen und Dateien verwendet werden, wenn sie im Pfad angegeben werden.

Muster BESCHREIBUNG
? Führt einen Abgleich für ein einzelnes Zeichen durch
* Entspricht null oder mehr Zeichen
[abc] Entspricht einem einzelnen Zeichen aus dem Zeichensatz {a,b,c}.
[a-z] Entspricht einem einzelnen Zeichen aus dem Zeichenbereich {a... z}.
[^a] Entspricht einem einzelnen Zeichen, das nicht aus dem Zeichensatz oder Bereich {a} stammt. Beachten Sie, dass das Zeichen ^ sofort rechts neben der öffnenden Klammer auftreten muss.
{ab,cd} Entspricht einer Zeichenfolge aus dem Zeichenfolgensatz {ab, cd}.
{ab,c{de, fh}} Entspricht einer Zeichenfolge aus dem Zeichenfolgensatz {ab, cde, cfh}.

Verwenden Sie zum Bereitstellen von Präfixmustern path, z. B.:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Wichtig

Sie müssen die Option pathGlobFilter zum expliziten Bereitstellen von Suffixmustern verwenden. path stellt nur einen Präfixfilter bereit.

Wenn Sie beispielsweise nur png-Dateien in einem Verzeichnis analysieren möchten, das Dateien mit unterschiedlichen Suffixen enthält, haben Sie folgende Möglichkeiten:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Hinweis

Das Standard-Globbingverhalten des Autoloaders unterscheidet sich vom Standardverhalten anderer Spark-Dateiquellen. Fügen Sie Ihrem Lesevorgang .option("cloudFiles.useStrictGlobber", "true") hinzu, um für Dateiquellen Globbing zu verwenden, das dem Spark-Standardverhalten entspricht. Weitere Informationen zu Globbing finden Sie in der folgenden Tabelle:

Muster Dateipfad Standard-Globber Strenger Globber
/a/b /a/b/c/file.txt Ja Ja
/a/b /a/b_dir/c/file.txt Nein Nein
/a/b /a/b.txt Nein Nein
/a/b/ /a/b.txt Nein Nein
/a/*/c/ /a/b/c/file.txt Ja Ja
/a/*/c/ /a/b/c/d/file.txt Ja Ja
/a/*/c/ /a/b/x/y/c/file.txt Ja Nein
/a/*/c /a/b/c_file.txt Ja Nein
/a/*/c/ /a/b/c_file.txt Ja Nein
/a/*/c/ /a/*/cookie/file.txt Ja Nein
/a/b* /a/b.txt Ja Ja
/a/b* /a/b/file.txt Ja Ja
/a/{0.txt,1.txt} /a/0.txt Ja Ja
/a/*/{0.txt,1.txt} /a/0.txt Nein Nein
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Ja Ja

Aktivieren der einfachen ETL

Eine einfache Möglichkeit, Ihre Daten ohne Datenverlust in Delta Lake zu übertragen, ist die Verwendung des folgenden Musters und die Aktivierung des Schemarückschlusses mit Autoloader. Databricks empfiehlt, den folgenden Code in einem Azure Databricks-Auftrag auszuführen, damit Ihr Stream automatisch neu gestartet wird, wenn sich das Schema Ihrer Quelldaten ändert. Standardmäßig wird das Schema als Zeichenfolgentyp abgeleitet, alle Analysefehler (es sollte keine geben, wenn alles als Zeichenfolge vorliegt) werden an _rescued_data weitergeleitet, und alle neuen Spalten lassen den Stream fehlschlagen und entwickeln das Schema weiter.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Verhindern von Datenverlust in gut strukturierten Daten

Wenn Sie Ihr Schema kennen, aber wissen möchten, wann Sie unerwartete Daten erhalten, empfiehlt Databricks die Verwendung von rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Falls Ihr Stream die Verarbeitung beenden soll, wenn ein neues Feld eingeführt wird, das nicht mit Ihrem Schema übereinstimmt, können Sie Folgendes hinzufügen:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Aktivieren flexibler semistrukturierter Datenpipelines

Wenn Sie Daten von einem Anbieter erhalten, der neue Spalten in die von ihm bereitgestellten Informationen einführt, wissen Sie möglicherweise nicht genau, wann er dies tut, oder Sie haben nicht die Bandbreite, um Ihre Datenpipeline zu aktualisieren. Sie können nun die Schemaentwicklung nutzen, um den Stream neu zu starten und das abgeleitete Schema automatisch von Autoloader aktualisieren zu lassen. Sie können schemaHints auch für einige der „schemalosen“ Felder nutzen, die der Anbieter möglicherweise bereitstellt.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Hinzufügen von geschachtelten JSON-Daten

Da Auto Loader die JSON-Spalten der obersten Ebene als Zeichenfolgen ableitet, können Sie mit verschachtelten JSON-Objekten zurückbleiben, die weitere Transformationen erfordern. Sie können die APIs für den semistrukturierten Datenzugriff nutzen, um komplexe JSON-Inhalte weiter zu transformieren.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Hinzufügen von geschachtelten JSON-Daten

Wenn Sie geschachtelte Daten haben, können Sie die cloudFiles.inferColumnTypes-Option verwenden, um die geschachtelte Struktur Ihrer Daten und anderer Spaltentypen abzuleiten.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Laden von CSV-Dateien ohne Kopfzeilen

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Erzwingen eines Schemas für CSV-Dateien mit Headern

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Nehmen Sie Bild- oder Binärdaten in Delta Lake für ML auf

Sobald die Daten in Delta Lake gespeichert sind, können Sie verteilte Rückschlüsse auf die Daten ausführen. Siehe Verteilte Inferenz mit Pandas-UDF durchführen.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Autoloader-Syntax für DLT

Delta Live Tables bietet eine leicht geänderte Python-Syntax für Auto Loader und fügt SQL-Unterstützung für Auto Loader hinzu.

In den folgenden Beispielen wird der Autoloader verwendet, um Datasets aus CSV- und JSON-Dateien zu erstellen:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Sie können unterstützte Formatoptionen mit dem Autoloader verwenden. Mithilfe der map()-Funktion können Sie Optionen an die cloud_files()-Methode übergeben. Die Optionen sind Schlüssel-Wert-Paare, bei denen die Schlüssel und Werte Zeichenfolgen sind. Im Folgenden wird die Syntax für die Arbeit mit dem Autoloader in SQL beschrieben:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Im folgenden Beispiel werden Daten aus CSV-Dateien mit Trennzeichen und einem Header gelesen:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

Sie können das schema verwenden, um das Format manuell anzugeben. Sie müssen das schema für Formate angeben, die keine Schemarückschlüsse unterstützen:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Hinweis

Delta Live Tables konfiguriert und verwaltet die Schema- und Prüfpunktverzeichnisse automatisch, wenn der Autoloader zum Lesen von Dateien verwendet wird. Wenn Sie jedoch eines dieser Verzeichnisse manuell konfigurieren, wirkt sich eine vollständige Aktualisierung nicht auf den Inhalt der konfigurierten Verzeichnisse aus. Databricks empfiehlt die Verwendung der automatisch konfigurierten Verzeichnisse, um unerwartete Nebenwirkungen bei der Verarbeitung zu vermeiden.