Patrones comunes de carga de datos

El cargador automático simplifica una serie de tareas comunes de ingesta de datos. Esta referencia rápida proporciona ejemplos para varios patrones populares.

Filtrado de directorios o archivos mediante patrones globales

Los patrones globales se pueden usar para filtrar directorios y archivos cuando se proporcionan en la ruta de acceso.

Patrón Descripción
? Coincide con cualquier carácter individual.
* Coincide con cero o más caracteres.
[abc] Coincide con un solo carácter del juego de caracteres {a,b,c}.
[a-z] Coincide con un solo carácter del intervalo de caracteres {a...z}.
[^a] Coincide con un solo carácter que no es del juego de caracteres o el intervalo {a}. Tenga en cuenta que el carácter ^ debe aparecer inmediatamente a la derecha del corchete de apertura.
{ab,cd} Coincide con una cadena del conjunto de cadenas {ab, cd}.
{ab,c{de, fh}} Coincide con una cadena del conjunto de cadenas {ab, cde, cfh}.

Use el valor path para proporcionar patrones de prefijo, por ejemplo:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Importante

Debe usar la opción pathGlobFilter para proporcionar explícitamente patrones de sufijo. El valor path solo proporciona un filtro de prefijo.

Por ejemplo, si desea analizar solo los archivos png de un directorio que contiene archivos con sufijos diferentes, puede hacer lo siguiente:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Nota:

El comportamiento global predeterminado de Auto Loader es diferente del comportamiento predeterminado de otros orígenes de archivos de Spark. Agregue .option("cloudFiles.useStrictGlobber", "true") a la lectura para usar una globalización que coincida con el comportamiento predeterminado de Spark en los orígenes de archivos. Consulte la siguiente tabla para obtener más información al respecto:

Patrón Ruta de acceso del archivo Patrón global predeterminado Patrón global estricto
/a/b /a/b/c/file.txt
/a/b /a/b_dir/c/file.txt No No
/a/b /a/b.txt No No
/a/b/ /a/b.txt No No
/a/*/c/ /a/b/c/file.txt
/a/*/c/ /a/b/c/d/file.txt
/a/*/c/ /a/b/x/y/c/file.txt No
/a/*/c /a/b/c_file.txt No
/a/*/c/ /a/b/c_file.txt No
/a/*/c/ /a/*/cookie/file.txt No
/a/b* /a/b.txt
/a/b* /a/b/file.txt
/a/{0.txt,1.txt} /a/0.txt
/a/*/{0.txt,1.txt} /a/0.txt No No
/a/b/[cde-h]/i/ /a/b/c/i/file.txt

Habilitación de ETL sencilla

Una manera sencilla de obtener los datos en Delta Lake sin perder ningún dato consiste en usar el siguiente patrón y habilitar la inferencia de esquemas con el cargador automático. Databricks recomienda ejecutar el código siguiente en un trabajo de Azure Databricks para que reinicie automáticamente la secuencia cuando cambie el esquema de los datos de origen. De forma predeterminada, el esquema se infiere como tipos de cadena, los errores de análisis (no debería haber ninguno si todo permanece como una cadena) irán a _rescued_data y las columnas nuevas producirán un error en la secuencia y evolucionarán el esquema.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Prevención de la pérdida de datos en datos bien estructurados

Si conoce el esquema, pero quiere saber cuándo recibe datos inesperados, Databricks recomienda usar rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Si quiere que la secuencia deje de procesar si se introduce un nuevo campo que no coincide con el esquema, puede agregar lo siguiente:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Habilitación de canalizaciones de datos semiestructuradas flexibles

Cuando recibe datos de un proveedor que introduce nuevas columnas en la información que proporciona, es posible que usted no sepa exactamente cuándo lo hace o que no disponga del ancho de banda suficiente para actualizar la canalización de datos. Ahora puede aprovechar la evolución del esquema para reiniciar la secuencia y permitir que el cargador automático actualice automáticamente el esquema inferido. También puede usar schemaHints para algunos de los campos sin esquema que podría proporcionar el proveedor.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Transformación de datos JSON anidados

Dado que el cargador automático infiere las columnas JSON de nivel superior como cadenas, puede quedarse con objetos JSON anidados que requieren transformaciones adicionales. Puede usar las API de acceso a datos semiestructurados para transformar aún más el contenido JSON complejo.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Inferencia de datos JSON anidados

Cuando tenga datos anidados, puede usar la opción cloudFiles.inferColumnTypes para deducir la estructura anidada de los datos y otros tipos de columna.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Carga de archivos CSV sin encabezados

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Aplicación de un esquema en archivos CSV con encabezados

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Ingesta de datos binarios o de imagen en Delta Lake para ML

Una vez que los datos se almacenan en Delta Lake, puede ejecutar la inferencia distribuida con los datos. Consulte Realización de inferencias distribuidas mediante UDF de Pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Sintaxis de Auto Loader para DLT

Delta Live Tables proporciona una sintaxis de Python ligeramente modificada para Auto Loader y agrega compatibilidad con SQL para Auto Loader.

En los ejemplos siguientes se usa el cargador automático para crear conjuntos de datos a partir de archivos CSV y JSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Puede usar las opciones de formato admitidas con el cargador automático. Con la función map(), puede pasar opciones al método cloud_files(). Las opciones son pares clave-valor, donde las claves y los valores son cadenas. A continuación se describe la sintaxis para trabajar con Auto Loader en SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

En el ejemplo siguiente se leen datos de archivos CSV delimitados por tabulaciones con un encabezado:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

Puede usar schema para especificar el formato manualmente; debe especificar schema para los formatos que no admiten la inferencia de esquemas:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Nota:

Delta Live Tables configura y administra automáticamente los directorios de esquema y punto de control cuando se usa el cargador automático para leer archivos. Sin embargo, si configura manualmente cualquiera de estos directorios, la realización de una actualización completa no afecta al contenido de los directorios configurados. Databricks recomienda usar los directorios configurados automáticamente para evitar efectos secundarios inesperados durante el procesamiento.