Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Puede usar el COPY INTO comando SQL para cargar datos desde una ubicación de archivo en una tabla Delta.
COPY INTO es reintentable e idempotente: los archivos de la ubicación de origen que ya se han cargado se omiten en ejecuciones posteriores.
COPY INTO ofrece estas funcionalidades:
- Filtros de archivos o carpetas fácilmente configurables desde el almacenamiento en la nube, incluidos los volúmenes S3, ADLS, ABFS, GCS y Catálogo de Unity.
- Compatibilidad con varios formatos de archivo de origen: CSV, JSON, XML, Avro, ORC, Parquet, text y archivos binarios.
- Procesamiento de archivos exactamente una vez (idempotentes) de forma predeterminada.
- Inferencia de esquema de tabla de destino, asignación, combinación y evolución.
Nota:
Para una experiencia de ingesta de archivos más escalable y sólida, Databricks recomienda que los usuarios de SQL usen tablas de streaming. Para obtener más información, consulte Streaming tables (Tablas de streaming).
Advertencia
COPY INTO respeta la configuración del área de trabajo para los vectores de eliminación. Si está habilitado, los vectores de eliminación se habilitan en la tabla de destino cuando COPY INTO se ejecuta en un almacenamiento de SQL o en un proceso que ejecuta Databricks Runtime 14.0 o superior. Después de habilitar los vectores de eliminación, bloquean las consultas contra una tabla en Databricks Runtime 11.3 LTS y versiones anteriores. Consulte Vectores de eliminación en Databricks y Habilitar automáticamente vectores de eliminación.
Antes de empezar
Un administrador de cuentas debe seguir los pasos descritos en Configuración del acceso a datos para la ingesta para configurar el acceso a los datos en el almacenamiento de objetos en la nube antes de que los usuarios puedan cargar datos mediante COPY INTO.
Carga de datos en una tabla de Delta Lake sin esquema
En Databricks Runtime 11.3 LTS y versiones posteriores, puede crear tablas Delta de marcador de posición vacías para que el esquema se infiera durante un COPY INTO comando estableciendo mergeSchema en true en COPY_OPTIONS. En el ejemplo siguiente se usa el conjunto de datos wanderbricks . Reemplace <catalog>, <schema>y <volume> por un catálogo, un esquema y un volumen donde tenga CREATE TABLE permisos.
SQL
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Python
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
Scala
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
Esta instrucción SQL es idempotente. Esto significa que puede programar que se ejecute repetidamente y solo cargará nuevos datos en la tabla Delta.
Nota:
La tabla Delta vacía no se puede usar fuera de COPY INTO.
INSERT INTO y MERGE INTO no se admiten para escribir datos en tablas Delta sin esquema. Una vez insertados los datos en la tabla con COPY INTO, la tabla se puede consultar.
Consulte Creación de tablas de destino para COPY INTO.
Establecimiento del esquema y carga de datos en una tabla de Delta Lake
En el ejemplo siguiente se crea una tabla Delta y se usa el COPY INTO comando SQL para cargar datos de ejemplo del conjunto de datos de Wanderbricks en la tabla. Los archivos de origen son archivos JSON almacenados en un volumen de catálogo de Unity. Puede ejecutar el ejemplo Python, R, Scala o código SQL desde un cuaderno asociado a un clúster de Azure Databricks. También puede ejecutar el código SQL desde una consulta asociada a una instancia de SQL Warehouse en Databricks SQL. Reemplace <catalog>, <schema>y <volume> por un catálogo, un esquema y un volumen donde tenga CREATE TABLE permisos.
SQL
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
Python
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
Scala
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
Para limpiarlo, ejecute el código siguiente para eliminar la tabla de ejemplo.
SQL
DROP TABLE <catalog>.<schema>.booking_updates_upload
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
Limpieza de archivos de metadatos
Puede ejecutar VACUUM para limpiar los archivos de metadatos sin referencia creados por COPY INTO en Databricks Runtime 15.2 y versiones posteriores.
Recursos adicionales
- Carga de datos mediante COPY INTO con volúmenes o ubicaciones externas de Unity Catalog
-
Patrones comunes de carga de datos mediante
COPY INTO.
- Databricks Runtime 7.x y versiones posteriores:
COPY INTO