Patrones comunes de carga de datos
El cargador automático simplifica una serie de tareas comunes de ingesta de datos. Esta referencia rápida proporciona ejemplos para varios patrones populares.
Filtrado de directorios o archivos mediante patrones globales
Los patrones globales se pueden usar para filtrar directorios y archivos cuando se proporcionan en la ruta de acceso.
Patrón | Descripción |
---|---|
? |
Coincide con cualquier carácter individual. |
* |
Coincide con cero o más caracteres. |
[abc] |
Coincide con un solo carácter del juego de caracteres {a,b,c}. |
[a-z] |
Coincide con un solo carácter del intervalo de caracteres {a...z}. |
[^a] |
Coincide con un solo carácter que no es del juego de caracteres o el intervalo {a}. Tenga en cuenta que el carácter ^ debe aparecer inmediatamente a la derecha del corchete de apertura. |
{ab,cd} |
Coincide con una cadena del conjunto de cadenas {ab, cd}. |
{ab,c{de, fh}} |
Coincide con una cadena del conjunto de cadenas {ab, cde, cfh}. |
Use el valor path
para proporcionar patrones de prefijo, por ejemplo:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Importante
Debe usar la opción pathGlobFilter
para proporcionar explícitamente patrones de sufijo. El valor path
solo proporciona un filtro de prefijo.
Por ejemplo, si desea analizar solo los archivos png
de un directorio que contiene archivos con sufijos diferentes, puede hacer lo siguiente:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Nota:
El comportamiento global predeterminado de Auto Loader es diferente del comportamiento predeterminado de otros orígenes de archivos de Spark. Agregue .option("cloudFiles.useStrictGlobber", "true")
a la lectura para usar una globalización que coincida con el comportamiento predeterminado de Spark en los orígenes de archivos. Consulte la siguiente tabla para obtener más información al respecto:
Patrón | Ruta de acceso del archivo | Patrón global predeterminado | Patrón global estricto |
---|---|---|---|
/a/b | /a/b/c/file.txt | Sí | Sí |
/a/b | /a/b_dir/c/file.txt | No | No |
/a/b | /a/b.txt | No | No |
/a/b/ | /a/b.txt | No | No |
/a/*/c/ | /a/b/c/file.txt | Sí | Sí |
/a/*/c/ | /a/b/c/d/file.txt | Sí | Sí |
/a/*/c/ | /a/b/x/y/c/file.txt | Sí | No |
/a/*/c | /a/b/c_file.txt | Sí | No |
/a/*/c/ | /a/b/c_file.txt | Sí | No |
/a/*/c/ | /a/*/cookie/file.txt | Sí | No |
/a/b* | /a/b.txt | Sí | Sí |
/a/b* | /a/b/file.txt | Sí | Sí |
/a/{0.txt,1.txt} | /a/0.txt | Sí | Sí |
/a/*/{0.txt,1.txt} | /a/0.txt | No | No |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Sí | Sí |
Habilitación de ETL sencilla
Una manera sencilla de obtener los datos en Delta Lake sin perder ningún dato consiste en usar el siguiente patrón y habilitar la inferencia de esquemas con el cargador automático. Databricks recomienda ejecutar el código siguiente en un trabajo de Azure Databricks para que reinicie automáticamente la secuencia cuando cambie el esquema de los datos de origen. De forma predeterminada, el esquema se infiere como tipos de cadena, los errores de análisis (no debería haber ninguno si todo permanece como una cadena) irán a _rescued_data
y las columnas nuevas producirán un error en la secuencia y evolucionarán el esquema.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Prevención de la pérdida de datos en datos bien estructurados
Si conoce el esquema, pero quiere saber cuándo recibe datos inesperados, Databricks recomienda usar rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Si quiere que la secuencia deje de procesar si se introduce un nuevo campo que no coincide con el esquema, puede agregar lo siguiente:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Habilitación de canalizaciones de datos semiestructuradas flexibles
Cuando recibe datos de un proveedor que introduce nuevas columnas en la información que proporciona, es posible que usted no sepa exactamente cuándo lo hace o que no disponga del ancho de banda suficiente para actualizar la canalización de datos. Ahora puede aprovechar la evolución del esquema para reiniciar la secuencia y permitir que el cargador automático actualice automáticamente el esquema inferido. También puede usar schemaHints
para algunos de los campos sin esquema que podría proporcionar el proveedor.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Transformación de datos JSON anidados
Dado que el cargador automático infiere las columnas JSON de nivel superior como cadenas, puede quedarse con objetos JSON anidados que requieren transformaciones adicionales. Puede usar las API de acceso a datos semiestructurados para transformar aún más el contenido JSON complejo.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Inferencia de datos JSON anidados
Cuando tenga datos anidados, puede usar la opción cloudFiles.inferColumnTypes
para deducir la estructura anidada de los datos y otros tipos de columna.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Carga de archivos CSV sin encabezados
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Aplicación de un esquema en archivos CSV con encabezados
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Ingesta de datos binarios o de imagen en Delta Lake para ML
Una vez que los datos se almacenan en Delta Lake, puede ejecutar la inferencia distribuida con los datos. Consulte Realización de inferencias distribuidas mediante UDF de Pandas.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Sintaxis de Auto Loader para DLT
Delta Live Tables proporciona una sintaxis de Python ligeramente modificada para Auto Loader y agrega compatibilidad con SQL para Auto Loader.
En los ejemplos siguientes se usa el cargador automático para crear conjuntos de datos a partir de archivos CSV y JSON:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Puede usar las opciones de formato admitidas con el cargador automático. Con la función map()
, puede pasar opciones al método cloud_files()
. Las opciones son pares clave-valor, donde las claves y los valores son cadenas. A continuación se describe la sintaxis para trabajar con Auto Loader en SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
En el ejemplo siguiente se leen datos de archivos CSV delimitados por tabulaciones con un encabezado:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
Puede usar schema
para especificar el formato manualmente; debe especificar schema
para los formatos que no admiten la inferencia de esquemas:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Nota:
Delta Live Tables configura y administra automáticamente los directorios de esquema y punto de control cuando se usa el cargador automático para leer archivos. Sin embargo, si configura manualmente cualquiera de estos directorios, la realización de una actualización completa no afecta al contenido de los directorios configurados. Databricks recomienda usar los directorios configurados automáticamente para evitar efectos secundarios inesperados durante el procesamiento.