Modelli di caricamento dei dati comuni

Il caricatore automatico semplifica una serie di attività comuni di inserimento dei dati. Questo riferimento rapido fornisce esempi per diversi modelli popolari.

Filtro di directory o file usando modelli glob

I modelli Glob possono essere usati per filtrare directory e file quando forniti nel percorso.

Modello Descrizione
? Corrisponde a qualsiasi carattere singolo
* Corrisponde a zero o più caratteri
[abc] Corrisponde a un singolo carattere dal set di caratteri {a,b,c}.
[a-z] Corrisponde a un singolo carattere dall'intervallo di caratteri {a... z}.
[^a] Corrisponde a un singolo carattere che non proviene dal set di caratteri o dall'intervallo {a}. Si noti che il ^ carattere deve verificarsi immediatamente a destra della parentesi quadre di apertura.
{ab,cd} Corrisponde a una stringa dal set di stringhe {ab, cd}.
{ab,c{de, fh}} Corrisponde a una stringa dal set di stringhe {ab, cde, cfh}.

Usare l'oggetto path per fornire modelli di prefisso, ad esempio:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base_path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base_path>/*/files")

Importante

È necessario usare l'opzione pathGlobFilter per fornire in modo esplicito modelli di suffisso. L'unico path oggetto fornisce un filtro prefisso.

Ad esempio, se si desidera analizzare solo png i file in una directory contenente file con suffissi diversi, è possibile eseguire le operazioni seguenti:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base_path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base_path>)

Abilitare facilmente ETL

Un modo semplice per ottenere i dati in Delta Lake senza perdere dati consiste nell'usare il modello seguente e abilitare l'inferenza dello schema con Il caricatore automatico. Databricks consiglia di eseguire il codice seguente in un processo di Azure Databricks per riavviare automaticamente il flusso quando lo schema dei dati di origine cambia. Per impostazione predefinita, lo schema viene dedotto come tipi di stringa, eventuali errori di analisi (non dovrebbe essere presente se tutto rimane come stringa) passano a _rescued_datae qualsiasi nuova colonna avrà esito negativo nel flusso ed evolverà lo schema.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Impedire la perdita di dati in dati ben strutturati

Quando si conosce lo schema, ma si vuole conoscere ogni volta che si ricevono dati imprevisti, Databricks consiglia di usare .rescuedDataColumn

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Se si vuole che il flusso interrompa l'elaborazione se viene introdotto un nuovo campo che non corrisponde allo schema, è possibile aggiungere:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Abilitare pipeline di dati semistrutturate flessibili

Quando si ricevono dati da un fornitore che introduce nuove colonne alle informazioni fornite, potrebbe non essere consapevole esattamente quando lo fanno o potrebbe non avere la larghezza di banda per aggiornare la pipeline di dati. È ora possibile sfruttare l'evoluzione dello schema per riavviare il flusso e consentire all'aggiornamento automatico dello schema dedotto automaticamente. È anche possibile sfruttare schemaHints per alcuni dei campi "senza schema" che il fornitore può fornire.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Trasformare i dati JSON annidati

Poiché il caricatore automatico inferisce le colonne JSON di primo livello come stringhe, è possibile essere lasciati con oggetti JSON annidati che richiedono ulteriori trasformazioni. È possibile usare le API di accesso ai dati semistrutturate per trasformare ulteriormente il contenuto JSON complesso.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<source_data_with_nested_json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<source_data_with_nested_json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Inferi dati JSON annidati

Quando sono stati annidati i dati, è possibile usare l'opzione per dedurre la cloudFiles.inferColumnTypes struttura annidata dei dati e altri tipi di colonna.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source_data_with_nested_json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source_data_with_nested_json>")

Caricare file CSV senza intestazioni

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Applicare uno schema nei file CSV con intestazioni

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Inserire dati immagine o binari in Delta Lake per ML

Dopo aver archiviato i dati in Delta Lake, è possibile eseguire l'inferenza distribuita sui dati. Vedere Eseguire l'inferenza distribuita usando pandas UDF.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")