Comparteix via


Patrones comunes de carga de datos

El cargador automático simplifica una serie de tareas comunes de ingesta de datos. Esta referencia rápida proporciona ejemplos para varios patrones populares.

Ingesta de datos del almacenamiento de objetos en la nube como variante

El cargador automático puede cargar todos los datos de los orígenes de archivos admitidos como una sola VARIANT columna en una tabla de destino. Dado que VARIANT es flexible para los cambios de esquema y tipo y mantiene la distinción de mayúsculas y minúsculas y NULL los valores presentes en el origen de datos, este patrón es sólido para la mayoría de los escenarios de ingesta. Para más información, consulte Ingesta de datos del almacenamiento de objetos en la nube como variante.

Filtrado de directorios o archivos mediante patrones globales

Los patrones globales se pueden usar para filtrar directorios y archivos cuando se proporcionan en la ruta de acceso.

Modelo Descripción
? Coincide con cualquier carácter individual
* Coincide con cero o más caracteres
[abc] Coincide con un solo carácter del juego de caracteres {a,b,c}.
[a-z] Coincide con un solo carácter del intervalo de caracteres {a...z}.
[^a] Corresponde a un solo carácter que no es del conjunto de caracteres o del intervalo {a}. Tenga en cuenta que el carácter ^ debe aparecer inmediatamente a la derecha del corchete de apertura.
{ab,cd} Coincide con una cadena del conjunto de cadenas {ab, cd}.
{ab,c{de, fh}} Coincide con una cadena del conjunto de cadenas {ab, cde, cfh}.

Use el valor path para proporcionar patrones de prefijo, por ejemplo:

Pitón

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Importante

Debe usar la opción pathGlobFilter para proporcionar explícitamente patrones de sufijo. El valor path solo proporciona un filtro de prefijo.

Por ejemplo, si desea analizar solo los archivos png de un directorio que contiene archivos con sufijos diferentes, puede hacer lo siguiente:

Pitón

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Nota

El comportamiento global predeterminado de Auto Loader es diferente del comportamiento predeterminado de otros orígenes de archivos de Spark. Agregue .option("cloudFiles.useStrictGlobber", "true") a la lectura para usar una globalización que coincida con el comportamiento predeterminado de Spark en los orígenes de archivos. Consulte la siguiente tabla para obtener más información al respecto:

Modelo Ruta de acceso del archivo Patrón global predeterminado Patrón global estricto
/a/b /a/b/c/file.txt
/a/b /a/b_dir/c/file.txt No No
/a/b /a/b.txt No No
/a/b/ /a/b.txt No No
/a/*/c/ /a/b/c/file.txt
/a/*/c/ /a/b/c/d/file.txt
/a/*/c/ /a/b/x/y/c/file.txt No
/a/*/c /a/b/c_file.txt No
/a/*/c/ /a/b/c_file.txt No
/a/*/c/ /a/*/cookie/file.txt No
/a/b* /a/b.txt
/a/b* /a/b/file.txt
/a/{0.txt,1.txt} /a/0.txt
/a/*/{0.txt,1.txt} /a/0.txt No No
/a/b/[cde-h]/i/ /a/b/c/i/file.txt

Habilitación de ETL sencilla

Una manera fácil de obtener los datos en Delta Lake sin perder datos es usar el siguiente patrón y habilitar la inferencia de esquemas con Auto Loader. Databricks recomienda ejecutar el código siguiente en un trabajo de Azure Databricks para que reinicie automáticamente la secuencia cuando cambie el esquema de los datos de origen. De forma predeterminada, el esquema se infiere como tipos de cadena, los errores de análisis (no debería haber ninguno si todo permanece como una cadena) irán a _rescued_data y las columnas nuevas producirán un error en la secuencia y evolucionarán el esquema.

Pitón

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Prevención de la pérdida de datos en datos bien estructurados

Cuando conoces tu esquema, pero quieres saber cuándo recibes datos inesperados, Databricks recomienda usar lo siguiente rescuedDataColumn.

Pitón

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Si desea que la secuencia detenga el procesamiento si se introduce un nuevo campo que no coincide con el esquema, puede agregar:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Habilitación de canalizaciones de datos semiestructuradas flexibles

Al recibir datos de un proveedor que introduce nuevas columnas a la información que proporcionan, es posible que no tenga en cuenta exactamente cuándo lo hacen o que no tenga el ancho de banda para actualizar la canalización de datos. Ahora puede aprovechar la evolución del esquema para reiniciar la secuencia y permitir que el cargador automático actualice automáticamente el esquema inferido. También puede usar schemaHints para algunos de los campos sin esquema que podría proporcionar el proveedor.

Pitón

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Transformación de datos JSON anidados

Dado que el cargador automático deduce las columnas JSON de nivel superior como cadenas, puede dejarse con objetos JSON anidados que requieren transformaciones adicionales. Puede usar las API de acceso a datos semiestructurados para transformar aún más el contenido JSON complejo.

Pitón

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Inferencia de datos JSON anidados

Cuando tenga datos anidados, puede usar la cloudFiles.inferColumnTypes opción para deducir la estructura anidada de los datos y otros tipos de columna.

Pitón

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Carga de archivos CSV sin encabezados

Pitón

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Aplicación de un esquema en archivos CSV con encabezados

Pitón

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Ingesta de datos binarios o imágenes en Delta Lake para ML

Una vez que los datos se almacenan en Delta Lake, puede ejecutar la inferencia distribuida con los datos. Consulte Realización de inferencias distribuidas mediante UDF de Pandas.

Pitón

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Sintaxis de Auto Loader para canalizaciones declarativas de Lakeflow Spark

Las canalizaciones declarativas de Spark de Lakeflow proporcionan una sintaxis de Python ligeramente modificada para Auto Loader y agrega compatibilidad con SQL para Auto Loader.

En los ejemplos siguientes se usa el autocargador para crear conjuntos de datos a partir de archivos CSV y JSON:

Pitón

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dp.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv"
)

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders/",
  format => "json")

Puede usar las opciones de formato admitidas para Auto Loader. Las opciones de read_files son pares clave-valor. Para obtener más información sobre los formatos y opciones admitidos, consulte Opciones.

Por ejemplo:

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

En el ejemplo siguiente se leen datos de archivos CSV delimitados por tabulaciones con un encabezado:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv",
  delimiter => "\t",
  header => "true"
)

Puede usar el schema para especificar el formato manualmente; debe especificar el schema para los formatos que no admiten la inferencia de esquema :

Pitón

@dp.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
  format => "parquet",
  schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)

Nota

Las canalizaciones declarativas de Spark de Lakeflow configuran y administran automáticamente los directorios de esquema y punto de comprobación al usar Auto Loader para leer archivos. Sin embargo, si configura manualmente cualquiera de estos directorios, la realización de una actualización completa no afecta al contenido de los directorios configurados. Databricks recomienda usar los directorios configurados automáticamente para evitar efectos secundarios inesperados durante el procesamiento.