Modelli comuni di caricamento dei dati

Il caricatore automatico semplifica una serie di attività comuni di inserimento dati. Questo riferimento rapido fornisce esempi per diversi modelli comuni.

Filtro di directory o file usando modelli GLOB

I modelli Glob possono essere usati per filtrare directory e file quando specificati nel percorso.

Modello Descrizione
? Corrisponde a qualsiasi carattere singolo
* Corrisponde a zero o più caratteri
[abc] Trova la corrispondenza di un singolo carattere del set di caratteri {a,b,c}.
[a-z] Trova la corrispondenza di un singolo carattere dall'intervallo di caratteri {a... z}.
[^a] Trova la corrispondenza di un singolo carattere non incluso nel set di caratteri o nell'intervallo {a}. Si noti che il ^ carattere deve essere immediatamente a destra della parentesi aperta.
{ab,cd} Trova la corrispondenza di una stringa dal set di stringhe {ab, cd}.
{ab,c{de, fh}} Trova la corrispondenza di una stringa dal set di stringhe {ab, cde, cfh}.

path Usare per fornire modelli di prefisso, ad esempio:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Importante

È necessario usare l'opzione pathGlobFilter per fornire in modo esplicito modelli di suffisso. Fornisce path solo un filtro di prefisso.

Ad esempio, se si desidera analizzare solo png i file in una directory contenente file con suffissi diversi, è possibile eseguire le operazioni seguenti:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Nota

Il comportamento predefinito del caricatore automatico è diverso dal comportamento predefinito di altre origini file Spark. Aggiungere .option("cloudFiles.useStrictGlobber", "true") alla lettura per usare il globbing che corrisponde al comportamento predefinito di Spark rispetto alle origini file. Per altre informazioni sul globbing, vedere la tabella seguente:

Modello Percorso file Globber predefinito Globber rigoroso
/a/b /a/b/c/file.txt
/a/b /a/b_dir/c/file.txt No No
/a/b /a/b.txt No No
/a/b/ /a/b.txt No No
/a/*/c/ /a/b/c/file.txt
/a/*/c/ /a/b/c/d/file.txt
/a/*/c/ /a/b/x/y/c/file.txt No
/a/*/c /a/b/c_file.txt No
/a/*/c/ /a/b/c_file.txt No
/a/*/c/ /a/*/cookie/file.txt No
/a/b* /a/b.txt
/a/b* /a/b/file.txt
/a/{0.txt,1.txt} /a/0.txt
/a/*/{0.txt,1.txt} /a/0.txt No No
/a/b/[cde-h]/i/ /a/b/c/i/file.txt

Abilitare easy ETL

Un modo semplice per ottenere i dati in Delta Lake senza perdere dati consiste nell'usare il modello seguente e abilitare l'inferenza dello schema con il caricatore automatico. Databricks consiglia di eseguire il codice seguente in un processo di Azure Databricks per riavviare automaticamente il flusso quando lo schema dei dati di origine cambia. Per impostazione predefinita, lo schema viene dedotto come tipi stringa, tutti gli errori di analisi (non dovrebbero verificarsi se tutto rimane come stringa) andranno a _rescued_datae qualsiasi nuova colonna non riuscirà il flusso ed evolverà lo schema.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Evitare la perdita di dati in dati ben strutturati

Quando si conosce lo schema, ma si vuole sapere ogni volta che si ricevono dati imprevisti, Databricks consiglia di usare .rescuedDataColumn

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Se si vuole che il flusso interrompa l'elaborazione se viene introdotto un nuovo campo che non corrisponde allo schema, è possibile aggiungere:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Abilitare pipeline di dati semistrutturate flessibili

Quando si ricevono dati da un fornitore che introduce nuove colonne alle informazioni fornite, è possibile che non si sappia esattamente quando lo fanno o che non si dispone della larghezza di banda per aggiornare la pipeline di dati. È ora possibile sfruttare l'evoluzione dello schema per riavviare il flusso e consentire al caricatore automatico di aggiornare automaticamente lo schema dedotto. È anche possibile sfruttare schemaHints per alcuni dei campi "senza schema" che il fornitore potrebbe fornire.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Trasformare i dati JSON annidati

Poiché il caricatore automatico deduce le colonne JSON di primo livello come stringhe, è possibile rimanere con oggetti JSON annidati che richiedono ulteriori trasformazioni. È possibile usare le API di accesso ai dati semistrutturate per trasformare ulteriormente il contenuto JSON complesso.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Dedurre i dati JSON annidati

Dopo aver annidato i dati, è possibile usare l'opzione cloudFiles.inferColumnTypes per dedurre la struttura annidata dei dati e di altri tipi di colonna.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Caricare file CSV senza intestazioni

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Applicare uno schema ai file CSV con intestazioni

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Inserire dati immagine o binari in Delta Lake per ML

Dopo aver archiviato i dati in Delta Lake, è possibile eseguire l'inferenza distribuita sui dati. Vedere Eseguire l'inferenza distribuita usando la funzione definita dall'utente pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Sintassi del caricatore automatico per DLT

Le tabelle live delta forniscono una sintassi Python leggermente modificata per Il caricatore automatico aggiunge il supporto SQL per il caricatore automatico.

Gli esempi seguenti usano Il caricatore automatico per creare set di dati da file CSV e JSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

È possibile usare le opzioni di formato supportate con Il caricatore automatico. Usando la map() funzione , è possibile passare le opzioni al cloud_files() metodo . Le opzioni sono coppie chiave-valore, in cui le chiavi e i valori sono stringhe. Di seguito viene descritta la sintassi per l'uso del caricatore automatico in SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

L'esempio seguente legge i dati dai file CSV delimitati da tabulazioni con un'intestazione:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

È possibile utilizzare schema per specificare manualmente il formato. È necessario specificare per i formati che non supportano l'inferenza schemadello schema:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Nota

Le tabelle live delta configurano e gestiscono automaticamente le directory dello schema e del checkpoint quando si usa il caricatore automatico per leggere i file. Tuttavia, se si configura manualmente una di queste directory, l'esecuzione di un aggiornamento completo non influisce sul contenuto delle directory configurate. Databricks consiglia di usare le directory configurate automaticamente per evitare effetti collaterali imprevisti durante l'elaborazione.